Skip to content

Commit 4108020

Browse files
committed
feat(net): optimize InventoryMessage processing logic
1 parent afb14ec commit 4108020

6 files changed

Lines changed: 39 additions & 20 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ public class CommonParameter {
165165
public int maxConnectionsWithSameIp;
166166
@Getter
167167
@Setter
168+
public int maxTps;
169+
@Getter
170+
@Setter
168171
public int minParticipationRate;
169172
@Getter
170173
@Setter

common/src/main/java/org/tron/core/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public class Constant {
9393

9494
public static final String NODE_MAX_ACTIVE_NODES = "node.maxActiveNodes";
9595
public static final String NODE_MAX_ACTIVE_NODES_WITH_SAME_IP = "node.maxActiveNodesWithSameIp";
96+
public static final String NODE_MAX_TPS = "node.maxTps";
9697
public static final String NODE_CONNECT_FACTOR = "node.connectFactor";
9798
public static final String NODE_ACTIVE_CONNECT_FACTOR = "node.activeConnectFactor";
9899

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public static void clearParam() {
135135
PARAMETER.minConnections = 8;
136136
PARAMETER.minActiveConnections = 3;
137137
PARAMETER.maxConnectionsWithSameIp = 2;
138+
PARAMETER.maxTps = 1000;
138139
PARAMETER.minParticipationRate = 0;
139140
PARAMETER.nodeListenPort = 0;
140141
PARAMETER.nodeDiscoveryBindIp = "";
@@ -621,6 +622,9 @@ public static void setParam(final String[] args, final String confFileName) {
621622
.getInt(Constant.NODE_MAX_CONNECTIONS_WITH_SAME_IP) : 2;
622623
}
623624

625+
PARAMETER.maxTps = config.hasPath(Constant.NODE_MAX_TPS)
626+
? config.getInt(Constant.NODE_MAX_TPS) : 1000;
627+
624628
PARAMETER.minParticipationRate =
625629
config.hasPath(Constant.NODE_MIN_PARTICIPATION_RATE)
626630
? config.getInt(Constant.NODE_MIN_PARTICIPATION_RATE)

framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.tron.core.net;
22

3+
import static org.tron.core.net.message.MessageTypes.INVENTORY;
4+
import static org.tron.core.net.message.MessageTypes.TRX;
5+
36
import java.util.HashSet;
47
import java.util.Set;
58
import java.util.concurrent.atomic.AtomicInteger;
@@ -11,11 +14,13 @@
1114
import org.tron.common.prometheus.MetricKeys;
1215
import org.tron.common.prometheus.Metrics;
1316
import org.tron.consensus.pbft.message.PbftMessage;
17+
import org.tron.core.config.args.Args;
1418
import org.tron.core.exception.P2pException;
1519
import org.tron.core.net.message.MessageTypes;
1620
import org.tron.core.net.message.PbftMessageFactory;
1721
import org.tron.core.net.message.TronMessage;
1822
import org.tron.core.net.message.TronMessageFactory;
23+
import org.tron.core.net.message.adv.InventoryMessage;
1924
import org.tron.core.net.message.base.DisconnectMessage;
2025
import org.tron.core.net.message.handshake.HelloMessage;
2126
import org.tron.core.net.messagehandler.BlockMsgHandler;
@@ -84,6 +89,8 @@ public class P2pEventHandlerImpl extends P2pEventHandler {
8489

8590
private byte MESSAGE_MAX_TYPE = 127;
8691

92+
private int maxCountIn10s = Args.getInstance().getMaxTps() * 10;
93+
8794
public P2pEventHandlerImpl() {
8895
Set<Byte> set = new HashSet<>();
8996
for (byte i = 0; i < MESSAGE_MAX_TYPE; i++) {
@@ -140,10 +147,27 @@ private void processMessage(PeerConnection peer, byte[] data) {
140147
try {
141148
msg = TronMessageFactory.create(data);
142149
type = msg.getType();
150+
151+
if (INVENTORY.equals(type)) {
152+
InventoryMessage message = (InventoryMessage) msg;
153+
Protocol.Inventory.InventoryType inventoryType = message.getInventoryType();
154+
int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement
155+
.getCount(10);
156+
if (inventoryType.equals(Protocol.Inventory.InventoryType.TRX) && count > maxCountIn10s) {
157+
logger.warn("Drop inventory from Peer {}, cur:{}, max:{}",
158+
peer.getInetAddress(), count, maxCountIn10s);
159+
if (Args.getInstance().isOpenPrintLog()) {
160+
logger.warn("[overload]Drop tx list is: {}", ((InventoryMessage) msg).getHashList());
161+
}
162+
return;
163+
}
164+
}
165+
143166
peer.getPeerStatistics().messageStatistics.addTcpInMessage(msg);
144167
if (PeerConnection.needToLog(msg)) {
145168
logger.info("Receive message from peer: {}, {}", peer.getInetSocketAddress(), msg);
146169
}
170+
147171
switch (type) {
148172
case P2P_PING:
149173
case P2P_PONG:

framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ public class InventoryMsgHandler implements TronMsgHandler {
2626
@Autowired
2727
private TransactionsMsgHandler transactionsMsgHandler;
2828

29-
private int maxCountIn10s = 10_000;
30-
3129
@Override
3230
public void processMessage(PeerConnection peer, TronMessage msg) {
3331
InventoryMessage inventoryMessage = (InventoryMessage) msg;
@@ -54,25 +52,13 @@ private boolean check(PeerConnection peer, InventoryMessage inventoryMessage) {
5452
return false;
5553
}
5654

57-
if (type.equals(InventoryType.TRX)) {
58-
int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10);
59-
if (count > maxCountIn10s) {
60-
logger.warn("Drop inv: {} size: {} from Peer {}, Inv count: {} is overload",
61-
type, size, peer.getInetAddress(), count);
62-
if (Args.getInstance().isOpenPrintLog()) {
63-
logger.warn("[overload]Drop tx list is: {}", inventoryMessage.getHashList());
64-
}
65-
return false;
66-
}
67-
68-
if (transactionsMsgHandler.isBusy()) {
69-
logger.warn("Drop inv: {} size: {} from Peer {}, transactionsMsgHandler is busy",
70-
type, size, peer.getInetAddress());
71-
if (Args.getInstance().isOpenPrintLog()) {
72-
logger.warn("[isBusy]Drop tx list is: {}", inventoryMessage.getHashList());
73-
}
74-
return false;
55+
if (type.equals(InventoryType.TRX) && transactionsMsgHandler.isBusy()) {
56+
logger.warn("Drop inv: {} size: {} from Peer {}, transactionsMsgHandler is busy",
57+
type, size, peer.getInetAddress());
58+
if (Args.getInstance().isOpenPrintLog()) {
59+
logger.warn("[isBusy]Drop tx list is: {}", inventoryMessage.getHashList());
7560
}
61+
return false;
7662
}
7763

7864
return true;

framework/src/test/java/org/tron/common/config/args/ArgsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ public void testConfig() {
3737
Assert.assertEquals(Args.getInstance().getRateLimiterGlobalQps(), 50000);
3838
Assert.assertEquals(Args.getInstance().getRateLimiterGlobalIpQps(), 10000);
3939
Assert.assertEquals(Args.getInstance().p2pDisable, true);
40+
Assert.assertEquals(Args.getInstance().getMaxTps(), 1000);
4041
}
4142
}

0 commit comments

Comments
 (0)