@@ -550,7 +550,7 @@ export class RunQueue {
550550 public async acknowledgeMessage (
551551 orgId : string ,
552552 messageId : string ,
553- options ?: { skipDequeueProcessing ?: boolean }
553+ options ?: { skipDequeueProcessing ?: boolean ; removeFromWorkerQueue ?: boolean }
554554 ) {
555555 return this . #trace(
556556 "acknowledgeMessage" ,
@@ -586,6 +586,7 @@ export class RunQueue {
586586
587587 await this . #callAcknowledgeMessage( {
588588 message,
589+ removeFromWorkerQueue : options ?. removeFromWorkerQueue ,
589590 } ) ;
590591 } ,
591592 {
@@ -836,6 +837,14 @@ export class RunQueue {
836837 await this . redis . quit ( ) ;
837838 }
838839
840+ /**
841+ * Peek all messages on a worker queue (useful for tests or debugging)
842+ */
843+ async peekAllOnWorkerQueue ( workerQueue : string ) {
844+ const workerQueueKey = this . keys . workerQueueKey ( workerQueue ) ;
845+ return await this . redis . lrange ( workerQueueKey , 0 , - 1 ) ;
846+ }
847+
839848 private async handleRedriveMessage ( channel : string , message : string ) {
840849 try {
841850 const { runId, envId, projectId, orgId } = JSON . parse ( message ) as any ;
@@ -1430,7 +1439,13 @@ export class RunQueue {
14301439 } ;
14311440 }
14321441
1433- async #callAcknowledgeMessage( { message } : { message : OutputPayload } ) {
1442+ async #callAcknowledgeMessage( {
1443+ message,
1444+ removeFromWorkerQueue,
1445+ } : {
1446+ message : OutputPayload ;
1447+ removeFromWorkerQueue ?: boolean ;
1448+ } ) {
14341449 const messageId = message . runId ;
14351450 const messageKey = this . keys . messageKey ( message . orgId , messageId ) ;
14361451 const messageQueue = message . queue ;
@@ -1441,6 +1456,9 @@ export class RunQueue {
14411456 message . environmentId ,
14421457 this . shardCount
14431458 ) ;
1459+ const workerQueue = this . #getWorkerQueueFromMessage( message ) ;
1460+ const workerQueueKey = this . keys . workerQueueKey ( workerQueue ) ;
1461+ const messageKeyValue = this . keys . messageKey ( message . orgId , messageId ) ;
14441462
14451463 this . logger . debug ( "Calling acknowledgeMessage" , {
14461464 messageKey,
@@ -1450,6 +1468,10 @@ export class RunQueue {
14501468 envQueueKey,
14511469 messageId,
14521470 masterQueueKey,
1471+ workerQueue,
1472+ workerQueueKey,
1473+ removeFromWorkerQueue,
1474+ messageKeyValue,
14531475 service : this . name ,
14541476 } ) ;
14551477
@@ -1460,8 +1482,11 @@ export class RunQueue {
14601482 queueCurrentConcurrencyKey ,
14611483 envCurrentConcurrencyKey ,
14621484 envQueueKey ,
1485+ workerQueueKey ,
14631486 messageId ,
1464- messageQueue
1487+ messageQueue ,
1488+ messageKeyValue ,
1489+ removeFromWorkerQueue ? "1" : "0"
14651490 ) ;
14661491 }
14671492
@@ -1767,7 +1792,7 @@ return results
17671792 } ) ;
17681793
17691794 this . redis . defineCommand ( "acknowledgeMessage" , {
1770- numberOfKeys : 6 ,
1795+ numberOfKeys : 7 ,
17711796 lua : `
17721797-- Keys:
17731798local masterQueueKey = KEYS[1]
@@ -1776,10 +1801,13 @@ local messageQueueKey = KEYS[3]
17761801local queueCurrentConcurrencyKey = KEYS[4]
17771802local envCurrentConcurrencyKey = KEYS[5]
17781803local envQueueKey = KEYS[6]
1804+ local workerQueueKey = KEYS[7]
17791805
17801806-- Args:
17811807local messageId = ARGV[1]
17821808local messageQueueName = ARGV[2]
1809+ local messageKeyValue = ARGV[3]
1810+ local removeFromWorkerQueue = ARGV[4]
17831811
17841812-- Remove the message from the message key
17851813redis.call('DEL', messageKey)
@@ -1799,6 +1827,11 @@ end
17991827-- Update the concurrency keys
18001828redis.call('SREM', queueCurrentConcurrencyKey, messageId)
18011829redis.call('SREM', envCurrentConcurrencyKey, messageId)
1830+
1831+ -- Remove the message from the worker queue
1832+ if removeFromWorkerQueue == '1' then
1833+ redis.call('LREM', workerQueueKey, 0, messageKeyValue)
1834+ end
18021835` ,
18031836 } ) ;
18041837
@@ -2025,8 +2058,11 @@ declare module "@internal/redis" {
20252058 concurrencyKey : string ,
20262059 envConcurrencyKey : string ,
20272060 envQueueKey : string ,
2061+ workerQueueKey : string ,
20282062 messageId : string ,
20292063 messageQueueName : string ,
2064+ messageKeyValue : string ,
2065+ removeFromWorkerQueue : string ,
20302066 callback ?: Callback < void >
20312067 ) : Result < void , Context > ;
20322068
0 commit comments