Skip to content

Commit 619a20f

Browse files
authored
Merge pull request #5196 from wubin01/optimize_invmsg_proc
feat(net): optimize InventoryMessage processing logic
2 parents 3fa8c4f + c929ee3 commit 619a20f

7 files changed

Lines changed: 149 additions & 20 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ public class CommonParameter {
165165
public int maxConnectionsWithSameIp;
166166
@Getter
167167
@Setter
168+
public int maxTps;
169+
@Getter
170+
@Setter
168171
public int minParticipationRate;
169172
@Getter
170173
@Setter

common/src/main/java/org/tron/core/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public class Constant {
9494

9595
public static final String NODE_MAX_ACTIVE_NODES = "node.maxActiveNodes";
9696
public static final String NODE_MAX_ACTIVE_NODES_WITH_SAME_IP = "node.maxActiveNodesWithSameIp";
97+
public static final String NODE_MAX_TPS = "node.maxTps";
9798
public static final String NODE_CONNECT_FACTOR = "node.connectFactor";
9899
public static final String NODE_ACTIVE_CONNECT_FACTOR = "node.activeConnectFactor";
99100

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public static void clearParam() {
135135
PARAMETER.minConnections = 8;
136136
PARAMETER.minActiveConnections = 3;
137137
PARAMETER.maxConnectionsWithSameIp = 2;
138+
PARAMETER.maxTps = 1000;
138139
PARAMETER.minParticipationRate = 0;
139140
PARAMETER.nodeListenPort = 0;
140141
PARAMETER.nodeDiscoveryBindIp = "";
@@ -622,6 +623,9 @@ public static void setParam(final String[] args, final String confFileName) {
622623
.getInt(Constant.NODE_MAX_CONNECTIONS_WITH_SAME_IP) : 2;
623624
}
624625

626+
PARAMETER.maxTps = config.hasPath(Constant.NODE_MAX_TPS)
627+
? config.getInt(Constant.NODE_MAX_TPS) : 1000;
628+
625629
PARAMETER.minParticipationRate =
626630
config.hasPath(Constant.NODE_MIN_PARTICIPATION_RATE)
627631
? config.getInt(Constant.NODE_MIN_PARTICIPATION_RATE)

framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.tron.core.net;
22

3+
import static org.tron.core.net.message.MessageTypes.INVENTORY;
4+
35
import java.util.HashSet;
46
import java.util.Set;
57
import java.util.concurrent.atomic.AtomicInteger;
@@ -11,11 +13,13 @@
1113
import org.tron.common.prometheus.MetricKeys;
1214
import org.tron.common.prometheus.Metrics;
1315
import org.tron.consensus.pbft.message.PbftMessage;
16+
import org.tron.core.config.args.Args;
1417
import org.tron.core.exception.P2pException;
1518
import org.tron.core.net.message.MessageTypes;
1619
import org.tron.core.net.message.PbftMessageFactory;
1720
import org.tron.core.net.message.TronMessage;
1821
import org.tron.core.net.message.TronMessageFactory;
22+
import org.tron.core.net.message.adv.InventoryMessage;
1923
import org.tron.core.net.message.base.DisconnectMessage;
2024
import org.tron.core.net.message.handshake.HelloMessage;
2125
import org.tron.core.net.messagehandler.BlockMsgHandler;
@@ -84,6 +88,8 @@ public class P2pEventHandlerImpl extends P2pEventHandler {
8488

8589
private byte MESSAGE_MAX_TYPE = 127;
8690

91+
private int maxCountIn10s = Args.getInstance().getMaxTps() * 10;
92+
8793
public P2pEventHandlerImpl() {
8894
Set<Byte> set = new HashSet<>();
8995
for (byte i = 0; i < MESSAGE_MAX_TYPE; i++) {
@@ -140,10 +146,27 @@ private void processMessage(PeerConnection peer, byte[] data) {
140146
try {
141147
msg = TronMessageFactory.create(data);
142148
type = msg.getType();
149+
150+
if (INVENTORY.equals(type)) {
151+
InventoryMessage message = (InventoryMessage) msg;
152+
Protocol.Inventory.InventoryType inventoryType = message.getInventoryType();
153+
int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement
154+
.getCount(10);
155+
if (inventoryType.equals(Protocol.Inventory.InventoryType.TRX) && count > maxCountIn10s) {
156+
logger.warn("Drop inventory from Peer {}, cur:{}, max:{}",
157+
peer.getInetAddress(), count, maxCountIn10s);
158+
if (Args.getInstance().isOpenPrintLog()) {
159+
logger.warn("[overload]Drop tx list is: {}", ((InventoryMessage) msg).getHashList());
160+
}
161+
return;
162+
}
163+
}
164+
143165
peer.getPeerStatistics().messageStatistics.addTcpInMessage(msg);
144166
if (PeerConnection.needToLog(msg)) {
145167
logger.info("Receive message from peer: {}, {}", peer.getInetSocketAddress(), msg);
146168
}
169+
147170
switch (type) {
148171
case P2P_PING:
149172
case P2P_PONG:

framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ public class InventoryMsgHandler implements TronMsgHandler {
2626
@Autowired
2727
private TransactionsMsgHandler transactionsMsgHandler;
2828

29-
private int maxCountIn10s = 10_000;
30-
3129
@Override
3230
public void processMessage(PeerConnection peer, TronMessage msg) {
3331
InventoryMessage inventoryMessage = (InventoryMessage) msg;
@@ -54,25 +52,13 @@ private boolean check(PeerConnection peer, InventoryMessage inventoryMessage) {
5452
return false;
5553
}
5654

57-
if (type.equals(InventoryType.TRX)) {
58-
int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10);
59-
if (count > maxCountIn10s) {
60-
logger.warn("Drop inv: {} size: {} from Peer {}, Inv count: {} is overload",
61-
type, size, peer.getInetAddress(), count);
62-
if (Args.getInstance().isOpenPrintLog()) {
63-
logger.warn("[overload]Drop tx list is: {}", inventoryMessage.getHashList());
64-
}
65-
return false;
66-
}
67-
68-
if (transactionsMsgHandler.isBusy()) {
69-
logger.warn("Drop inv: {} size: {} from Peer {}, transactionsMsgHandler is busy",
70-
type, size, peer.getInetAddress());
71-
if (Args.getInstance().isOpenPrintLog()) {
72-
logger.warn("[isBusy]Drop tx list is: {}", inventoryMessage.getHashList());
73-
}
74-
return false;
55+
if (type.equals(InventoryType.TRX) && transactionsMsgHandler.isBusy()) {
56+
logger.warn("Drop inv: {} size: {} from Peer {}, transactionsMsgHandler is busy",
57+
type, size, peer.getInetAddress());
58+
if (Args.getInstance().isOpenPrintLog()) {
59+
logger.warn("[isBusy]Drop tx list is: {}", inventoryMessage.getHashList());
7560
}
61+
return false;
7662
}
7763

7864
return true;

framework/src/test/java/org/tron/common/config/args/ArgsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ public void testConfig() {
3737
Assert.assertEquals(Args.getInstance().getRateLimiterGlobalQps(), 50000);
3838
Assert.assertEquals(Args.getInstance().getRateLimiterGlobalIpQps(), 10000);
3939
Assert.assertEquals(Args.getInstance().p2pDisable, true);
40+
Assert.assertEquals(Args.getInstance().getMaxTps(), 1000);
4041
}
4142
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package org.tron.core.net;
2+
3+
import static org.mockito.Mockito.mock;
4+
5+
import java.lang.reflect.Method;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import org.junit.Assert;
9+
import org.junit.Test;
10+
import org.mockito.Mockito;
11+
import org.tron.common.parameter.CommonParameter;
12+
import org.tron.common.utils.Sha256Hash;
13+
import org.tron.core.Constant;
14+
import org.tron.core.config.args.Args;
15+
import org.tron.core.net.message.adv.InventoryMessage;
16+
import org.tron.core.net.peer.PeerConnection;
17+
import org.tron.core.net.service.statistics.PeerStatistics;
18+
import org.tron.protos.Protocol;
19+
20+
public class P2pEventHandlerImplTest {
21+
22+
@Test
23+
public void testProcessInventoryMessage() throws Exception {
24+
String[] a = new String[0];
25+
Args.setParam(a, Constant.TESTNET_CONF);
26+
CommonParameter parameter = CommonParameter.getInstance();
27+
parameter.setMaxTps(10);
28+
29+
PeerStatistics peerStatistics = new PeerStatistics();
30+
31+
PeerConnection peer = mock(PeerConnection.class);
32+
Mockito.when(peer.getPeerStatistics()).thenReturn(peerStatistics);
33+
34+
P2pEventHandlerImpl p2pEventHandler = new P2pEventHandlerImpl();
35+
36+
Method method = p2pEventHandler.getClass()
37+
.getDeclaredMethod("processMessage", PeerConnection.class, byte[].class);
38+
method.setAccessible(true);
39+
40+
int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement
41+
.getCount(10);
42+
43+
Assert.assertEquals(0, count);
44+
45+
List<Sha256Hash> list = new ArrayList<>();
46+
for (int i = 0; i < 10; i++) {
47+
list.add(new Sha256Hash(i, new byte[32]));
48+
}
49+
50+
InventoryMessage msg = new InventoryMessage(list, Protocol.Inventory.InventoryType.TRX);
51+
52+
method.invoke(p2pEventHandler, peer, msg.getSendBytes());
53+
54+
count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10);
55+
56+
Assert.assertEquals(10, count);
57+
58+
list.clear();
59+
for (int i = 0; i < 100; i++) {
60+
list.add(new Sha256Hash(i, new byte[32]));
61+
}
62+
63+
msg = new InventoryMessage(list, Protocol.Inventory.InventoryType.TRX);
64+
65+
method.invoke(p2pEventHandler, peer, msg.getSendBytes());
66+
67+
count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10);
68+
69+
Assert.assertEquals(110, count);
70+
71+
list.clear();
72+
for (int i = 0; i < 100; i++) {
73+
list.add(new Sha256Hash(i, new byte[32]));
74+
}
75+
76+
msg = new InventoryMessage(list, Protocol.Inventory.InventoryType.TRX);
77+
78+
method.invoke(p2pEventHandler, peer, msg.getSendBytes());
79+
80+
count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10);
81+
82+
Assert.assertEquals(110, count);
83+
84+
list.clear();
85+
for (int i = 0; i < 200; i++) {
86+
list.add(new Sha256Hash(i, new byte[32]));
87+
}
88+
89+
msg = new InventoryMessage(list, Protocol.Inventory.InventoryType.BLOCK);
90+
91+
method.invoke(p2pEventHandler, peer, msg.getSendBytes());
92+
93+
count = peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10);
94+
95+
Assert.assertEquals(200, count);
96+
97+
list.clear();
98+
for (int i = 0; i < 100; i++) {
99+
list.add(new Sha256Hash(i, new byte[32]));
100+
}
101+
102+
msg = new InventoryMessage(list, Protocol.Inventory.InventoryType.BLOCK);
103+
104+
method.invoke(p2pEventHandler, peer, msg.getSendBytes());
105+
106+
count = peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10);
107+
108+
Assert.assertEquals(300, count);
109+
110+
}
111+
}

0 commit comments

Comments
 (0)