Skip to content

Commit 66a646e

Browse files
author
morgan.peng
committed
feat(db): prune lite node data automatically
Prune lite node data automatically
1 parent d4c2761 commit 66a646e

11 files changed

Lines changed: 492 additions & 4 deletions

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package org.tron.common.storage.prune;
2+
3+
import com.google.common.primitives.Longs;
4+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5+
import java.time.OffsetDateTime;
6+
import java.time.ZoneOffset;
7+
import java.util.Arrays;
8+
import java.util.HashMap;
9+
import java.util.HashSet;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Objects;
13+
import java.util.Set;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.ScheduledExecutorService;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.stream.Collectors;
19+
import javax.annotation.PostConstruct;
20+
import lombok.extern.slf4j.Slf4j;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.stereotype.Component;
23+
import org.tron.common.parameter.CommonParameter;
24+
import org.tron.common.utils.Commons;
25+
import org.tron.core.ChainBaseManager;
26+
import org.tron.core.capsule.BlockCapsule;
27+
import org.tron.core.db2.common.Flusher;
28+
import org.tron.core.db2.common.Value;
29+
import org.tron.core.db2.common.WrappedByteArray;
30+
import org.tron.core.db2.core.SnapshotRoot;
31+
32+
@Slf4j(topic = "db")
33+
@Component
34+
public class ChainDataPruner {
35+
36+
private static final String BLOCK_INDEX_STORE_NAME = "block-index";
37+
private static final String BLOCK_STORE_NAME = "block";
38+
private static final String TRANSACTION_STORE_NAME = "trans";
39+
private static final String TRANSACTION_RET_STORE_NAME = "transactionRetStore";
40+
private static final Set<String> PRUNE_DBS = new HashSet<>(Arrays.asList(BLOCK_INDEX_STORE_NAME
41+
, BLOCK_STORE_NAME, TRANSACTION_STORE_NAME, TRANSACTION_RET_STORE_NAME));
42+
private long blocksToRetain = CommonParameter.getInstance().getStorage().
43+
getDbAutoPruneRetain();
44+
private long blocksBatchFlush = CommonParameter.getInstance().getStorage().getDbAutoPruneBatch();
45+
private static Map<String, SnapshotRoot> snapshotRootMap = new ConcurrentHashMap<>();
46+
47+
private ScheduledExecutorService pruneExecutor = null;
48+
49+
@Autowired
50+
private ChainBaseManager chainBaseManager;
51+
52+
@PostConstruct
53+
public void init() {
54+
if ((!chainBaseManager.isLiteNode()) ||
55+
(!CommonParameter.getInstance().getStorage().isDbAutoPrune())) {
56+
return;
57+
}
58+
pruneExecutor = Executors.newSingleThreadScheduledExecutor(
59+
new ThreadFactoryBuilder().setNameFormat("db-prune-thread-%d").build());
60+
pruneExecutor.scheduleWithFixedDelay(() -> {
61+
try {
62+
if (!shouldBeginPrune()) {
63+
return;
64+
}
65+
prune();
66+
} catch (InterruptedException e) {
67+
logger.warn("Prune chain data thread interrupted!");
68+
Thread.currentThread().interrupt();
69+
}
70+
}, 60,
71+
CommonParameter.getInstance().getStorage().getDbAutoPruneFrequency(), TimeUnit.SECONDS);
72+
}
73+
74+
private boolean shouldBeginPrune() {
75+
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
76+
int currentHour = now.getHour();
77+
return currentHour >= 20 || currentHour < 2;
78+
}
79+
80+
public static void register(String dbName, SnapshotRoot snapshotRoot) {
81+
if ((!CommonParameter.getInstance().getStorage().isDbAutoPrune())
82+
||(!PRUNE_DBS.contains(dbName))) {
83+
return;
84+
}
85+
snapshotRootMap.put(dbName, snapshotRoot);
86+
}
87+
88+
public void prune() throws InterruptedException {
89+
long lowestBlockNumber = chainBaseManager.getBlockStore().getLimitNumber(1, 1).
90+
stream().map(BlockCapsule::getNum).findFirst().get();
91+
long latestBlockNumber = chainBaseManager.getDynamicPropertiesStore().
92+
getLatestBlockHeaderNumberFromDB();
93+
if (latestBlockNumber - lowestBlockNumber + 1 > blocksToRetain) {
94+
doPrune(lowestBlockNumber, latestBlockNumber);
95+
}
96+
}
97+
98+
private void doPrune(long lowestBlockNumber, long latestBlockNumber) {
99+
long toFetchCount =
100+
Math.min((latestBlockNumber - lowestBlockNumber + 1 - blocksToRetain), blocksBatchFlush);
101+
List<BlockCapsule> blockCapsuleList = chainBaseManager.getBlockStore()
102+
.getLimitNumber(lowestBlockNumber, toFetchCount);
103+
Map<WrappedByteArray, WrappedByteArray> blockIdBatch = new HashMap<>();
104+
Map<WrappedByteArray, WrappedByteArray> blockNumBatch = new HashMap<>();
105+
Map<WrappedByteArray, WrappedByteArray> transIdBatch = new HashMap<>();
106+
prepareWriteBatch(blockIdBatch, blockNumBatch, transIdBatch, blockCapsuleList);
107+
flushDb(blockIdBatch, blockNumBatch, transIdBatch);
108+
}
109+
110+
private void flushDb(Map<WrappedByteArray, WrappedByteArray> blockIdBatch,
111+
Map<WrappedByteArray, WrappedByteArray> blockNumBatch,
112+
Map<WrappedByteArray, WrappedByteArray> transIdBatch) {
113+
SnapshotRoot transactionRoot = snapshotRootMap.get(TRANSACTION_STORE_NAME);
114+
((Flusher)transactionRoot.getDb()).flush(transIdBatch);
115+
SnapshotRoot transactionRetRoot = snapshotRootMap.get(TRANSACTION_RET_STORE_NAME);
116+
((Flusher)transactionRetRoot.getDb()).flush(blockNumBatch);
117+
SnapshotRoot blockIndexRoot = snapshotRootMap.get(BLOCK_INDEX_STORE_NAME);
118+
((Flusher)blockIndexRoot.getDb()).flush(blockNumBatch);
119+
SnapshotRoot blockRoot = snapshotRootMap.get(BLOCK_STORE_NAME);
120+
((Flusher)blockRoot.getDb()).flush(blockIdBatch);
121+
}
122+
123+
private void prepareWriteBatch (
124+
Map<WrappedByteArray, WrappedByteArray> blockIdBatch,
125+
Map<WrappedByteArray, WrappedByteArray> blockNumBatch,
126+
Map<WrappedByteArray, WrappedByteArray> transIdBatch,
127+
List<BlockCapsule> blockCapsuleList) {
128+
for (BlockCapsule blockCapsule: blockCapsuleList) {
129+
blockIdBatch.put(WrappedByteArray.of(blockCapsule.getBlockId().getBytes()),
130+
WrappedByteArray.of(Value.of(Value.Operator.DELETE, null).getBytes()));
131+
blockNumBatch.put(WrappedByteArray.of(Longs.toByteArray(blockCapsule.getNum())),
132+
WrappedByteArray.of(Value.of(Value.Operator.DELETE, null).getBytes()));
133+
blockCapsule.getTransactions().forEach(tx-> transIdBatch.put(WrappedByteArray.of(tx.
134+
getTransactionId().getBytes()), WrappedByteArray.of(Value.of(Value.Operator.DELETE,
135+
null).getBytes())));
136+
}
137+
}
138+
139+
public void shutdown() {
140+
if (Objects.nonNull(pruneExecutor)) {
141+
try {
142+
pruneExecutor.shutdown();
143+
} catch (Exception e) {
144+
logger.error("Chain pruner shutdown error: {}", e.getMessage());
145+
}
146+
}
147+
}
148+
149+
}

