Skip to content

Commit e88bf90

Browse files
authored
Merge pull request #5088 from 317787106/feature/isolate
feat(net): add strategy to acquire effective peer if needed
2 parents 3889b5a + 29ac079 commit e88bf90

12 files changed

Lines changed: 350 additions & 21 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package org.tron.common.parameter;
22

3-
import static org.tron.core.Constant.DYNAMIC_ENERGY_FACTOR_DECIMAL;
4-
53
import com.beust.jcommander.Parameter;
64
import java.net.InetAddress;
75
import java.net.InetSocketAddress;
86
import java.util.ArrayList;
97
import java.util.List;
108
import java.util.Set;
119

12-
import com.google.common.annotations.VisibleForTesting;
1310
import lombok.Getter;
1411
import lombok.Setter;
1512
import org.quartz.CronExpression;
@@ -144,6 +141,9 @@ public class CommonParameter {
144141
public boolean nodeDiscoveryPersist;
145142
@Getter
146143
@Setter
144+
public boolean nodeEffectiveCheckEnable;
145+
@Getter
146+
@Setter
147147
public int nodeConnectionTimeout;
148148
@Getter
149149
@Setter

common/src/main/java/org/tron/core/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class Constant {
8383
public static final String BLOCK_NEED_SYNC_CHECK = "block.needSyncCheck";
8484
public static final String NODE_DISCOVERY_ENABLE = "node.discovery.enable";
8585
public static final String NODE_DISCOVERY_PERSIST = "node.discovery.persist";
86+
public static final String NODE_EFFECTIVE_CHECK_ENABLE = "node.effectiveCheckEnable";
8687
public static final String NODE_CONNECTION_TIMEOUT = "node.connection.timeout";
8788
public static final String NODE_FETCH_BLOCK_TIMEOUT = "node.fetchBlock.timeout";
8889
public static final String NODE_CHANNEL_READ_TIMEOUT = "node.channel.read.timeout";

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public static void clearParam() {
124124
PARAMETER.needSyncCheck = false;
125125
PARAMETER.nodeDiscoveryEnable = false;
126126
PARAMETER.nodeDiscoveryPersist = false;
127+
PARAMETER.nodeEffectiveCheckEnable = false;
127128
PARAMETER.nodeConnectionTimeout = 2000;
128129
PARAMETER.activeNodes = new ArrayList<>();
129130
PARAMETER.passiveNodes = new ArrayList<>();
@@ -559,6 +560,10 @@ public static void setParam(final String[] args, final String confFileName) {
559560
config.hasPath(Constant.NODE_DISCOVERY_PERSIST)
560561
&& config.getBoolean(Constant.NODE_DISCOVERY_PERSIST);
561562

563+
PARAMETER.nodeEffectiveCheckEnable =
564+
config.hasPath(Constant.NODE_EFFECTIVE_CHECK_ENABLE)
565+
&& config.getBoolean(Constant.NODE_EFFECTIVE_CHECK_ENABLE);
566+
562567
PARAMETER.nodeConnectionTimeout =
563568
config.hasPath(Constant.NODE_CONNECTION_TIMEOUT)
564569
? config.getInt(Constant.NODE_CONNECTION_TIMEOUT) * 1000
@@ -1222,9 +1227,6 @@ public static List<InetSocketAddress> getInetSocketAddress(
12221227
List<String> list = config.getStringList(path);
12231228
for (String configString : list) {
12241229
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(configString);
1225-
if (inetSocketAddress == null) {
1226-
continue;
1227-
}
12281230
if (filter) {
12291231
String ip = inetSocketAddress.getAddress().getHostAddress();
12301232
int port = inetSocketAddress.getPort();
@@ -1250,9 +1252,7 @@ public static List<InetAddress> getInetAddress(
12501252
List<String> list = config.getStringList(path);
12511253
for (String configString : list) {
12521254
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(configString);
1253-
if (inetSocketAddress != null) {
1254-
ret.add(inetSocketAddress.getAddress());
1255-
}
1255+
ret.add(inetSocketAddress.getAddress());
12561256
}
12571257
return ret;
12581258
}
@@ -1321,9 +1321,7 @@ private static List<InetSocketAddress> loadSeeds(final com.typesafe.config.Confi
13211321
if (PARAMETER.seedNodes != null && !PARAMETER.seedNodes.isEmpty()) {
13221322
for (String s : PARAMETER.seedNodes) {
13231323
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(s);
1324-
if (inetSocketAddress != null) {
1325-
inetSocketAddressList.add(inetSocketAddress);
1326-
}
1324+
inetSocketAddressList.add(inetSocketAddress);
13271325
}
13281326
} else {
13291327
inetSocketAddressList = getInetSocketAddress(config, Constant.SEED_NODE_IP_LIST, false);
@@ -1632,6 +1630,7 @@ public static void logConfig() {
16321630
logger.info("Bind IP: {}", parameter.getNodeDiscoveryBindIp());
16331631
logger.info("External IP: {}", parameter.getNodeExternalIp());
16341632
logger.info("Listen port: {}", parameter.getNodeListenPort());
1633+
logger.info("Node ipv6 enable: {}", parameter.isNodeEnableIpv6());
16351634
logger.info("Discover enable: {}", parameter.isNodeDiscoveryEnable());
16361635
logger.info("Active node size: {}", parameter.getActiveNodes().size());
16371636
logger.info("Passive node size: {}", parameter.getPassiveNodes().size());
@@ -1646,6 +1645,7 @@ public static void logConfig() {
16461645
logger.info("Trx reference block: {}", parameter.getTrxReferenceBlock());
16471646
logger.info("Open full tcp disconnect: {}", parameter.isOpenFullTcpDisconnect());
16481647
logger.info("Node detect enable: {}", parameter.isNodeDetectEnable());
1648+
logger.info("Node effective check enable: {}", parameter.isNodeEffectiveCheckEnable());
16491649
logger.info("Rate limiter global qps: {}", parameter.getRateLimiterGlobalQps());
16501650
logger.info("Rate limiter global ip qps: {}", parameter.getRateLimiterGlobalIpQps());
16511651
logger.info("************************ Backup config ************************");

framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.tron.core.net.messagehandler.TransactionsMsgHandler;
2929
import org.tron.core.net.peer.PeerConnection;
3030
import org.tron.core.net.peer.PeerManager;
31+
import org.tron.core.net.service.effective.EffectiveCheckService;
3132
import org.tron.core.net.service.handshake.HandshakeService;
3233
import org.tron.core.net.service.keepalive.KeepAliveService;
3334
import org.tron.p2p.P2pEventHandler;
@@ -78,6 +79,9 @@ public class P2pEventHandlerImpl extends P2pEventHandler {
7879
@Autowired
7980
private KeepAliveService keepAliveService;
8081

82+
@Autowired
83+
private EffectiveCheckService effectiveCheckService;
84+
8185
private byte MESSAGE_MAX_TYPE = 127;
8286

8387
public P2pEventHandlerImpl() {
@@ -102,6 +106,7 @@ public synchronized void onDisconnect(Channel channel) {
102106
if (peerConnection != null) {
103107
peerConnection.onDisconnect();
104108
}
109+
effectiveCheckService.onDisconnect(channel.getInetSocketAddress());
105110
}
106111

107112
@Override

framework/src/main/java/org/tron/core/net/TronNetService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.tron.core.net.peer.PeerManager;
2222
import org.tron.core.net.peer.PeerStatusCheck;
2323
import org.tron.core.net.service.adv.AdvService;
24+
import org.tron.core.net.service.effective.EffectiveCheckService;
2425
import org.tron.core.net.service.fetchblock.FetchBlockService;
2526
import org.tron.core.net.service.nodepersist.NodePersistService;
2627
import org.tron.core.net.service.relay.RelayService;
@@ -69,6 +70,9 @@ public class TronNetService {
6970
@Autowired
7071
private RelayService relayService;
7172

73+
@Autowired
74+
private EffectiveCheckService effectiveCheckService;
75+
7276
private volatile boolean init;
7377

7478
private static void setP2pConfig(P2pConfig config) {
@@ -90,6 +94,7 @@ public void start() {
9094
tronStatsManager.init();
9195
PeerManager.init();
9296
relayService.init();
97+
effectiveCheckService.init();
9398
logger.info("Net service start successfully");
9499
} catch (Exception e) {
95100
logger.error("Net service start failed", e);
@@ -108,6 +113,7 @@ public void close() {
108113
peerStatusCheck.close();
109114
transactionsMsgHandler.close();
110115
fetchBlockService.close();
116+
effectiveCheckService.close();
111117
p2pService.close();
112118
relayService.close();
113119
logger.info("Net service closed successfully");
@@ -144,13 +150,11 @@ public static boolean hasIpv4Stack(Set<String> ipSet) {
144150
private P2pConfig getConfig() {
145151
List<InetSocketAddress> seeds = parameter.getSeedNode().getAddressList();
146152
seeds.addAll(nodePersistService.dbRead());
147-
for (InetSocketAddress inetSocketAddress : seeds) {
148-
logger.debug("Seed InetSocketAddress: {}", inetSocketAddress);
149-
}
153+
logger.debug("Seed InetSocketAddress: {}", seeds);
150154
P2pConfig config = new P2pConfig();
151-
config.setSeedNodes(seeds);
152-
config.setActiveNodes(parameter.getActiveNodes());
153-
config.setTrustNodes(parameter.getPassiveNodes());
155+
config.getSeedNodes().addAll(seeds);
156+
config.getActiveNodes().addAll(parameter.getActiveNodes());
157+
config.getTrustNodes().addAll(parameter.getPassiveNodes());
154158
config.getActiveNodes().forEach(n -> config.getTrustNodes().add(n.getAddress()));
155159
parameter.getFastForwardNodes().forEach(f -> config.getTrustNodes().add(f.getAddress()));
156160
int maxConnections = parameter.getMaxConnections();
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package org.tron.core.net.service.effective;
2+
3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
5+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
6+
import java.net.InetSocketAddress;
7+
import java.util.Comparator;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Optional;
11+
import java.util.Set;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import lombok.Getter;
17+
import lombok.Setter;
18+
import lombok.extern.slf4j.Slf4j;
19+
import org.springframework.beans.factory.annotation.Autowired;
20+
import org.springframework.stereotype.Component;
21+
import org.tron.core.config.args.Args;
22+
import org.tron.core.net.TronNetDelegate;
23+
import org.tron.core.net.TronNetService;
24+
import org.tron.core.net.peer.PeerConnection;
25+
import org.tron.p2p.discover.Node;
26+
import org.tron.protos.Protocol.ReasonCode;
27+
28+
@Slf4j(topic = "net")
29+
@Component
30+
public class EffectiveCheckService {
31+
32+
@Getter
33+
private final boolean isEffectiveCheck = Args.getInstance().isNodeEffectiveCheckEnable();
34+
@Autowired
35+
private TronNetDelegate tronNetDelegate;
36+
37+
private final Cache<InetSocketAddress, Boolean> nodesCache = CacheBuilder.newBuilder()
38+
.initialCapacity(100)
39+
.maximumSize(10000)
40+
.expireAfterWrite(20, TimeUnit.MINUTES).build();
41+
@Getter
42+
@Setter
43+
private volatile InetSocketAddress cur;
44+
private final AtomicInteger count = new AtomicInteger(0);
45+
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
46+
new ThreadFactoryBuilder().setNameFormat("effective-thread-%d").build());
47+
private long MAX_HANDSHAKE_TIME = 60_000;
48+
49+
public void init() {
50+
if (isEffectiveCheck) {
51+
executor.scheduleWithFixedDelay(() -> {
52+
try {
53+
findEffectiveNode();
54+
} catch (Exception e) {
55+
logger.error("Check effective connection processing failed", e);
56+
}
57+
}, 60, 5, TimeUnit.SECONDS);
58+
} else {
59+
logger.info("EffectiveCheckService is disabled");
60+
}
61+
}
62+
63+
public void triggerNext() {
64+
try {
65+
executor.submit(this::findEffectiveNode);
66+
} catch (Exception e) {
67+
logger.warn("Submit effective service task failed, message:{}", e.getMessage());
68+
}
69+
}
70+
71+
public void close() {
72+
if (executor != null) {
73+
try {
74+
executor.shutdown();
75+
} catch (Exception e) {
76+
logger.error("Exception in shutdown effective service worker, {}", e.getMessage());
77+
}
78+
}
79+
}
80+
81+
public boolean isIsolateLand() {
82+
return (int) tronNetDelegate.getActivePeer().stream()
83+
.filter(PeerConnection::isNeedSyncFromUs)
84+
.count() == tronNetDelegate.getActivePeer().size();
85+
}
86+
87+
//try to find node which we can sync from
88+
private void findEffectiveNode() {
89+
if (!isIsolateLand()) {
90+
if (count.get() > 0) {
91+
logger.info("Success to verify effective node {}", cur);
92+
resetCount();
93+
}
94+
return;
95+
}
96+
97+
if (cur != null) {
98+
tronNetDelegate.getActivePeer().forEach(p -> {
99+
if (p.getInetSocketAddress().equals(cur)
100+
&& System.currentTimeMillis() - p.getChannel().getStartTime() >= MAX_HANDSHAKE_TIME) {
101+
// we encounter no effective connection again, so we disconnect with last used node
102+
logger.info("Disconnect with {}", cur);
103+
p.disconnect(ReasonCode.BELOW_THAN_ME);
104+
}
105+
});
106+
logger.info("Thread is running");
107+
return;
108+
}
109+
110+
List<Node> tableNodes = TronNetService.getP2pService().getConnectableNodes();
111+
tableNodes.sort(Comparator.comparingLong(node -> -node.getUpdateTime()));
112+
Set<InetSocketAddress> usedAddressSet = new HashSet<>();
113+
tronNetDelegate.getActivePeer().forEach(p -> usedAddressSet.add(p.getInetSocketAddress()));
114+
Optional<Node> chosenNode = tableNodes.stream()
115+
.filter(node -> nodesCache.getIfPresent(node.getPreferInetSocketAddress()) == null)
116+
.filter(node -> !usedAddressSet.contains(node.getPreferInetSocketAddress()))
117+
.filter(node -> !TronNetService.getP2pConfig().getActiveNodes()
118+
.contains(node.getPreferInetSocketAddress()))
119+
.findFirst();
120+
if (!chosenNode.isPresent()) {
121+
logger.warn("No available node to choose");
122+
return;
123+
}
124+
125+
count.incrementAndGet();
126+
nodesCache.put(chosenNode.get().getPreferInetSocketAddress(), true);
127+
cur = new InetSocketAddress(chosenNode.get().getPreferInetSocketAddress().getAddress(),
128+
chosenNode.get().getPreferInetSocketAddress().getPort());
129+
130+
logger.info("Try to get effective connection by using {} at seq {}", cur, count.get());
131+
TronNetService.getP2pService().connect(chosenNode.get(), future -> {
132+
if (future.isCancelled()) {
133+
// Connection attempt cancelled by user
134+
cur = null;
135+
} else if (!future.isSuccess()) {
136+
// You might get a NullPointerException here because the future might not be completed yet.
137+
logger.warn("Connect to chosen peer {} fail, cause:{}", cur, future.cause().getMessage());
138+
future.channel().close();
139+
cur = null;
140+
triggerNext();
141+
} else {
142+
// Connection established successfully
143+
}
144+
});
145+
}
146+
147+
private void resetCount() {
148+
count.set(0);
149+
}
150+
151+
public void onDisconnect(InetSocketAddress inetSocketAddress) {
152+
if (inetSocketAddress.equals(cur)) {
153+
logger.warn("Close chosen peer: {}", cur);
154+
cur = null;
155+
triggerNext();
156+
}
157+
}
158+
}

framework/src/main/java/org/tron/core/net/service/handshake/HandshakeService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.tron.core.net.message.handshake.HelloMessage;
1212
import org.tron.core.net.peer.PeerConnection;
1313
import org.tron.core.net.peer.PeerManager;
14+
import org.tron.core.net.service.effective.EffectiveCheckService;
1415
import org.tron.core.net.service.relay.RelayService;
1516
import org.tron.p2p.discover.Node;
1617
import org.tron.protos.Protocol.ReasonCode;
@@ -22,6 +23,9 @@ public class HandshakeService {
2223
@Autowired
2324
private RelayService relayService;
2425

26+
@Autowired
27+
private EffectiveCheckService effectiveCheckService;
28+
2529
@Autowired
2630
private ChainBaseManager chainBaseManager;
2731

@@ -98,6 +102,15 @@ public void processHelloMessage(PeerConnection peer, HelloMessage msg) {
98102
return;
99103
}
100104

105+
if (msg.getHeadBlockId().getNum() < chainBaseManager.getHeadBlockId().getNum()
106+
&& peer.getInetSocketAddress().equals(effectiveCheckService.getCur())) {
107+
logger.info("Peer's head block {} is below than we, peer->{}, me->{}",
108+
peer.getInetSocketAddress(), msg.getHeadBlockId().getNum(),
109+
chainBaseManager.getHeadBlockId().getNum());
110+
peer.disconnect(ReasonCode.BELOW_THAN_ME);
111+
return;
112+
}
113+
101114
peer.setHelloMessageReceive(msg);
102115

103116
peer.getChannel().updateAvgLatency(

framework/src/main/java/org/tron/core/net/service/statistics/MessageStatistics.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,17 @@ private void addTcpMessage(Message msg, boolean flag) {
143143
break;
144144
case TRX:
145145
if (flag) {
146-
tronInMessage.add();
146+
tronInTrx.add();
147147
} else {
148-
tronOutMessage.add();
148+
tronOutTrx.add();
149149
}
150150
break;
151151
case BLOCK:
152152
if (flag) {
153153
tronInBlock.add();
154+
} else {
155+
tronOutBlock.add();
154156
}
155-
tronOutBlock.add();
156157
break;
157158
default:
158159
break;

0 commit comments

Comments
 (0)