@@ -290,4 +290,115 @@ describe("RunEngine delays", () => {
290290 engine . quit ( ) ;
291291 }
292292 } ) ;
293+
294+ containerTest ( "Cancelling a delayed run" , async ( { prisma, redisOptions } ) => {
295+ //create environment
296+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
297+
298+ const engine = new RunEngine ( {
299+ prisma,
300+ worker : {
301+ redis : redisOptions ,
302+ workers : 1 ,
303+ tasksPerWorker : 10 ,
304+ pollIntervalMs : 100 ,
305+ } ,
306+ queue : {
307+ redis : redisOptions ,
308+ } ,
309+ runLock : {
310+ redis : redisOptions ,
311+ } ,
312+ machines : {
313+ defaultMachine : "small-1x" ,
314+ machines : {
315+ "small-1x" : {
316+ name : "small-1x" as const ,
317+ cpu : 0.5 ,
318+ memory : 0.5 ,
319+ centsPerMs : 0.0001 ,
320+ } ,
321+ } ,
322+ baseCostInCents : 0.0001 ,
323+ } ,
324+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
325+ } ) ;
326+
327+ try {
328+ const taskIdentifier = "test-task" ;
329+
330+ //create background worker
331+ const backgroundWorker = await setupBackgroundWorker (
332+ engine ,
333+ authenticatedEnvironment ,
334+ taskIdentifier
335+ ) ;
336+
337+ //trigger the run with a 1 second delay
338+ const run = await engine . trigger (
339+ {
340+ number : 1 ,
341+ friendlyId : "run_1234" ,
342+ environment : authenticatedEnvironment ,
343+ taskIdentifier,
344+ payload : "{}" ,
345+ payloadType : "application/json" ,
346+ context : { } ,
347+ traceContext : { } ,
348+ traceId : "t12345" ,
349+ spanId : "s12345" ,
350+ masterQueue : "main" ,
351+ queue : "task/test-task" ,
352+ isTest : false ,
353+ tags : [ ] ,
354+ delayUntil : new Date ( Date . now ( ) + 1000 ) ,
355+ } ,
356+ prisma
357+ ) ;
358+
359+ //verify it's created but not queued
360+ const executionData = await engine . getRunExecutionData ( { runId : run . id } ) ;
361+ assertNonNullable ( executionData ) ;
362+ expect ( executionData . snapshot . executionStatus ) . toBe ( "RUN_CREATED" ) ;
363+ expect ( run . status ) . toBe ( "DELAYED" ) ;
364+
365+ //cancel the run
366+ await engine . cancelRun ( {
367+ runId : run . id ,
368+ reason : "Cancelled by test" ,
369+ } ) ;
370+
371+ //verify it's cancelled
372+ const executionData2 = await engine . getRunExecutionData ( { runId : run . id } ) ;
373+ assertNonNullable ( executionData2 ) ;
374+ expect ( executionData2 . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
375+ expect ( executionData2 . run . status ) . toBe ( "CANCELED" ) ;
376+
377+ //wait past the original delay time
378+ await setTimeout ( 1500 ) ;
379+
380+ //verify the run is still cancelled
381+ const executionData3 = await engine . getRunExecutionData ( { runId : run . id } ) ;
382+ assertNonNullable ( executionData3 ) ;
383+ expect ( executionData3 . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
384+ expect ( executionData3 . run . status ) . toBe ( "CANCELED" ) ;
385+
386+ //attempt to dequeue - should get nothing
387+ const dequeued = await engine . dequeueFromMasterQueue ( {
388+ consumerId : "test_12345" ,
389+ masterQueue : run . masterQueue ,
390+ maxRunCount : 10 ,
391+ } ) ;
392+
393+ expect ( dequeued . length ) . toBe ( 0 ) ;
394+
395+ //verify final state is still cancelled
396+ const executionData4 = await engine . getRunExecutionData ( { runId : run . id } ) ;
397+ assertNonNullable ( executionData4 ) ;
398+ expect ( executionData4 . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
399+ expect ( executionData4 . run . status ) . toBe ( "CANCELED" ) ;
400+ } finally {
401+ engine . quit ( ) ;
402+ }
403+ } ) ;
293404} ) ;
0 commit comments