Skip to content

Commit 1996f7b

Browse files
authored
Merge pull request #5984 from 317787106/feature/test_isolated3
feat(net): prefer to disconnect from broadcast nodes
2 parents ea7ef8e + 59e92c8 commit 1996f7b

File tree

3 files changed

+166
-19
lines changed

3 files changed

+166
-19
lines changed

framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.tron.core.net.service.effective;
22

33
import java.util.Comparator;
4+
import java.util.HashMap;
45
import java.util.List;
6+
import java.util.Map;
57
import java.util.Optional;
68
import java.util.Random;
79
import java.util.concurrent.ScheduledExecutorService;
@@ -29,6 +31,7 @@ public class ResilienceService {
2931
//when node is isolated, retention percent peers will not be disconnected
3032
public static final double retentionPercent = 0.8;
3133
private static final int initialDelay = 300;
34+
public static final int minBroadcastPeerSize = 3;
3235
private static final String esName = "resilience-service";
3336
private final ScheduledExecutorService executor = ExecutorServiceManager
3437
.newSingleThreadScheduledExecutor(esName);
@@ -47,7 +50,7 @@ public void init() {
4750
} catch (Exception e) {
4851
logger.error("DisconnectRandom node failed", e);
4952
}
50-
}, initialDelay, 60, TimeUnit.SECONDS);
53+
}, initialDelay, 30, TimeUnit.SECONDS);
5154
} else {
5255
logger.info("OpenFullTcpDisconnect is disabled");
5356
}
@@ -71,21 +74,50 @@ public void init() {
7174

7275
private void disconnectRandom() {
7376
int peerSize = tronNetDelegate.getActivePeer().size();
74-
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
77+
if (peerSize < CommonParameter.getInstance().getMaxConnections()) {
78+
return;
79+
}
80+
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
81+
.filter(peer -> !peer.getChannel().isTrustPeer())
82+
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
83+
.collect(Collectors.toList());
84+
85+
if (peers.size() >= minBroadcastPeerSize) {
7586
long now = System.currentTimeMillis();
76-
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
77-
.filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold)
87+
Map<Object, Integer> weights = new HashMap<>();
88+
peers.forEach(peer -> {
89+
int weight = (int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500);
90+
weights.put(peer, Math.max(weight, 1));
91+
});
92+
WeightedRandom weightedRandom = new WeightedRandom(weights);
93+
PeerConnection one = (PeerConnection) weightedRandom.next();
94+
disconnectFromPeer(one, ReasonCode.RANDOM_ELIMINATION, DisconnectCause.RANDOM_ELIMINATION);
95+
return;
96+
}
97+
98+
int needSyncFromPeerCount = (int) tronNetDelegate.getActivePeer().stream()
99+
.filter(peer -> !peer.getChannel().isTrustPeer())
100+
.filter(PeerConnection::isNeedSyncFromPeer)
101+
.count();
102+
if (needSyncFromPeerCount >= 2) {
103+
peers = tronNetDelegate.getActivePeer().stream()
78104
.filter(peer -> !peer.getChannel().isTrustPeer())
79-
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
105+
.filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer())
106+
.collect(Collectors.toList());
107+
} else {
108+
peers = tronNetDelegate.getActivePeer().stream()
109+
.filter(peer -> !peer.getChannel().isTrustPeer())
110+
.filter(PeerConnection::isNeedSyncFromUs)
80111
.collect(Collectors.toList());
81-
if (!peers.isEmpty()) {
82-
int index = new Random().nextInt(peers.size());
83-
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
84-
DisconnectCause.RANDOM_ELIMINATION);
85-
}
112+
}
113+
if (!peers.isEmpty()) {
114+
int index = new Random().nextInt(peers.size());
115+
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
116+
DisconnectCause.RANDOM_ELIMINATION);
86117
}
87118
}
88119

