11import { Logger } from "@trigger.dev/core/logger" ;
22import { nanoid } from "nanoid" ;
33import pLimit from "p-limit" ;
4+ import { signalsEmitter } from "~/services/signals.server" ;
45
56export type DynamicFlushSchedulerConfig < T > = {
67 batchSize : number ;
@@ -22,6 +23,7 @@ export class DynamicFlushScheduler<T> {
2223 private readonly BATCH_SIZE : number ;
2324 private readonly FLUSH_INTERVAL : number ;
2425 private flushTimer : NodeJS . Timeout | null ;
26+ private metricsReporterTimer : NodeJS . Timeout | undefined ;
2527 private readonly callback : ( flushId : string , batch : T [ ] ) => Promise < void > ;
2628
2729 // New properties for dynamic scaling
@@ -41,6 +43,7 @@ export class DynamicFlushScheduler<T> {
4143 droppedEvents : 0 ,
4244 droppedEventsByKind : new Map < string , number > ( ) ,
4345 } ;
46+ private isShuttingDown : boolean = false ;
4447
4548 // New properties for load shedding
4649 private readonly loadSheddingThreshold : number ;
@@ -75,6 +78,7 @@ export class DynamicFlushScheduler<T> {
7578
7679 this . startFlushTimer ( ) ;
7780 this . startMetricsReporter ( ) ;
81+ this . setupShutdownHandlers ( ) ;
7882 }
7983
8084 addToBatch ( items : T [ ] ) : void {
@@ -119,8 +123,8 @@ export class DynamicFlushScheduler<T> {
119123 this . currentBatch . push ( ...itemsToAdd ) ;
120124 this . totalQueuedItems += itemsToAdd . length ;
121125
122- // Check if we need to create a batch
123- if ( this . currentBatch . length >= this . currentBatchSize ) {
126+ // Check if we need to create a batch (if we are shutting down, create a batch immediately because the flush timer is stopped)
127+ if ( this . currentBatch . length >= this . currentBatchSize || this . isShuttingDown ) {
124128 this . createBatch ( ) ;
125129 }
126130
@@ -137,6 +141,23 @@ export class DynamicFlushScheduler<T> {
137141 this . resetFlushTimer ( ) ;
138142 }
139143
144+ private setupShutdownHandlers ( ) : void {
145+ signalsEmitter . on ( "SIGTERM" , ( ) =>
146+ this . shutdown ( ) . catch ( ( error ) => {
147+ this . logger . error ( "Error shutting down dynamic flush scheduler" , {
148+ error,
149+ } ) ;
150+ } )
151+ ) ;
152+ signalsEmitter . on ( "SIGINT" , ( ) =>
153+ this . shutdown ( ) . catch ( ( error ) => {
154+ this . logger . error ( "Error shutting down dynamic flush scheduler" , {
155+ error,
156+ } ) ;
157+ } )
158+ ) ;
159+ }
160+
140161 private startFlushTimer ( ) : void {
141162 this . flushTimer = setInterval ( ( ) => this . checkAndFlush ( ) , this . FLUSH_INTERVAL ) ;
142163 }
@@ -145,6 +166,9 @@ export class DynamicFlushScheduler<T> {
145166 if ( this . flushTimer ) {
146167 clearInterval ( this . flushTimer ) ;
147168 }
169+
170+ if ( this . isShuttingDown ) return ;
171+
148172 this . startFlushTimer ( ) ;
149173 }
150174
@@ -226,7 +250,7 @@ export class DynamicFlushScheduler<T> {
226250 }
227251
228252 private lastConcurrencyAdjustment : number = Date . now ( ) ;
229-
253+
230254 private adjustConcurrency ( backOff : boolean = false ) : void {
231255 const currentConcurrency = this . limiter . concurrency ;
232256 let newConcurrency = currentConcurrency ;
@@ -281,7 +305,7 @@ export class DynamicFlushScheduler<T> {
281305
282306 private startMetricsReporter ( ) : void {
283307 // Report metrics every 30 seconds
284- setInterval ( ( ) => {
308+ this . metricsReporterTimer = setInterval ( ( ) => {
285309 const droppedByKind : Record < string , number > = { } ;
286310 this . metrics . droppedEventsByKind . forEach ( ( count , kind ) => {
287311 droppedByKind [ kind ] = count ;
@@ -356,10 +380,18 @@ export class DynamicFlushScheduler<T> {
356380
357381 // Graceful shutdown
358382 async shutdown ( ) : Promise < void > {
383+ if ( this . isShuttingDown ) return ;
384+
385+ this . isShuttingDown = true ;
386+
359387 if ( this . flushTimer ) {
360388 clearInterval ( this . flushTimer ) ;
361389 }
362390
391+ if ( this . metricsReporterTimer ) {
392+ clearInterval ( this . metricsReporterTimer ) ;
393+ }
394+
363395 // Flush any remaining items
364396 if ( this . currentBatch . length > 0 ) {
365397 this . createBatch ( ) ;
0 commit comments