1+ package org .tron .plugins ;
2+
3+ import static java .nio .charset .StandardCharsets .UTF_8 ;
4+ import static org .iq80 .leveldb .impl .Iq80DBFactory .factory ;
5+
6+ import java .io .BufferedInputStream ;
7+ import java .io .BufferedReader ;
8+ import java .io .BufferedWriter ;
9+ import java .io .File ;
10+ import java .io .FileInputStream ;
11+ import java .io .FileOutputStream ;
12+ import java .io .IOException ;
13+ import java .io .InputStream ;
14+ import java .io .InputStreamReader ;
15+ import java .io .OutputStream ;
16+ import java .io .OutputStreamWriter ;
17+ import java .nio .charset .StandardCharsets ;
18+ import java .nio .file .Path ;
19+ import java .nio .file .Paths ;
20+ import java .util .ArrayList ;
21+ import java .util .Arrays ;
22+ import java .util .List ;
23+ import java .util .Objects ;
24+ import java .util .Properties ;
25+ import java .util .concurrent .Callable ;
26+ import java .util .stream .Collectors ;
27+ import lombok .extern .slf4j .Slf4j ;
28+ import me .tongfei .progressbar .ProgressBar ;
29+ import org .iq80 .leveldb .CompressionType ;
30+ import org .iq80 .leveldb .DB ;
31+ import org .iq80 .leveldb .Options ;
32+ import org .iq80 .leveldb .impl .Filename ;
33+ import picocli .CommandLine ;
34+ import picocli .CommandLine .Option ;
35+
36+ @ Slf4j (topic = "archive" )
37+ @ CommandLine .Command (name = "archive" , description = "a helper to rewrite leveldb manifest." )
38+ public class DbArchive implements Callable <Integer > {
39+
40+ @ CommandLine .Spec
41+ CommandLine .Model .CommandSpec spec ;
42+
43+ @ Option (names = {"-d" , "--database-directory" },
44+ defaultValue = "output-directory/database" ,
45+ description = "java-tron database directory. Default: ${DEFAULT-VALUE}" )
46+ private String databaseDirectory ;
47+
48+ @ Option (names = {"-b" , "--batch-size" },
49+ defaultValue = "80000" ,
50+ description = "deal manifest batch size. Default: ${DEFAULT-VALUE}" )
51+ private int maxBatchSize ;
52+
53+ @ Option (names = {"-m" , "--manifest-size" },
54+ defaultValue = "0" ,
55+ description = "manifest min size(M) to archive. Default: ${DEFAULT-VALUE}" )
56+ private int maxManifestSize ;
57+
58+ @ Option (names = {"-h" , "--help" }, help = true )
59+ private boolean help ;
60+
61+
62+ @ Override
63+ public Integer call () throws Exception {
64+ if (help ) {
65+ spec .commandLine ().usage (System .out );
66+ return 0 ;
67+ }
68+
69+ File dbDirectory = new File (databaseDirectory );
70+ if (!dbDirectory .exists ()) {
71+ spec .commandLine ().getErr ().format ("Directory %s does not exist." ,
72+ databaseDirectory ).println ();
73+ logger .info ("Directory {} does not exist." , databaseDirectory );
74+ return 404 ;
75+ }
76+
77+ if (!dbDirectory .isDirectory ()) {
78+ spec .commandLine ().getErr ().format (" %s is not directory." ,
79+ databaseDirectory ).println ();
80+ logger .info ("{} is not directory." , databaseDirectory );
81+ return 405 ;
82+ }
83+
84+ List <File > files = Arrays .stream (Objects .requireNonNull (dbDirectory .listFiles ()))
85+ .filter (File ::isDirectory ).collect (
86+ Collectors .toList ());
87+
88+ if (files .isEmpty ()) {
89+ spec .commandLine ().getErr ().format ("Directory %s does not contain any database." ,
90+ databaseDirectory ).println ();
91+ logger .info ("Directory {} does not contain any database." , databaseDirectory );
92+ return 0 ;
93+ }
94+ final long time = System .currentTimeMillis ();
95+ List <ArchiveManifest > services = new ArrayList <>();
96+ files .forEach (f -> services .add (new ArchiveManifest (databaseDirectory , f .getName (),
97+ maxManifestSize , maxBatchSize )));
98+ ProgressBar .wrap (services .stream (), "archive task" ).parallel ().forEach (Archive ::doArchive );
99+ spec .commandLine ().getOut ().println ("archive db done." );
100+
101+ logger .info ("DatabaseDirectory:{}, maxManifestSize:{}, maxBatchSize:{},"
102+ + "database reopen use {} seconds total." ,
103+ databaseDirectory , maxManifestSize , maxBatchSize ,
104+ (System .currentTimeMillis () - time ) / 1000 );
105+
106+ return 0 ;
107+ }
108+
109+
110+ interface Archive {
111+
112+ default void doArchive () {
113+
114+ }
115+ }
116+
117+ static class ArchiveManifest implements Archive {
118+
119+ private static final String KEY_ENGINE = "ENGINE" ;
120+ private static final String LEVELDB = "LEVELDB" ;
121+
122+ private final Path srcDbPath ;
123+ private final String name ;
124+ private final Options options ;
125+ private final long startTime ;
126+
127+ public ArchiveManifest (String src , String name , int maxManifestSize , int maxBatchSize ) {
128+ this .name = name ;
129+ this .srcDbPath = Paths .get (src , name );
130+ this .startTime = System .currentTimeMillis ();
131+ this .options = newDefaultLevelDbOptions ();
132+ this .options .maxManifestSize (maxManifestSize );
133+ this .options .maxBatchSize (maxBatchSize );
134+ }
135+
136+ public static Options newDefaultLevelDbOptions () {
137+ Options dbOptions = new Options ();
138+ dbOptions .createIfMissing (true );
139+ dbOptions .paranoidChecks (true );
140+ dbOptions .verifyChecksums (true );
141+ dbOptions .compressionType (CompressionType .SNAPPY );
142+ dbOptions .blockSize (4 * 1024 );
143+ dbOptions .writeBufferSize (10 * 1024 * 1024 );
144+ dbOptions .cacheSize (10 * 1024 * 1024L );
145+ dbOptions .maxOpenFiles (1000 );
146+ dbOptions .maxBatchSize (64_000 );
147+ dbOptions .maxManifestSize (128 );
148+ return dbOptions ;
149+ }
150+
151+ public void open () throws IOException {
152+ DB database = factory .open (this .srcDbPath .toFile (), this .options );
153+ database .close ();
154+ }
155+
156+ public boolean checkManifest (String dir ) throws IOException {
157+ // Read "CURRENT" file, which contains a pointer to the current manifest file
158+ File currentFile = new File (dir , Filename .currentFileName ());
159+ if (!currentFile .exists ()) {
160+ return false ;
161+ }
162+ String currentName = com .google .common .io .Files .asCharSource (currentFile , UTF_8 ).read ();
163+ if (currentName .isEmpty () || currentName .charAt (currentName .length () - 1 ) != '\n' ) {
164+ return false ;
165+ }
166+ currentName = currentName .substring (0 , currentName .length () - 1 );
167+ File current = new File (dir , currentName );
168+ if (!current .isFile ()) {
169+ return false ;
170+ }
171+ long maxSize = options .maxManifestSize ();
172+ if (maxSize < 0 ) {
173+ return false ;
174+ }
175+ logger .info ("CurrentName {}/{},size {} kb." , dir , currentName , current .length () / 1024 );
176+ if ("market_pair_price_to_order" .equalsIgnoreCase (this .name )) {
177+ logger .info ("Db {} ignored." , this .name );
178+ return false ;
179+ }
180+ return current .length () >= maxSize * 1024 * 1024 ;
181+ }
182+
183+ @ Override
184+ public void doArchive () {
185+ File levelDbFile = srcDbPath .toFile ();
186+ if (!levelDbFile .exists ()) {
187+ logger .info ("File {},does not exist, ignored." , srcDbPath );
188+ return ;
189+ }
190+ if (!checkEngine ()) {
191+ logger .info ("Db {},not leveldb, ignored." , this .name );
192+ return ;
193+ }
194+ try {
195+ if (checkManifest (levelDbFile .toString ())) {
196+ open ();
197+ logger .info ("Db {} archive use {} ms." , this .name ,
198+ (System .currentTimeMillis () - startTime ));
199+ } else {
200+ logger .info ("Db {},no need, ignored." , levelDbFile );
201+ }
202+ } catch (Exception e ) {
203+ throw new RuntimeException ("Db " + this .name + " archive failed." , e );
204+ }
205+ }
206+
207+ public boolean checkEngine () {
208+ String dir = this .srcDbPath .toString ();
209+ String enginePath = dir + File .separator + "engine.properties" ;
210+ if (!new File (enginePath ).exists () && !writeProperty (enginePath , KEY_ENGINE , LEVELDB )) {
211+ return false ;
212+ }
213+ String engine = readProperty (enginePath , KEY_ENGINE );
214+ return LEVELDB .equals (engine );
215+ }
216+
217+ public static String readProperty (String file , String key ) {
218+ try (FileInputStream fileInputStream = new FileInputStream (file );
219+ InputStream inputStream = new BufferedInputStream (fileInputStream )) {
220+ Properties prop = new Properties ();
221+ prop .load (inputStream );
222+ return new String (prop .getProperty (key , "" ).getBytes (StandardCharsets .ISO_8859_1 ),
223+ UTF_8 );
224+ } catch (Exception e ) {
225+ logger .error ("readProperty" , e );
226+ return "" ;
227+ }
228+ }
229+
230+ public static boolean writeProperty (String file , String key , String value ) {
231+ try (OutputStream out = new FileOutputStream (file );
232+ FileInputStream fis = new FileInputStream (file );
233+ BufferedWriter bw = new BufferedWriter (new OutputStreamWriter (out , UTF_8 ))) {
234+ BufferedReader bf = new BufferedReader (new InputStreamReader (fis , UTF_8 ));
235+ Properties properties = new Properties ();
236+ properties .load (bf );
237+ properties .setProperty (key , value );
238+ properties .store (bw , "Generated by the application. PLEASE DO NOT EDIT! " );
239+ } catch (Exception e ) {
240+ logger .warn ("writeProperty" , e );
241+ return false ;
242+ }
243+ return true ;
244+ }
245+
246+ }
247+
248+ }
0 commit comments