120+
89121
private void disconnectLan() {
90122
if (!isLanNode()) {
91123
return;
@@ -198,6 +230,32 @@ private enum DisconnectCause {
198230
ISOLATE2_PASSIVE,
199231
}
200232

233+
static class WeightedRandom {
234+
235+
private final Map<Object, Integer> weights;
236+
private final Random random;
237+
238+
public WeightedRandom(Map<Object, Integer> weights) {
239+
this.weights = weights;
240+
this.random = new Random();
241+
}
242+
243+
public Object next() {
244+
int totalWeight = 0;
245+
for (int weight : weights.values()) {
246+
totalWeight += weight;
247+
}
248+
int randomNum = random.nextInt(totalWeight);
249+
for (Object key : weights.keySet()) {
250+
randomNum -= weights.get(key);
251+
if (randomNum < 0) {
252+
return key;
253+
}
254+
}
255+
throw new IllegalStateException("Sum of weights should not be negative.");
256+
}
257+
}
258+
201259
public void close() {
202260
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
203261
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.tron.core.net.peer;
2+
3+
import static org.mockito.ArgumentMatchers.any;
4+
import static org.mockito.Mockito.spy;
5+
6+
import io.netty.channel.ChannelHandlerContext;
7+
import java.io.IOException;
8+
import java.net.InetSocketAddress;
9+
import org.junit.After;
10+
import org.junit.Assert;
11+
import org.junit.Before;
12+
import org.junit.Rule;
13+
import org.junit.Test;
14+
import org.junit.rules.TemporaryFolder;
15+
import org.mockito.Mockito;
16+
import org.tron.common.application.TronApplicationContext;
17+
import org.tron.common.utils.ReflectUtils;
18+
import org.tron.core.Constant;
19+
import org.tron.core.capsule.BlockCapsule.BlockId;
20+
import org.tron.core.config.DefaultConfig;
21+
import org.tron.core.config.Parameter.NetConstants;
22+
import org.tron.core.config.args.Args;
23+
import org.tron.p2p.connection.Channel;
24+
25+
26+
public class PeerStatusCheckTest {
27+
28+
protected TronApplicationContext context;
29+
private PeerStatusCheck service;
30+
@Rule
31+
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
32+
33+
@Before
34+
public void init() throws IOException {
35+
Args.setParam(new String[] {"--output-directory",
36+
temporaryFolder.newFolder().toString(), "--debug"}, Constant.TEST_CONF);
37+
context = new TronApplicationContext(DefaultConfig.class);
38+
service = context.getBean(PeerStatusCheck.class);
39+
}
40+
41+
/**
42+
* destroy.
43+
*/
44+
@After
45+
public void destroy() {
46+
Args.clearParam();
47+
context.destroy();
48+
}
49+
50+
@Test
51+
public void testCheck() {
52+
int maxConnection = 30;
53+
Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections());
54+
Assert.assertEquals(0, PeerManager.getPeers().size());
55+
56+
for (int i = 0; i < maxConnection; i++) {
57+
InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001);
58+
Channel c1 = spy(Channel.class);
59+
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
60+
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
61+
ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class));
62+
Mockito.doNothing().when(c1).send((byte[]) any());
63+
64+
PeerManager.add(context, c1);
65+
}
66+
67+
PeerManager.getPeers().get(0).getSyncBlockRequested()
68+
.put(new BlockId(), System.currentTimeMillis() - NetConstants.SYNC_TIME_OUT - 1000);
69+
ReflectUtils.invokeMethod(service, "statusCheck");
70+
71+
Assert.assertEquals(maxConnection - 1L, PeerManager.getPeers().size());
72+
}
73+
}

framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testDisconnectRandom() {
5151
clearPeers();
5252
Assert.assertEquals(0, PeerManager.getPeers().size());
5353

54-
for (int i = 0; i < maxConnection; i++) {
54+
for (int i = 0; i < maxConnection + 1; i++) {
5555
InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001);
5656
Channel c1 = spy(Channel.class);
5757
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
@@ -61,21 +61,37 @@ public void testDisconnectRandom() {
6161

6262
PeerManager.add(context, c1);
6363
}
64-
for (PeerConnection peer : PeerManager.getPeers()) {
64+
for (PeerConnection peer : PeerManager.getPeers()
65+
.subList(0, ResilienceService.minBroadcastPeerSize)) {
6566
peer.setNeedSyncFromPeer(false);
6667
peer.setNeedSyncFromUs(false);
68+
peer.setLastInteractiveTime(System.currentTimeMillis() - 1000);
69+
}
70+
for (PeerConnection peer : PeerManager.getPeers()
71+
.subList(ResilienceService.minBroadcastPeerSize, maxConnection + 1)) {
72+
peer.setNeedSyncFromPeer(false);
73+
peer.setNeedSyncFromUs(true);
6774
}
75+
int size1 = (int) PeerManager.getPeers().stream()
76+
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
77+
.count();
78+
Assert.assertEquals(ResilienceService.minBroadcastPeerSize, size1);
79+
Assert.assertEquals(maxConnection + 1, PeerManager.getPeers().size());
80+
81+
//disconnect from broadcasting peer
6882
ReflectUtils.invokeMethod(service, "disconnectRandom");
83+
size1 = (int) PeerManager.getPeers().stream()
84+
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
85+
.count();
86+
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
6987
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());
7088

71-
PeerConnection p1 = PeerManager.getPeers().get(1);
72-
p1.setLastInteractiveTime(
73-
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000);
74-
PeerConnection p2 = PeerManager.getPeers().get(10);
75-
p2.setLastInteractiveTime(
76-
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000);
77-
89+
//disconnect from syncing peer
7890
ReflectUtils.invokeMethod(service, "disconnectRandom");
91+
size1 = (int) PeerManager.getPeers().stream()
92+
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
93+
.count();
94+
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
7995
Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size());
8096
}
8197

0 commit comments

Comments
 (0)