Skip to content

Commit 6c3a526

Browse files
committed
use WeightedRandom to choose broadcast peer
1 parent e242ae1 commit 6c3a526

1 file changed

Lines changed: 57 additions & 16 deletions

File tree

framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.tron.core.net.service.effective;
22

33
import java.util.Comparator;
4+
import java.util.HashMap;
45
import java.util.List;
6+
import java.util.Map;
57
import java.util.Optional;
68
import java.util.Random;
79
import java.util.concurrent.ScheduledExecutorService;
@@ -72,27 +74,40 @@ public void init() {
7274

7375
private void disconnectRandom() {
7476
int peerSize = tronNetDelegate.getActivePeer().size();
75-
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
76-
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
77+
if (peerSize < CommonParameter.getInstance().getMaxConnections()) {
78+
return;
79+
}
80+
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
81+
.filter(peer -> !peer.getChannel().isTrustPeer())
82+
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
83+
.collect(Collectors.toList());
84+
85+
if (peers.size() >= broadcastPeerSize) {
86+
long now = System.currentTimeMillis();
87+
Map<Object, Integer> weights = new HashMap<>();
88+
peers.forEach(peer -> weights.put(peer,
89+
(int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500)));
90+
WeightedRandom weightedRandom = new WeightedRandom(weights);
91+
PeerConnection one;
92+
try {
93+
one = (PeerConnection) weightedRandom.next();
94+
} catch (Exception e) {
95+
logger.warn("Get random peer failed: {}", e.getMessage());
96+
return;
97+
}
98+
disconnectFromPeer(one, ReasonCode.RANDOM_ELIMINATION, DisconnectCause.RANDOM_ELIMINATION);
99+
} else {
100+
peers = tronNetDelegate.getActivePeer().stream()
77101
.filter(peer -> !peer.getChannel().isTrustPeer())
78-
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
102+
.filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer())
79103
.collect(Collectors.toList());
80-
if (peers.size() >= broadcastPeerSize) {
81-
Optional<PeerConnection> one = getEarliestPeer(peers);
82-
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.RANDOM_ELIMINATION,
83-
DisconnectCause.RANDOM_ELIMINATION));
84-
} else {
85-
peers = tronNetDelegate.getActivePeer().stream()
86-
.filter(peer -> !peer.getChannel().isTrustPeer())
87-
.filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer())
88-
.collect(Collectors.toList());
89-
int index = new Random().nextInt(peers.size());
90-
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
91-
DisconnectCause.RANDOM_ELIMINATION);
92-
}
104+
int index = new Random().nextInt(peers.size());
105+
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
106+
DisconnectCause.RANDOM_ELIMINATION);
93107
}
94108
}
95109

110+
96111
private void disconnectLan() {
97112
if (!isLanNode()) {
98113
return;
@@ -205,6 +220,32 @@ private enum DisconnectCause {
205220
ISOLATE2_PASSIVE,
206221
}
207222

223+
class WeightedRandom {
224+
225+
private final Map<Object, Integer> weights;
226+
private final Random random;
227+
228+
public WeightedRandom(Map<Object, Integer> weights) {
229+
this.weights = weights;
230+
this.random = new Random();
231+
}
232+
233+
public Object next() {
234+
int totalWeight = 0;
235+
for (int weight : weights.values()) {
236+
totalWeight += weight;
237+
}
238+
int randomNum = random.nextInt(totalWeight);
239+
for (Object key : weights.keySet()) {
240+
randomNum -= weights.get(key);
241+
if (randomNum < 0) {
242+
return key;
243+
}
244+
}
245+
throw new IllegalStateException("Sum of weights should not be negative.");
246+
}
247+
}
248+
208249
public void close() {
209250
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
210251
}

0 commit comments

Comments
 (0)