@@ -64,6 +64,10 @@ export type ClickhouseEventRepositoryConfig = {
6464 clickhouse : ClickHouse ;
6565 batchSize ?: number ;
6666 flushInterval ?: number ;
67+ insertStrategy ?: "insert" | "insert_async" ;
68+ waitForAsyncInsert ?: boolean ;
69+ asyncInsertMaxDataSize ?: number ;
70+ asyncInsertBusyTimeoutMs ?: number ;
6771 tracer ?: Tracer ;
6872 maximumTraceSummaryViewCount ?: number ;
6973 maximumTraceDetailedSummaryViewCount ?: number ;
@@ -119,7 +123,11 @@ export class ClickhouseEventRepository implements IEventRepository {
119123 } ) ;
120124 }
121125
122- const [ insertError , insertResult ] = await this . _clickhouse . taskEvents . insert ( events ) ;
126+ const [ insertError , insertResult ] = await this . _clickhouse . taskEvents . insert ( events , {
127+ params : {
128+ clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
129+ } ,
130+ } ) ;
123131
124132 if ( insertError ) {
125133 throw insertError ;
@@ -134,6 +142,19 @@ export class ClickhouseEventRepository implements IEventRepository {
134142 } ) ;
135143 }
136144
145+ #getClickhouseInsertSettings( ) {
146+ if ( this . _config . insertStrategy === "insert" ) {
147+ return { } ;
148+ } else {
149+ return {
150+ async_insert : 1 as const ,
151+ async_insert_max_data_size : this . _config . asyncInsertMaxDataSize ?. toString ( ) ?? "10485760" ,
152+ async_insert_busy_timeout_ms : this . _config . asyncInsertBusyTimeoutMs ?? 5000 ,
153+ wait_for_async_insert : this . _config . waitForAsyncInsert ? ( 1 as const ) : ( 0 as const ) ,
154+ } ;
155+ }
156+ }
157+
137158 async #publishToRedis( events : TaskEventV1Input [ ] ) {
138159 if ( events . length === 0 ) return ;
139160 await tracePubSub . publish ( events . map ( ( e ) => e . trace_id ) ) ;
0 commit comments