@@ -259,6 +259,59 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
259259 } ) ;
260260 }
261261
262+ public async getReleaseQueueMetrics ( releaseQueueDescriptor : T ) {
263+ const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
264+ const currentTokensRaw = await this . redis . get ( this . #bucketKey( releaseQueue ) ) ;
265+ const queueLength = await this . redis . zcard ( this . #queueKey( releaseQueue ) ) ;
266+
267+ const currentTokens = currentTokensRaw ? Number ( currentTokensRaw ) : undefined ;
268+
269+ return { currentTokens, queueLength } ;
270+ }
271+
272+ /**
273+ * Refill a token only if the releaserId is not in the release queue.
274+ * Returns true if the token was refilled, false if the releaserId was found in the queue.
275+ */
276+ public async refillTokenIfNotInQueue (
277+ releaseQueueDescriptor : T ,
278+ releaserId : string
279+ ) : Promise < boolean > {
280+ const maxTokens = await this . #callMaxTokens( releaseQueueDescriptor ) ;
281+ const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
282+
283+ if ( maxTokens === 0 ) {
284+ this . logger . debug ( "No tokens available, skipping refill" , {
285+ releaseQueueDescriptor,
286+ releaserId,
287+ maxTokens,
288+ releaseQueue,
289+ } ) ;
290+
291+ return false ;
292+ }
293+
294+ const result = await this . redis . refillTokenIfNotInQueue (
295+ this . masterQueuesKey ,
296+ this . #bucketKey( releaseQueue ) ,
297+ this . #queueKey( releaseQueue ) ,
298+ this . #metadataKey( releaseQueue ) ,
299+ releaseQueue ,
300+ releaserId ,
301+ String ( maxTokens )
302+ ) ;
303+
304+ this . logger . debug ( "Attempted to refill token if not in queue" , {
305+ releaseQueueDescriptor,
306+ releaserId,
307+ maxTokens,
308+ releaseQueue,
309+ result,
310+ } ) ;
311+
312+ return result === "true" ;
313+ }
314+
262315 /**
263316 * Get the next queue that has available capacity and process one item from it
264317 * Returns true if an item was processed, false if no items were available
783836return true
784837 ` ,
785838 } ) ;
839+
840+ this . redis . defineCommand ( "refillTokenIfNotInQueue" , {
841+ numberOfKeys : 4 ,
842+ lua : `
843+ local masterQueuesKey = KEYS[1]
844+ local bucketKey = KEYS[2]
845+ local queueKey = KEYS[3]
846+ local metadataKey = KEYS[4]
847+
848+ local releaseQueue = ARGV[1]
849+ local releaserId = ARGV[2]
850+ local maxTokens = tonumber(ARGV[3])
851+
852+ -- Check if the releaserId is in the queue
853+ local score = redis.call("ZSCORE", queueKey, releaserId)
854+ if score then
855+ -- Item is in queue, don't refill token
856+ return redis.status_reply("false")
857+ end
858+
859+ -- Return the token to the bucket
860+ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
861+ local remainingTokens = currentTokens + 1
862+
863+ -- Don't exceed maxTokens
864+ if remainingTokens > maxTokens then
865+ remainingTokens = maxTokens
866+ end
867+
868+ redis.call("SET", bucketKey, remainingTokens)
869+
870+ -- Clean up any metadata just in case
871+ redis.call("HDEL", metadataKey, releaserId)
872+
873+ -- Update the master queue based on remaining queue length
874+ local queueLength = redis.call("ZCARD", queueKey)
875+ if queueLength > 0 then
876+ redis.call("ZADD", masterQueuesKey, remainingTokens, releaseQueue)
877+ else
878+ redis.call("ZREM", masterQueuesKey, releaseQueue)
879+ end
880+
881+ return redis.status_reply("true")
882+ ` ,
883+ } ) ;
786884 }
787885}
788886
@@ -839,6 +937,17 @@ declare module "@internal/redis" {
839937 releaserId : string ,
840938 callback ?: Callback < void >
841939 ) : Result < void , Context > ;
940+
941+ refillTokenIfNotInQueue (
942+ masterQueuesKey : string ,
943+ bucketKey : string ,
944+ queueKey : string ,
945+ metadataKey : string ,
946+ releaseQueue : string ,
947+ releaserId : string ,
948+ maxTokens : string ,
949+ callback ?: Callback < string >
950+ ) : Result < string , Context > ;
842951 }
843952}
844953
0 commit comments