chainbase/src/main/java/org/tron/core/ChainBaseManager.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.commons.collections4.CollectionUtils;
1212
import org.springframework.beans.factory.annotation.Autowired;
1313
import org.springframework.stereotype.Component;
14+
import org.tron.common.parameter.CommonParameter;
1415
import org.tron.common.storage.metric.DbStatService;
1516
import org.tron.common.utils.ForkController;
1617
import org.tron.common.utils.Sha256Hash;
@@ -241,8 +242,6 @@ public class ChainBaseManager {
241242
@Setter
242243
private NodeType nodeType;
243244

244-
@Getter
245-
@Setter
246245
private long lowestBlockNum = -1; // except num = 0.
247246

248247
public void closeOneStore(ITronChainBase database) {
@@ -431,6 +430,15 @@ private void init() {
431430
this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL;
432431
}
433432

433+
public long getLowestBlockNum(){
434+
if(isLiteNode() && CommonParameter.getInstance().getStorage().isDbAutoPrune()) {
435+
return this.blockIndexStore.getLimitNumber(1, 1).stream()
436+
.map(BlockId::getNum).findFirst().orElseThrow(
437+
() -> new IllegalArgumentException("LowestBlockNum not found!"));
438+
}
439+
return lowestBlockNum;
440+
}
441+
434442
public boolean isLiteNode() {
435443
return getNodeType() == NodeType.LITE;
436444
}

chainbase/src/main/java/org/tron/core/db2/core/SnapshotRoot.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@
22

33
import com.google.common.collect.Maps;
44
import com.google.common.collect.Streams;
5+
import java.util.Arrays;
56
import java.util.HashMap;
67
import java.util.Iterator;
78
import java.util.List;
89
import java.util.Map;
910
import java.util.Objects;
11+
import java.util.Set;
1012
import java.util.stream.Collectors;
1113
import lombok.Getter;
1214
import org.tron.common.cache.CacheManager;
1315
import org.tron.common.cache.CacheType;
1416
import org.tron.common.cache.TronCache;
1517
import org.tron.common.parameter.CommonParameter;
18+
import org.tron.common.storage.prune.ChainDataPruner;
1619
import org.tron.common.utils.ByteArray;
1720
import org.tron.core.ChainBaseManager;
1821
import org.tron.core.capsule.AccountCapsule;
@@ -38,6 +41,7 @@ public SnapshotRoot(DB<byte[], byte[]> db) {
3841
if (CACHE_DBS.contains(this.db.getDbName())) {
3942
this.cache = CacheManager.allocate(CacheType.findByType(this.db.getDbName()));
4043
}
44+
ChainDataPruner.register(this.db.getDbName(), this);
4145
}
4246

4347
private boolean needOptAsset() {

common/src/main/java/org/tron/core/Constant.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.tron.core;
22

3+
import org.tron.core.config.Parameter.ChainConstant;
4+
35
public class Constant {
46

57
//config for testnet, mainnet, beta
@@ -39,6 +41,8 @@ public class Constant {
3941
public static final int ONE_HUNDRED = 100;
4042
public static final int ONE_THOUSAND = 1000;
4143

44+
public static final long ONE_DAY_BLOCKS_PREDICT = 24 * 60 * 60 * 1000 / ChainConstant.BLOCK_PRODUCED_INTERVAL;
45+
4246
public static final byte[] ZTRON_EXPANDSEED_PERSONALIZATION = {'Z', 't', 'r', 'o', 'n', '_', 'E',
4347
'x',
4448
'p', 'a', 'n', 'd', 'S', 'e', 'e', 'd'};

common/src/main/java/org/tron/core/config/args/Storage.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.tron.common.utils.DbOptionalsUtils;
3333
import org.tron.common.utils.FileUtil;
3434
import org.tron.common.utils.Property;
35+
import org.tron.core.Constant;
3536

3637
/**
3738
* Custom storage configurations
@@ -54,6 +55,10 @@ public class Storage {
5455
private static final String ESTIMATED_TRANSACTIONS_CONFIG_KEY =
5556
"storage.txCache.estimatedTransactions";
5657
private static final String SNAPSHOT_MAX_FLUSH_COUNT_CONFIG_KEY = "storage.snapshot.maxFlushCount";
58+
private static final String DB_AUTO_PRUNE_SWITCH_CONFIG_KEY = "storage.prune.enable";
59+
private static final String DB_AUTO_PRUNE_RETAIN_CONFIG_KEY = "storage.prune.retain";
60+
private static final String DB_AUTO_PRUNE_FREQUENCY_CONFIG_KEY = "storage.prune.frequency";
61+
private static final String DB_AUTO_PRUNE_BATCH_CONFIG_KEY = "storage.prune.batch";
5762
private static final String PROPERTIES_CONFIG_KEY = "storage.properties";
5863
private static final String PROPERTIES_CONFIG_DB_KEY = "storage";
5964
private static final String PROPERTIES_CONFIG_DEFAULT_KEY = "default";
@@ -91,6 +96,11 @@ public class Storage {
9196
private static final boolean DEFAULT_CHECKPOINT_SYNC = true;
9297
private static final int DEFAULT_ESTIMATED_TRANSACTIONS = 1000;
9398
private static final int DEFAULT_SNAPSHOT_MAX_FLUSH_COUNT = 1;
99+
private static final boolean DEFAULT_DB_AUTO_PRUNE_SWITCH = false;
100+
private static final int DEFAULT_DB_AUTO_PRUNE_RETAIN = 864000;
101+
private static final int DEFAULT_DB_AUTO_PRUNE_FREQUENCY = 3;
102+
private static final int DEFAULT_DB_AUTO_PRUNE_BATCH = 50;
103+
94104
private Config storage;
95105

96106
/**
@@ -112,6 +122,22 @@ public class Storage {
112122
@Setter
113123
private int maxFlushCount;
114124

125+
@Getter
126+
@Setter
127+
private boolean dbAutoPrune;
128+
129+
@Getter
130+
@Setter
131+
private int dbAutoPruneRetain;
132+
133+
@Getter
134+
@Setter
135+
private int dbAutoPruneFrequency;
136+
137+
@Getter
138+
@Setter
139+
private int dbAutoPruneBatch;
140+
115141
/**
116142
* Index storage directory: /path/to/{indexDirectory}
117143
*/
@@ -182,6 +208,57 @@ public static int getSnapshotMaxFlushCountFromConfig(final Config config) {
182208
return maxFlushCountConfig;
183209
}
184210

211+
public void setDbAutoPruneInfo(final Config config) {
212+
setDbAutoPrune(Storage.getDbAutoPruneSwitchFromConfig(config));
213+
setDbAutoPruneRetain(Storage.getDbAutoPruneRetainFromConfig(config));
214+
setDbAutoPruneFrequency(
215+
Storage.getDbAutoPruneFrequencyFromConfig(config));
216+
setDbAutoPruneBatch(Storage.getDbAutoPruneBatchFromConfig(config));
217+
}
218+
219+
public static boolean getDbAutoPruneSwitchFromConfig(final Config config) {
220+
return config.hasPath(DB_AUTO_PRUNE_SWITCH_CONFIG_KEY)
221+
? config.getBoolean(DB_AUTO_PRUNE_SWITCH_CONFIG_KEY) : DEFAULT_DB_AUTO_PRUNE_SWITCH;
222+
}
223+
224+
public static int getDbAutoPruneRetainFromConfig(final Config config) {
225+
if (!config.hasPath(DB_AUTO_PRUNE_RETAIN_CONFIG_KEY)) {
226+
return DEFAULT_DB_AUTO_PRUNE_RETAIN;
227+
}
228+
int dbAutoPruneRetain = config.getInt(DB_AUTO_PRUNE_RETAIN_CONFIG_KEY);
229+
if (dbAutoPruneRetain < 30 * Constant.ONE_DAY_BLOCKS_PREDICT) {
230+
throw new IllegalArgumentException(
231+
"[storage.prune.retain] value must not be less than 864000!");
232+
}
233+
return dbAutoPruneRetain;
234+
}
235+
236+
public static int getDbAutoPruneFrequencyFromConfig(final Config config) {
237+
if (!config.hasPath(DB_AUTO_PRUNE_FREQUENCY_CONFIG_KEY)) {
238+
return DEFAULT_DB_AUTO_PRUNE_FREQUENCY;
239+
}
240+
int dbAutoPruneFrequency = config.getInt(DB_AUTO_PRUNE_FREQUENCY_CONFIG_KEY);
241+
if (dbAutoPruneFrequency < 1) {
242+
throw new IllegalArgumentException("[storage.prune.frequency] value must not be less than " +
243+
"1!");
244+
}
245+
return dbAutoPruneFrequency;
246+
}
247+
248+
public static int getDbAutoPruneBatchFromConfig(final Config config) {
249+
if (!config.hasPath(DB_AUTO_PRUNE_BATCH_CONFIG_KEY)) {
250+
return DEFAULT_DB_AUTO_PRUNE_BATCH;
251+
}
252+
int dbAutoPruneBatch = config.getInt(DB_AUTO_PRUNE_BATCH_CONFIG_KEY);
253+
if (dbAutoPruneBatch < 1) {
254+
throw new IllegalArgumentException("[storage.prune.batch] value must not be less than 1!");
255+
}
256+
if (dbAutoPruneBatch > 50) {
257+
throw new IllegalArgumentException("[storage.prune.batch] value must not be more than 50!");
258+
}
259+
return dbAutoPruneBatch;
260+
}
261+
185262
public static Boolean getContractParseSwitchFromConfig(final Config config) {
186263
return config.hasPath(EVENT_SUBSCRIBE_CONTRACT_PARSE)
187264
? config.getBoolean(EVENT_SUBSCRIBE_CONTRACT_PARSE)

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,7 @@ public static void setParam(final String[] args, final String confFileName) {
522522
PARAMETER.storage.setEstimatedBlockTransactions(
523523
Storage.getEstimatedTransactionsFromConfig(config));
524524
PARAMETER.storage.setMaxFlushCount(Storage.getSnapshotMaxFlushCountFromConfig(config));
525+
PARAMETER.storage.setDbAutoPruneInfo(config);
525526

526527
PARAMETER.storage.setDefaultDbOptions(config);
527528
PARAMETER.storage.setPropertyMapFromConfig(config);

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.tron.common.prometheus.MetricLabels;
6969
import org.tron.common.prometheus.Metrics;
7070
import org.tron.common.runtime.RuntimeImpl;
71+
import org.tron.common.storage.prune.ChainDataPruner;
7172
import org.tron.common.utils.ByteArray;
7273
import org.tron.common.utils.JsonUtil;
7374
import org.tron.common.utils.Pair;
@@ -224,6 +225,8 @@ public class Manager {
224225
@Autowired
225226
@Getter
226227
private ChainBaseManager chainBaseManager;
228+
@Autowired
229+
private ChainDataPruner chainDataPruner;
227230
// transactions cache
228231
private BlockingQueue<TransactionCapsule> pendingTransactions;
229232
@Getter
@@ -1911,6 +1914,7 @@ public NullifierStore getNullifierStore() {
19111914

19121915
public void closeAllStore() {
19131916
logger.info("******** Begin to close db. ********");
1917+
chainDataPruner.shutdown();
19141918
chainBaseManager.closeAllStore();
19151919
logger.info("******** End to close db. ********");
19161920
}

0 commit comments

Comments
 (0)