@@ -4,11 +4,10 @@ import { BigQueryExport } from './bigquery.js'
44export class FirestoreBatch {
55 constructor ( ) {
66 this . firestore = new Firestore ( {
7- // Increase timeout to 10 minutes for large batch operations
87 gaxOptions : {
98 grpc : {
10- max_receive_message_length : 100 * 1024 * 1024 , // 100MB
11- max_send_message_length : 100 * 1024 * 1024 , // 100MB
9+ max_receive_message_length : 500 * 1024 * 1024 , // 500MB
10+ max_send_message_length : 500 * 1024 * 1024 , // 500MB
1211 'grpc.max_connection_idle_ms' : 5 * 60 * 1000 , // 5 minutes
1312 'grpc.keepalive_time_ms' : 30 * 1000 , // 30 seconds
1413 'grpc.keepalive_timeout_ms' : 60 * 1000 , // 1 minute
@@ -17,12 +16,54 @@ export class FirestoreBatch {
1716 }
1817 } )
1918 this . bigquery = new BigQueryExport ( )
20- this . batchSizeDelete = 500
21- this . batchSizeWrite = 400 // Reduced batch size for better performance
22- this . maxConcurrentBatches = 100 // Reduced concurrent batches to avoid overwhelming
19+
20+ // Configuration constants
21+ this . config = {
22+ batchSize : {
23+ delete : 500 ,
24+ write : 400
25+ } ,
26+ maxConcurrentBatches : 200 ,
27+ retryCount : 5 ,
28+ timeout : 10 * 60 * 1000 // 10 minutes
29+ }
30+
31+ this . reset ( )
2332 }
2433
25- queueBatch ( operation ) {
34+ reset ( ) {
35+ this . currentBatch = [ ]
36+ this . batchPromises = [ ]
37+ }
38+
39+ getCurrentBatchSize ( operation ) {
40+ return this . config . batchSize [ operation === 'delete' ? 'delete' : 'write' ]
41+ }
42+
43+ async commitWithRetry ( batch , index ) {
44+ let lastError
45+
46+ for ( let attempt = 1 ; attempt <= this . config . retryCount ; attempt ++ ) {
47+ try {
48+ await batch . commit ( )
49+ return
50+ } catch ( error ) {
51+ lastError = error
52+ console . warn ( `Batch ${ index } attempt ${ attempt } failed:` , error . message )
53+
54+ if ( attempt < this . config . retryCount ) {
55+ const delayMs = Math . pow ( 2 , attempt ) * 500
56+ console . log ( `Retrying batch ${ index } in ${ delayMs } ms...` )
57+ await new Promise ( resolve => setTimeout ( resolve , delayMs ) )
58+ }
59+ }
60+ }
61+
62+ console . error ( `Batch ${ index } failed after ${ this . config . retryCount } attempts:` , lastError )
63+ throw lastError
64+ }
65+
66+ createBatch ( operation ) {
2667 const batch = this . firestore . batch ( )
2768
2869 this . currentBatch . forEach ( ( doc ) => {
@@ -32,140 +73,126 @@ export class FirestoreBatch {
3273 const docRef = this . firestore . collection ( this . collectionName ) . doc ( )
3374 batch . set ( docRef , doc )
3475 } else {
35- throw new Error ( ' Invalid operation' )
76+ throw new Error ( ` Invalid operation: ${ operation } ` )
3677 }
3778 } )
79+
80+ return batch
81+ }
82+
83+ queueBatch ( operation ) {
84+ const batch = this . createBatch ( operation )
3885 this . batchPromises . push ( batch )
3986 this . currentBatch = [ ]
4087 }
4188
4289 async commitBatches ( ) {
90+ if ( this . batchPromises . length === 0 ) return
91+
4392 console . log ( `Committing ${ this . batchPromises . length } batches to ${ this . collectionName } ` )
4493
4594 await Promise . all (
46- this . batchPromises . map ( async ( batchPromise , index ) => {
47- const retryCount = 3
48- let lastError
49-
50- for ( let attempt = 1 ; attempt <= retryCount ; attempt ++ ) {
51- try {
52- await batchPromise . commit ( )
53- return
54- } catch ( error ) {
55- lastError = error
56- console . warn ( `Batch ${ index } attempt ${ attempt } failed:` , error . message )
57-
58- if ( attempt < retryCount ) {
59- // Exponential backoff: 2^attempt seconds
60- const delayMs = Math . pow ( 2 , attempt ) * 1000
61- console . log ( `Retrying batch ${ index } in ${ delayMs } ms...` )
62- await new Promise ( resolve => setTimeout ( resolve , delayMs ) )
63- }
64- }
65- }
66-
67- console . error ( `Batch ${ index } failed after ${ retryCount } attempts:` , lastError )
68- throw lastError
69- } )
95+ this . batchPromises . map ( ( batch , index ) =>
96+ this . commitWithRetry ( batch , index )
97+ )
7098 )
7199
72100 this . batchPromises = [ ]
73101 }
74102
75- async finalFlush ( operation ) {
76- if ( this . currentBatch . length > 0 ) {
103+ async processInBatches ( operation , shouldFlush = false ) {
104+ const batchSize = this . getCurrentBatchSize ( operation )
105+
106+ if ( this . currentBatch . length >= batchSize || shouldFlush ) {
77107 this . queueBatch ( operation )
78108 }
79109
80- if ( this . batchPromises . length > 0 ) {
110+ if ( this . batchPromises . length >= this . config . maxConcurrentBatches || shouldFlush ) {
81111 await this . commitBatches ( )
82112 }
83113 }
84114
115+ buildQuery ( collectionRef ) {
116+ const queryMap = {
117+ report : ( ) => {
118+ console . info ( `Deleting documents from ${ this . collectionName } for date ${ this . date } ` )
119+ return collectionRef . where ( 'date' , '==' , this . date )
120+ } ,
121+ dict : ( ) => {
122+ console . info ( `Deleting documents from ${ this . collectionName } ` )
123+ return collectionRef
124+ }
125+ }
126+
127+ const queryBuilder = queryMap [ this . collectionType ]
128+ if ( ! queryBuilder ) {
129+ throw new Error ( `Invalid collection type: ${ this . collectionType } ` )
130+ }
131+
132+ return queryBuilder ( )
133+ }
134+
85135 async batchDelete ( ) {
86136 console . info ( 'Starting batch deletion...' )
87137 const startTime = Date . now ( )
88- this . currentBatch = [ ]
89- this . batchPromises = [ ]
138+ this . reset ( )
90139
91140 let totalDocsDeleted = 0
92141 const collectionRef = this . firestore . collection ( this . collectionName )
93-
94- let collectionQuery
95- if ( this . collectionType === 'report' ) {
96- console . info ( 'Deleting documents from ' + this . collectionName + ' for date ' + this . date )
97- // Query to fetch monthly documents
98- collectionQuery = collectionRef . where ( 'date' , '==' , this . date )
99- } else if ( this . collectionType === 'dict' ) {
100- console . info ( 'Deleting documents from ' + this . collectionName )
101- collectionQuery = collectionRef
102- } else {
103- throw new Error ( 'Invalid collection type' )
104- }
142+ const collectionQuery = this . buildQuery ( collectionRef )
143+ const batchSize = this . getCurrentBatchSize ( 'delete' )
105144
106145 while ( true ) {
107- const snapshot = await collectionQuery . limit ( this . batchSizeDelete * this . maxConcurrentBatches ) . get ( )
108- if ( snapshot . empty ) {
109- break
110- }
146+ const snapshot = await collectionQuery . limit ( batchSize * this . config . maxConcurrentBatches ) . get ( )
147+ if ( snapshot . empty ) break
111148
112- for await ( const doc of snapshot . docs ) {
149+ for ( const doc of snapshot . docs ) {
113150 this . currentBatch . push ( doc )
114-
115- if ( this . currentBatch . length >= this . batchSize ) {
116- this . queueBatch ( 'delete' )
117- }
118- if ( this . batchPromises . length >= this . maxConcurrentBatches ) {
119- await this . commitBatches ( )
120- }
151+ await this . processInBatches ( 'delete' )
121152 totalDocsDeleted ++
122153 }
123154 }
124- await this . finalFlush ( 'delete' )
155+
156+ // Final flush
157+ await this . processInBatches ( 'delete' , true )
125158
126159 const duration = ( Date . now ( ) - startTime ) / 1000
127160 console . info ( `Deletion complete. Total docs deleted: ${ totalDocsDeleted } . Time: ${ duration } seconds` )
128161 }
129162
130- /**
131- * Streams BigQuery query results into a Firestore collection using batch commits.
132- * @param {string } query - The BigQuery SQL query.
133- */
134163 async streamFromBigQuery ( rowStream ) {
135164 console . info ( 'Starting BigQuery to Firestore transfer...' )
136165 const startTime = Date . now ( )
137166 let totalRowsProcessed = 0
138167
139- this . currentBatch = [ ]
140- this . batchPromises = [ ]
168+ this . reset ( )
141169
142170 for await ( const row of rowStream ) {
143171 this . currentBatch . push ( row )
144-
145- // Write batch when it reaches specified size
146- if ( this . currentBatch . length >= this . batchSize ) {
147- this . queueBatch ( 'set' )
148- }
149-
150- if ( this . batchPromises . length >= this . maxConcurrentBatches ) {
151- await this . commitBatches ( )
152- }
172+ await this . processInBatches ( 'set' )
153173 totalRowsProcessed ++
154174 }
155- await this . finalFlush ( 'set' )
175+
176+ // Final flush
177+ await this . processInBatches ( 'set' , true )
156178
157179 const duration = ( Date . now ( ) - startTime ) / 1000
158180 console . info ( `Transfer to ${ this . collectionName } complete. Total rows processed: ${ totalRowsProcessed } . Time: ${ duration } seconds` )
159181 }
160182
161183 async export ( query , exportConfig ) {
184+ // Configure Firestore settings
162185 this . firestore . settings ( {
163186 databaseId : exportConfig . database ,
164- timeout : 10 * 60 * 1000 // 10 minutes timeout
187+ timeout : this . config . timeout
188+ } )
189+
190+ // Set instance properties
191+ Object . assign ( this , {
192+ collectionName : exportConfig . collection ,
193+ collectionType : exportConfig . type ,
194+ date : exportConfig . date
165195 } )
166- this . collectionName = exportConfig . collection
167- this . collectionType = exportConfig . type
168- this . date = exportConfig . date
169196
170197 await this . batchDelete ( )
171198
0 commit comments