Skip to content

Commit 88ecda1

Browse files
max-ostapenko authored and GCP Dataform committed
Merge branch 'smart-otter' into smart-otter
2 parents 95143fd + 09c168a commit 88ecda1

7 files changed

Lines changed: 173 additions & 104 deletions

File tree

definitions/output/reports/tech_report_categories.js

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ technology_stats AS (
7373
category_obj AS categories,
7474
origins.mobile AS mobile_origins
7575
FROM ${ctx.ref('reports', 'tech_report_technologies')}
76-
GROUP BY
77-
technology,
78-
categories
7976
)
8077
8178
SELECT

infra/bigquery-export/firestore.js

Lines changed: 121 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,67 @@ import { BigQueryExport } from './bigquery.js'
33

44
export class FirestoreBatch {
55
constructor () {
6-
this.firestore = new Firestore()
6+
this.firestore = new Firestore({
7+
gaxOptions: {
8+
grpc: {
9+
max_receive_message_length: 500 * 1024 * 1024, // 500MB
10+
max_send_message_length: 500 * 1024 * 1024, // 500MB
11+
'grpc.max_connection_idle_ms': 5 * 60 * 1000, // 5 minutes
12+
'grpc.keepalive_time_ms': 30 * 1000, // 30 seconds
13+
'grpc.keepalive_timeout_ms': 60 * 1000, // 1 minute
14+
'grpc.keepalive_permit_without_calls': true
15+
}
16+
}
17+
})
718
this.bigquery = new BigQueryExport()
8-
this.batchSize = 500
9-
this.maxConcurrentBatches = 200
19+
20+
// Configuration constants
21+
this.config = {
22+
batchSize: {
23+
delete: 500,
24+
write: 400
25+
},
26+
maxConcurrentBatches: 200,
27+
retryCount: 5,
28+
timeout: 10 * 60 * 1000 // 10 minutes
29+
}
30+
31+
this.reset()
1032
}
1133

12-
queueBatch (operation) {
34+
reset () {
35+
this.currentBatch = []
36+
this.batchPromises = []
37+
}
38+
39+
getCurrentBatchSize (operation) {
40+
return this.config.batchSize[operation === 'delete' ? 'delete' : 'write']
41+
}
42+
43+
async commitWithRetry (batch, index) {
44+
let lastError
45+
46+
for (let attempt = 1; attempt <= this.config.retryCount; attempt++) {
47+
try {
48+
await batch.commit()
49+
return
50+
} catch (error) {
51+
lastError = error
52+
console.warn(`Batch ${index} attempt ${attempt} failed:`, error.message)
53+
54+
if (attempt < this.config.retryCount) {
55+
const delayMs = Math.pow(2, attempt) * 500
56+
console.log(`Retrying batch ${index} in ${delayMs}ms...`)
57+
await new Promise(resolve => setTimeout(resolve, delayMs))
58+
}
59+
}
60+
}
61+
62+
console.error(`Batch ${index} failed after ${this.config.retryCount} attempts:`, lastError)
63+
throw lastError
64+
}
65+
66+
createBatch (operation) {
1367
const batch = this.firestore.batch()
1468

1569
this.currentBatch.forEach((doc) => {
@@ -19,119 +73,126 @@ export class FirestoreBatch {
1973
const docRef = this.firestore.collection(this.collectionName).doc()
2074
batch.set(docRef, doc)
2175
} else {
22-
throw new Error('Invalid operation')
76+
throw new Error(`Invalid operation: ${operation}`)
2377
}
2478
})
79+
80+
return batch
81+
}
82+
83+
queueBatch (operation) {
84+
const batch = this.createBatch(operation)
2585
this.batchPromises.push(batch)
2686
this.currentBatch = []
2787
}
2888

2989
async commitBatches () {
90+
if (this.batchPromises.length === 0) return
91+
3092
console.log(`Committing ${this.batchPromises.length} batches to ${this.collectionName}`)
93+
3194
await Promise.all(
32-
this.batchPromises.map(async (batchPromise) => await batchPromise.commit()
33-
.catch((error) => {
34-
console.error('Error committing batch:', error)
35-
throw error
36-
})
95+
this.batchPromises.map((batch, index) =>
96+
this.commitWithRetry(batch, index)
3797
)
3898
)
99+
39100
this.batchPromises = []
40101
}
41102

42-
async finalFlush (operation) {
43-
if (this.currentBatch.length > 0) {
103+
async processInBatches (operation, shouldFlush = false) {
104+
const batchSize = this.getCurrentBatchSize(operation)
105+
106+
if (this.currentBatch.length >= batchSize || shouldFlush) {
44107
this.queueBatch(operation)
45108
}
46109

47-
if (this.batchPromises.length > 0) {
110+
if (this.batchPromises.length >= this.config.maxConcurrentBatches || shouldFlush) {
48111
await this.commitBatches()
49112
}
50113
}
51114

115+
buildQuery (collectionRef) {
116+
const queryMap = {
117+
report: () => {
118+
console.info(`Deleting documents from ${this.collectionName} for date ${this.date}`)
119+
return collectionRef.where('date', '==', this.date)
120+
},
121+
dict: () => {
122+
console.info(`Deleting documents from ${this.collectionName}`)
123+
return collectionRef
124+
}
125+
}
126+
127+
const queryBuilder = queryMap[this.collectionType]
128+
if (!queryBuilder) {
129+
throw new Error(`Invalid collection type: ${this.collectionType}`)
130+
}
131+
132+
return queryBuilder()
133+
}
134+
52135
async batchDelete () {
53136
console.info('Starting batch deletion...')
54137
const startTime = Date.now()
55-
this.currentBatch = []
56-
this.batchPromises = []
138+
this.reset()
57139

58140
let totalDocsDeleted = 0
59141
const collectionRef = this.firestore.collection(this.collectionName)
60-
61-
let collectionQuery
62-
if (this.collectionType === 'report') {
63-
console.info('Deleting documents from ' + this.collectionName + ' for date ' + this.date)
64-
// Query to fetch monthly documents
65-
collectionQuery = collectionRef.where('date', '==', this.date)
66-
} else if (this.collectionType === 'dict') {
67-
console.info('Deleting documents from ' + this.collectionName)
68-
collectionQuery = collectionRef
69-
} else {
70-
throw new Error('Invalid collection type')
71-
}
142+
const collectionQuery = this.buildQuery(collectionRef)
143+
const batchSize = this.getCurrentBatchSize('delete')
72144

73145
while (true) {
74-
const snapshot = await collectionQuery.limit(this.batchSize * this.maxConcurrentBatches).get()
75-
if (snapshot.empty) {
76-
break
77-
}
146+
const snapshot = await collectionQuery.limit(batchSize * this.config.maxConcurrentBatches).get()
147+
if (snapshot.empty) break
78148

79-
for await (const doc of snapshot.docs) {
149+
for (const doc of snapshot.docs) {
80150
this.currentBatch.push(doc)
81-
82-
if (this.currentBatch.length >= this.batchSize) {
83-
this.queueBatch('delete')
84-
}
85-
if (this.batchPromises.length >= this.maxConcurrentBatches) {
86-
await this.commitBatches()
87-
}
151+
await this.processInBatches('delete')
88152
totalDocsDeleted++
89153
}
90154
}
91-
await this.finalFlush('delete')
155+
156+
// Final flush
157+
await this.processInBatches('delete', true)
92158

93159
const duration = (Date.now() - startTime) / 1000
94160
console.info(`Deletion complete. Total docs deleted: ${totalDocsDeleted}. Time: ${duration} seconds`)
95161
}
96162

97-
/**
98-
* Streams BigQuery query results into a Firestore collection using batch commits.
99-
* @param {string} query - The BigQuery SQL query.
100-
*/
101163
async streamFromBigQuery (rowStream) {
102164
console.info('Starting BigQuery to Firestore transfer...')
103165
const startTime = Date.now()
104166
let totalRowsProcessed = 0
105167

106-
this.currentBatch = []
107-
this.batchPromises = []
168+
this.reset()
108169

109170
for await (const row of rowStream) {
110171
this.currentBatch.push(row)
111-
112-
// Write batch when it reaches specified size
113-
if (this.currentBatch.length >= this.batchSize) {
114-
this.queueBatch('set')
115-
}
116-
117-
if (this.batchPromises.length >= this.maxConcurrentBatches) {
118-
await this.commitBatches()
119-
}
172+
await this.processInBatches('set')
120173
totalRowsProcessed++
121174
}
122-
await this.finalFlush('set')
175+
176+
// Final flush
177+
await this.processInBatches('set', true)
123178

124179
const duration = (Date.now() - startTime) / 1000
125180
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${totalRowsProcessed}. Time: ${duration} seconds`)
126181
}
127182

128183
async export (query, exportConfig) {
184+
// Configure Firestore settings
129185
this.firestore.settings({
130-
databaseId: exportConfig.database
186+
databaseId: exportConfig.database,
187+
timeout: this.config.timeout
188+
})
189+
190+
// Set instance properties
191+
Object.assign(this, {
192+
collectionName: exportConfig.collection,
193+
collectionType: exportConfig.type,
194+
date: exportConfig.date
131195
})
132-
this.collectionName = exportConfig.collection
133-
this.collectionType = exportConfig.type
134-
this.date = exportConfig.date
135196

136197
await this.batchDelete()
137198

infra/tf/.terraform.lock.hcl

Lines changed: 26 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infra/tf/bigquery_export/main.tf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ resource "google_cloud_run_v2_job" "bigquery_export" {
3535
value = ""
3636
}
3737
}
38-
timeout = "3600s"
38+
timeout = "7200s"
3939
service_account = var.function_identity
4040
max_retries = 1
4141
}

infra/tf/functions/main.tf

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ terraform {
99
}
1010
}
1111

12-
resource "google_project_iam_member" "project" {
12+
resource "google_project_iam_member" "function_identity" {
1313
for_each = toset(["roles/bigquery.jobUser", "roles/dataform.serviceAgent", "roles/run.invoker", "roles/run.jobsExecutorWithOverrides", "roles/datastore.user", "roles/storage.objectUser"])
1414

1515
project = var.project
@@ -31,8 +31,10 @@ resource "google_bigquery_connection" "remote-functions" {
3131
cloud_resource {}
3232
}
3333

34-
resource "google_project_iam_member" "bigquery-remote-functions-connector" {
34+
resource "google_project_iam_member" "bigquery-connection-remote-functions" {
35+
for_each = toset(["roles/run.invoker"])
36+
3537
project = var.project
36-
role = "roles/run.invoker"
38+
role = each.value
3739
member = "serviceAccount:${google_bigquery_connection.remote-functions.cloud_resource[0].service_account_id}"
3840
}

infra/tf/functions/output.tf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
output "google_bigquery_connection-remote_functions-id" {
2+
description = "The connection ID for the remote functions BigQuery connection."
3+
value = google_bigquery_connection.remote-functions.id
4+
}
5+
6+
output "remote_functions_connection_service_account_id" {
7+
description = "The service account ID associated with the remote functions BigQuery connection."
8+
value = google_bigquery_connection.remote-functions.cloud_resource[0].service_account_id
9+
}

0 commit comments

Comments (0)