33import com .google .common .hash .BloomFilter ;
44import com .google .common .hash .Funnels ;
55import com .google .common .primitives .Longs ;
6+ import java .io .BufferedInputStream ;
7+ import java .io .BufferedOutputStream ;
8+ import java .io .InputStream ;
9+ import java .io .InputStreamReader ;
10+ import java .io .OutputStream ;
11+ import java .io .Reader ;
12+ import java .io .Writer ;
13+ import java .nio .charset .StandardCharsets ;
14+ import java .nio .file .Files ;
15+ import java .nio .file .Path ;
616import java .nio .file .Paths ;
17+ import java .nio .file .StandardOpenOption ;
718import java .util .Iterator ;
819import java .util .Map ;
920import java .util .Map .Entry ;
21+ import java .util .Properties ;
22+ import java .util .concurrent .CompletableFuture ;
23+ import java .util .concurrent .atomic .AtomicBoolean ;
1024import lombok .extern .slf4j .Slf4j ;
1125import org .apache .commons .lang3 .ArrayUtils ;
1226import org .bouncycastle .util .encoders .Hex ;
1731import org .tron .common .storage .leveldb .LevelDbDataSourceImpl ;
1832import org .tron .common .storage .rocksdb .RocksDbDataSourceImpl ;
1933import org .tron .common .utils .ByteArray ;
34+ import org .tron .common .utils .FileUtil ;
2035import org .tron .common .utils .JsonUtil ;
2136import org .tron .common .utils .StorageUtils ;
2237import org .tron .core .capsule .BytesCapsule ;
@@ -42,6 +57,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
4257 private BloomFilter <byte []>[] bloomFilters = new BloomFilter [2 ];
4358 // filterStartBlock record the start block of the active filter
4459 private volatile long filterStartBlock = INVALID_BLOCK ;
60+ private volatile long currentBlockNum = INVALID_BLOCK ;
4561 // currentFilterIndex records the index of the active filter
4662 private volatile int currentFilterIndex = 0 ;
4763
@@ -57,6 +73,12 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
5773 // replace persistentStore and optimizes startup performance
5874 private RecentTransactionStore recentTransactionStore ;
5975
76+ private final Path cacheFile0 ;
77+ private final Path cacheFile1 ;
78+ private final Path cacheProperties ;
79+ private final Path cacheDir ;
80+ private AtomicBoolean isValid = new AtomicBoolean (false );
81+
6082 public TxCacheDB (String name , RecentTransactionStore recentTransactionStore ) {
6183 this .name = name ;
6284 this .TRANSACTION_COUNT =
@@ -85,6 +107,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
85107 MAX_BLOCK_SIZE * TRANSACTION_COUNT );
86108 this .bloomFilters [1 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
87109 MAX_BLOCK_SIZE * TRANSACTION_COUNT );
110+ cacheDir = Paths .get (CommonParameter .getInstance ().getOutputDirectory (), ".cache" );
111+ this .cacheFile0 = Paths .get (cacheDir .toString (), "bloomFilters_0" );
112+ this .cacheFile1 = Paths .get (cacheDir .toString (), "bloomFilters_1" );
113+ this .cacheProperties = Paths .get (cacheDir .toString (), "txCache.properties" );
88114
89115 }
90116
@@ -110,6 +136,10 @@ private void initCache() {
110136 }
111137
112138 public void init () {
139+ if (recovery ()) {
140+ isValid .set (true );
141+ return ;
142+ }
113143 long size = recentTransactionStore .size ();
114144 if (size != MAX_BLOCK_SIZE ) {
115145 // 0. load from persistentStore
@@ -129,6 +159,7 @@ public void init() {
129159 logger .info ("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms." ,
130160 bloomFilters [1 ].approximateElementCount (), bloomFilters [1 ].expectedFpp (),
131161 System .currentTimeMillis () - start );
162+ isValid .set (true );
132163 }
133164
134165 @ Override
@@ -172,7 +203,7 @@ public void put(byte[] key, byte[] value) {
172203 MAX_BLOCK_SIZE * TRANSACTION_COUNT );
173204 }
174205 bloomFilters [currentFilterIndex ].put (key );
175-
206+ currentBlockNum = blockNum ;
176207 if (lastMetricBlock != blockNum ) {
177208 lastMetricBlock = blockNum ;
178209 Metrics .gaugeSet (MetricKeys .Gauge .TX_CACHE ,
@@ -208,13 +239,15 @@ public Iterator<Entry<byte[], byte[]>> iterator() {
208239 }
209240
210241 @ Override
211- public void flush (Map <WrappedByteArray , WrappedByteArray > batch ) {
242+ public synchronized void flush (Map <WrappedByteArray , WrappedByteArray > batch ) {
243+ isValid .set (false );
212244 batch .forEach ((k , v ) -> this .put (k .getBytes (), v .getBytes ()));
245+ isValid .set (true );
213246 }
214247
215248 @ Override
216249 public void close () {
217- reset ();
250+ dump ();
218251 bloomFilters [0 ] = null ;
219252 bloomFilters [1 ] = null ;
220253 persistentStore .close ();
@@ -224,6 +257,116 @@ public void close() {
224257 public void reset () {
225258 }
226259
260+ private boolean recovery () {
261+ FileUtil .createDirIfNotExists (this .cacheDir .toString ());
262+ logger .info ("recovery bloomFilters start." );
263+ CompletableFuture <Boolean > loadProperties = CompletableFuture .supplyAsync (this ::loadProperties );
264+ CompletableFuture <Boolean > tk0 = loadProperties .thenApplyAsync (
265+ v -> recovery (0 , this .cacheFile0 ));
266+ CompletableFuture <Boolean > tk1 = loadProperties .thenApplyAsync (
267+ v -> recovery (1 , this .cacheFile1 ));
268+
269+ return CompletableFuture .allOf (tk0 , tk1 ).thenApply (v -> {
270+ logger .info ("recovery bloomFilters success." );
271+ return true ;
272+ }).exceptionally (this ::handleException ).join ();
273+ }
274+
275+ private boolean recovery (int index , Path file ) {
276+ try (InputStream in = new BufferedInputStream (Files .newInputStream (file ,
277+ StandardOpenOption .READ , StandardOpenOption .DELETE_ON_CLOSE ))) {
278+ logger .info ("recovery bloomFilter[{}] from file." , index );
279+ long start = System .currentTimeMillis ();
280+ bloomFilters [index ] = BloomFilter .readFrom (in , Funnels .byteArrayFunnel ());
281+ logger .info ("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms." ,
282+ index , bloomFilters [index ].approximateElementCount (), bloomFilters [index ].expectedFpp (),
283+ System .currentTimeMillis () - start );
284+ return true ;
285+ } catch (Exception e ) {
286+ throw new RuntimeException (e );
287+ }
288+ }
289+
290+ private boolean handleException (Throwable e ) {
291+ bloomFilters [0 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
292+ MAX_BLOCK_SIZE * TRANSACTION_COUNT );
293+ bloomFilters [1 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
294+ MAX_BLOCK_SIZE * TRANSACTION_COUNT );
295+ try {
296+ Files .deleteIfExists (this .cacheFile0 );
297+ Files .deleteIfExists (this .cacheFile1 );
298+ } catch (Exception ignored ) {
299+
300+ }
301+ logger .info ("recovery bloomFilters failed. {}" , e .getMessage ());
302+ logger .info ("rollback to previous mode." );
303+ return false ;
304+ }
305+
306+ private void dump () {
307+ if (!isValid .get ()) {
308+ logger .info ("bloomFilters is not valid." );
309+ }
310+ FileUtil .createDirIfNotExists (this .cacheDir .toString ());
311+ logger .info ("dump bloomFilters start." );
312+ CompletableFuture <Void > task0 = CompletableFuture .runAsync (
313+ () -> dump (0 , this .cacheFile0 ));
314+ CompletableFuture <Void > task1 = CompletableFuture .runAsync (
315+ () -> dump (1 , this .cacheFile1 ));
316+ CompletableFuture .allOf (task0 , task1 ).thenRun (() -> {
317+ writeProperties ();
318+ logger .info ("dump bloomFilters done." );
319+
320+ }).exceptionally (e -> {
321+ logger .info ("dump bloomFilters to file failed. {}" , e .getMessage ());
322+ return null ;
323+ }).join ();
324+ }
325+
326+ private void dump (int index , Path file ) {
327+ try (OutputStream out = new BufferedOutputStream (Files .newOutputStream (file ))) {
328+ logger .info ("dump bloomFilters[{}] to file." , index );
329+ long start = System .currentTimeMillis ();
330+ bloomFilters [index ].writeTo (out );
331+ logger .info ("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms." ,
332+ index , bloomFilters [index ].approximateElementCount (), bloomFilters [index ].expectedFpp (),
333+ System .currentTimeMillis () - start );
334+ } catch (Exception e ) {
335+ throw new RuntimeException (e );
336+ }
337+ }
338+
339+ private boolean loadProperties () {
340+ try (Reader r = new InputStreamReader (new BufferedInputStream (Files .newInputStream (
341+ this .cacheProperties , StandardOpenOption .READ , StandardOpenOption .DELETE_ON_CLOSE )),
342+ StandardCharsets .UTF_8 )) {
343+ Properties properties = new Properties ();
344+ properties .load (r );
345+ filterStartBlock = Long .parseLong (properties .getProperty ("filterStartBlock" ));
346+ currentBlockNum = Long .parseLong (properties .getProperty ("currentBlockNum" ));
347+ currentFilterIndex = Integer .parseInt (properties .getProperty ("currentFilterIndex" ));
348+ logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done." ,
349+ filterStartBlock , currentBlockNum , currentFilterIndex );
350+ return true ;
351+ } catch (Exception e ) {
352+ throw new RuntimeException (e );
353+ }
354+ }
355+
356+ private void writeProperties () {
357+ try (Writer w = Files .newBufferedWriter (this .cacheProperties , StandardCharsets .UTF_8 )) {
358+ Properties properties = new Properties ();
359+ properties .setProperty ("filterStartBlock" , String .valueOf (filterStartBlock ));
360+ properties .setProperty ("currentBlockNum" , String .valueOf (currentBlockNum ));
361+ properties .setProperty ("currentFilterIndex" , String .valueOf (currentFilterIndex ));
362+ properties .store (w , "Generated by the application. PLEASE DO NOT EDIT! " );
363+ logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done." ,
364+ filterStartBlock , currentBlockNum , currentFilterIndex );
365+ } catch (Exception e ) {
366+ throw new RuntimeException (e );
367+ }
368+ }
369+
227370 @ Override
228371 public TxCacheDB newInstance () {
229372 return new TxCacheDB (name , recentTransactionStore );
0 commit comments