Skip to content

Commit 3fa8c4f

Browse files
authored
Merge pull request #5195 from chengtx01/optimize_sync
feat(net):optimize sync service
2 parents afb14ec + 1e3e385 commit 3fa8c4f

6 files changed

Lines changed: 37 additions & 4 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ public class CommonParameter {
199199
@Getter
200200
@Setter
201201
public PublishConfig dnsPublishConfig;
202+
@Getter
203+
@Setter
204+
public long syncFetchBatchNum;
202205

203206
//If you are running a solidity node for java tron, this flag is set to true
204207
@Getter

common/src/main/java/org/tron/core/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public class Constant {
9090
public static final String NODE_MAX_CONNECTIONS = "node.maxConnections";
9191
public static final String NODE_MIN_CONNECTIONS = "node.minConnections";
9292
public static final String NODE_MIN_ACTIVE_CONNECTIONS = "node.minActiveConnections";
93+
public static final String NODE_SYNC_FETCH_BATCH_NUM = "node.syncFetchBatchNum";
9394

9495
public static final String NODE_MAX_ACTIVE_NODES = "node.maxActiveNodes";
9596
public static final String NODE_MAX_ACTIVE_NODES_WITH_SAME_IP = "node.maxActiveNodesWithSameIp";

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public static void clearParam() {
146146
PARAMETER.nodeEnableIpv6 = false;
147147
PARAMETER.dnsTreeUrls = new ArrayList<>();
148148
PARAMETER.dnsPublishConfig = null;
149+
PARAMETER.syncFetchBatchNum = 2000;
149150
PARAMETER.rpcPort = 0;
150151
PARAMETER.rpcOnSolidityPort = 0;
151152
PARAMETER.rpcOnPBFTPort = 0;
@@ -657,6 +658,15 @@ public static void setParam(final String[] args, final String confFileName) {
657658

658659
PARAMETER.dnsPublishConfig = loadDnsPublishConfig(config);
659660

661+
PARAMETER.syncFetchBatchNum = config.hasPath(Constant.NODE_SYNC_FETCH_BATCH_NUM) ? config
662+
.getInt(Constant.NODE_SYNC_FETCH_BATCH_NUM) : 2000;
663+
if (PARAMETER.syncFetchBatchNum > 2000) {
664+
PARAMETER.syncFetchBatchNum = 2000;
665+
}
666+
if (PARAMETER.syncFetchBatchNum < 100) {
667+
PARAMETER.syncFetchBatchNum = 100;
668+
}
669+
660670
PARAMETER.rpcPort =
661671
config.hasPath(Constant.NODE_RPC_PORT)
662672
? config.getInt(Constant.NODE_RPC_PORT) : 50051;

framework/src/main/java/org/tron/core/net/messagehandler/ChainInventoryMsgHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.tron.core.capsule.BlockCapsule.BlockId;
1414
import org.tron.core.config.Parameter.ChainConstant;
1515
import org.tron.core.config.Parameter.NetConstants;
16+
import org.tron.core.config.args.Args;
1617
import org.tron.core.exception.P2pException;
1718
import org.tron.core.exception.P2pException.TypeEnum;
1819
import org.tron.core.net.TronNetDelegate;
@@ -32,6 +33,8 @@ public class ChainInventoryMsgHandler implements TronMsgHandler {
3233
@Autowired
3334
private SyncService syncService;
3435

36+
private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum();
37+
3538
@Override
3639
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
3740

@@ -88,7 +91,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
8891
peer.setFetchAble(true);
8992
if ((chainInventoryMessage.getRemainNum() == 0 && !peer.getSyncBlockToFetch().isEmpty())
9093
|| (chainInventoryMessage.getRemainNum() != 0
91-
&& peer.getSyncBlockToFetch().size() > NetConstants.SYNC_FETCH_BATCH_NUM)) {
94+
&& peer.getSyncBlockToFetch().size() > syncFetchBatchNum)) {
9295
syncService.setFetchFlag(true);
9396
} else {
9497
syncService.syncNext(peer);

framework/src/main/java/org/tron/core/net/service/sync/SyncService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.tron.common.utils.Pair;
2222
import org.tron.core.capsule.BlockCapsule;
2323
import org.tron.core.capsule.BlockCapsule.BlockId;
24-
import org.tron.core.config.Parameter.NetConstants;
2524
import org.tron.core.config.args.Args;
2625
import org.tron.core.exception.P2pException;
2726
import org.tron.core.exception.P2pException.TypeEnum;
@@ -65,6 +64,8 @@ public class SyncService {
6564
@Setter
6665
private volatile boolean fetchFlag = false;
6766

67+
private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum();
68+
6869
public void init() {
6970
fetchExecutor.scheduleWithFixedDelay(() -> {
7071
try {
@@ -132,7 +133,7 @@ public void processBlock(PeerConnection peer, BlockMessage blockMessage) {
132133
handleFlag = true;
133134
if (peer.isIdle()) {
134135
if (peer.getRemainNum() > 0
135-
&& peer.getSyncBlockToFetch().size() <= NetConstants.SYNC_FETCH_BATCH_NUM) {
136+
&& peer.getSyncBlockToFetch().size() <= syncFetchBatchNum) {
136137
syncNext(peer);
137138
} else {
138139
fetchFlag = true;
@@ -250,6 +251,7 @@ private synchronized void handleSyncBlock() {
250251
}
251252

252253
final boolean[] isProcessed = {true};
254+
long solidNum = tronNetDelegate.getSolidBlockId().getNum();
253255

254256
while (isProcessed[0]) {
255257

@@ -262,6 +264,10 @@ private synchronized void handleSyncBlock() {
262264
invalid(msg.getBlockId(), peerConnection);
263265
return;
264266
}
267+
if (msg.getBlockId().getNum() <= solidNum) {
268+
blockWaitToProcess.remove(msg);
269+
return;
270+
}
265271
final boolean[] isFound = {false};
266272
tronNetDelegate.getActivePeer().stream()
267273
.filter(peer -> msg.getBlockId().equals(peer.getSyncBlockToFetch().peek()))

framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,21 @@ public void testHandleSyncBlock() throws Exception {
183183
Map<BlockMessage, PeerConnection> blockJustReceived =
184184
(Map<BlockMessage, PeerConnection>)
185185
ReflectUtils.getFieldObject(service, "blockJustReceived");
186+
Protocol.BlockHeader.raw.Builder blockHeaderRawBuild = Protocol.BlockHeader.raw.newBuilder();
187+
Protocol.BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild
188+
.setNumber(100000)
189+
.build();
186190

187-
BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder().build());
191+
// block header
192+
Protocol.BlockHeader.Builder blockHeaderBuild = Protocol.BlockHeader.newBuilder();
193+
Protocol.BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build();
194+
195+
BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder()
196+
.setBlockHeader(blockHeader).build());
188197

189198
BlockCapsule.BlockId blockId = blockCapsule.getBlockId();
190199

200+
191201
InetSocketAddress a1 = new InetSocketAddress("127.0.0.1", 10001);
192202
Channel c1 = mock(Channel.class);
193203
Mockito.when(c1.getInetSocketAddress()).thenReturn(a1);

0 commit comments

Comments
 (0)