diff --git a/backend/analytics/jobs/analyticsAggregationJob.ts b/backend/analytics/jobs/analyticsAggregationJob.ts new file mode 100644 index 00000000..781cbe0c --- /dev/null +++ b/backend/analytics/jobs/analyticsAggregationJob.ts @@ -0,0 +1,39 @@ +/** + * Analytics aggregation background jobs. + * + * Heavy aggregation and reporting — low priority. + */ + +import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue'; + +export const ANALYTICS_JOB_PRIORITY: PriorityClass = 'low'; + +export const ANALYTICS_JOB_NAMES = { + AGGREGATE_DAILY: 'analytics:aggregate-daily', + AGGREGATE_HOURLY: 'analytics:aggregate-hourly', + MAINTENANCE_CLEANUP: 'analytics:maintenance-cleanup', +} as const; + +export type AnalyticsJobName = (typeof ANALYTICS_JOB_NAMES)[keyof typeof ANALYTICS_JOB_NAMES]; + +export interface AnalyticsAggregationPayload { + reportType: string; + merchantId?: string; + dateRange: { start: string; end: string }; +} + +export async function enqueueAnalyticsAggregation( + queue: PriorityQueue, + payload: AnalyticsAggregationPayload, +): Promise> { + return queue.add(ANALYTICS_JOB_NAMES.AGGREGATE_DAILY, payload); +} + +export async function enqueueMaintenanceCleanup( + scheduler: WeightedFairQueue, + olderThanDays: number, +): Promise> { + return scheduler.spawnSubJob(ANALYTICS_JOB_PRIORITY, ANALYTICS_JOB_NAMES.MAINTENANCE_CLEANUP, { + olderThanDays, + }); +} diff --git a/backend/analytics/jobs/mvRefreshJob.ts b/backend/analytics/jobs/mvRefreshJob.ts index e6a35428..58602ca0 100644 --- a/backend/analytics/jobs/mvRefreshJob.ts +++ b/backend/analytics/jobs/mvRefreshJob.ts @@ -6,9 +6,15 @@ * * Runs on a configurable interval (default 60 s for real-time views). * Exposes a Prometheus-style metric for view freshness monitoring. + * + * Queue priority: low (analytics / maintenance class). */ import { QueryClient } from '../../../backend/shared/query/queryRouter'; +import type { PriorityClass } from '../../shared/queue'; + +/** Background queue priority for MV refresh work. */ +export const MV_REFRESH_JOB_PRIORITY: PriorityClass = 'low'; // ── View definitions ────────────────────────────────────────────────────────── diff --git a/backend/billing/jobs/__tests__/billingJobQueue.test.ts b/backend/billing/jobs/__tests__/billingJobQueue.test.ts new file mode 100644 index 00000000..fafcb164 --- /dev/null +++ b/backend/billing/jobs/__tests__/billingJobQueue.test.ts @@ -0,0 +1,75 @@ +import { BillingJobQueue } from '../billingJobQueue'; +import { + PAYMENT_JOB_NAMES, + type PaymentConfirmationPayload, +} from '../paymentConfirmationJob'; +import type { QueueJob } from '../../../shared/queue'; + +describe('BillingJobQueue', () => { + it('enqueues and processes payment confirmation end-to-end', async () => { + const sent: string[] = []; + const queue = new BillingJobQueue({ + connection: { host: 'localhost', port: 6379 }, + paymentHandlers: { + [PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob) => { + sent.push(job.data.transactionId); + }, + }, + }); + + await queue.schedulePaymentConfirmation({ + subscriptionId: 'sub_1', + subscriberId: 'user_1', + merchantId: 'merch_1', + amount: 29.99, + currency: 'USD', + transactionId: 'tx_abc123', + }); + + const processed = await queue.processNext(); + expect(processed).toBe(true); + expect(sent).toEqual(['tx_abc123']); + expect(queue.scheduler.getStats().critical.totalProcessed).toBe(1); + + await queue.close(); + }); + + it('processes critical payment before low analytics under load', async () => { + const order: string[] = []; + const queue = new BillingJobQueue({ + connection: { host: 'localhost', port: 6379 }, + paymentHandlers: { + [PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob) => { + order.push(`pay:${job.data.transactionId}`); + }, + }, + }); + + const scheduler = queue.scheduler; + + for (let i = 0; i < 5; i++) { + await scheduler.enqueue('low', 'analytics:aggregate', { i }); + } + await queue.schedulePaymentConfirmation({ + subscriptionId: 'sub_1', + subscriberId: 'user_1', + merchantId: 'merch_1', + amount: 10, + currency: 'USD', + transactionId: 'tx_urgent', + }); + + await scheduler.processNext({ + [PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob) => { + order.push(`pay:${job.data.transactionId}`); + }, + 'analytics:aggregate': async (job: QueueJob<{ i: number }>) => { + order.push(`analytics:${job.data.i}`); + }, + }); + + expect(order[0]).toBe('pay:tx_urgent'); + + await queue.close(); + }); +}); diff --git a/backend/billing/jobs/billingJobQueue.ts b/backend/billing/jobs/billingJobQueue.ts new file mode 100644 index 00000000..98d51424 --- /dev/null +++ b/backend/billing/jobs/billingJobQueue.ts @@ -0,0 +1,84 @@ +/** + * Billing job queue — wires payment/dunning jobs to the WFQ scheduler. + */ + +import type { ConnectionOptions } from 'bullmq'; +import { + createJobQueueSystem, + type JobHandlerMap, + type JobQueueSystem, + type QueueJob, + type WeightedFairQueue, +} from '../../shared/queue'; +import { createPaymentJobHandlers } from './paymentJobHandlers'; +import { + PAYMENT_JOB_PRIORITY, + type ChargeSettlementPayload, + type PaymentConfirmationPayload, + enqueueChargeSettlement, + enqueuePaymentConfirmation, +} from './paymentConfirmationJob'; + +export interface BillingJobQueueConfig { + connection: ConnectionOptions; + baseQueueName?: string; + maxQueueSize?: number; + paymentHandlers?: JobHandlerMap; +} + +export class BillingJobQueue { + private readonly system: JobQueueSystem; + private readonly handlers: JobHandlerMap; + + constructor(config: BillingJobQueueConfig) { + this.system = createJobQueueSystem({ + connection: config.connection, + baseQueueName: config.baseQueueName ?? 'subtrackr:billing', + maxQueueSize: config.maxQueueSize, + }); + this.handlers = config.paymentHandlers ?? createPaymentJobHandlers(); + } + + get scheduler(): WeightedFairQueue { + return this.system.scheduler; + } + + /** Enqueue a payment confirmation at critical priority (with auto backpressure). */ + async schedulePaymentConfirmation( + payload: PaymentConfirmationPayload, + ): Promise> { + return this.system.scheduler.enqueue( + PAYMENT_JOB_PRIORITY, + 'payment:confirmation-email', + payload, + ); + } + + /** Enqueue charge settlement at critical priority. */ + async scheduleChargeSettlement( + payload: ChargeSettlementPayload, + ): Promise> { + return this.system.scheduler.enqueue(PAYMENT_JOB_PRIORITY, 'payment:charge-settlement', payload); + } + + /** Process one job from the queue. */ + async processNext(): Promise { + return this.system.scheduler.processNext(this.handlers); + } + + /** Start the background worker loop. */ + start(intervalMs = 50): void { + this.system.scheduler.startProcessing(this.handlers, intervalMs); + } + + stop(): void { + this.system.scheduler.stopProcessing(); + } + + async close(): Promise { + await this.system.scheduler.close(); + } +} + +// Re-export convenience enqueue functions that use WFQ +export { enqueuePaymentConfirmation, enqueueChargeSettlement }; diff --git a/backend/billing/jobs/dunningJob.ts b/backend/billing/jobs/dunningJob.ts new file mode 100644 index 00000000..e67fb03f --- /dev/null +++ b/backend/billing/jobs/dunningJob.ts @@ -0,0 +1,40 @@ +/** + * Dunning background jobs. + * + * Payment recovery and dunning communications — critical priority. + */ + +import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue'; + +export const DUNNING_JOB_PRIORITY: PriorityClass = 'critical'; + +export const DUNNING_JOB_NAMES = { + SEND_REMINDER: 'dunning:send-reminder', + ESCALATE_STAGE: 'dunning:escalate-stage', + SUSPEND_SUBSCRIPTION: 'dunning:suspend-subscription', +} as const; + +export type DunningJobName = (typeof DUNNING_JOB_NAMES)[keyof typeof DUNNING_JOB_NAMES]; + +export interface DunningReminderPayload { + subscriptionId: string; + subscriberId: string; + merchantId: string; + stage: string; + failedAttempts: number; +} + +export async function enqueueDunningReminder( + queue: PriorityQueue, + payload: DunningReminderPayload, +): Promise> { + return queue.add(DUNNING_JOB_NAMES.SEND_REMINDER, payload); +} + +export async function enqueueDunningEscalation( + scheduler: WeightedFairQueue, + payload: DunningReminderPayload, + parentPriority: PriorityClass = DUNNING_JOB_PRIORITY, +): Promise> { + return scheduler.spawnSubJob(parentPriority, DUNNING_JOB_NAMES.ESCALATE_STAGE, payload); +} diff --git a/backend/billing/jobs/index.ts b/backend/billing/jobs/index.ts new file mode 100644 index 00000000..3257a07c --- /dev/null +++ b/backend/billing/jobs/index.ts @@ -0,0 +1,20 @@ +export { + PAYMENT_JOB_PRIORITY, + PAYMENT_JOB_NAMES, + enqueuePaymentConfirmation, + enqueueChargeSettlement, +} from './paymentConfirmationJob'; +export type { + PaymentConfirmationPayload, + ChargeSettlementPayload, + PaymentJobName, +} from './paymentConfirmationJob'; + +export { DUNNING_JOB_PRIORITY, DUNNING_JOB_NAMES, enqueueDunningReminder, enqueueDunningEscalation } from './dunningJob'; +export type { DunningReminderPayload, DunningJobName } from './dunningJob'; + +export { createPaymentJobHandlers, defaultPaymentConfirmationHandler } from './paymentJobHandlers'; +export type { PaymentConfirmationHandler, PaymentConfirmationResult } from './paymentJobHandlers'; + +export { BillingJobQueue } from './billingJobQueue'; +export type { BillingJobQueueConfig } from './billingJobQueue'; diff --git a/backend/billing/jobs/paymentConfirmationJob.ts b/backend/billing/jobs/paymentConfirmationJob.ts new file mode 100644 index 00000000..c02d2d08 --- /dev/null +++ b/backend/billing/jobs/paymentConfirmationJob.ts @@ -0,0 +1,45 @@ +/** + * Payment confirmation background jobs. + * + * Time-sensitive billing notifications — scheduled at critical priority. + */ + +import type { PriorityClass, PriorityQueue, QueueJob } from '../../shared/queue'; + +export const PAYMENT_JOB_PRIORITY: PriorityClass = 'critical'; + +export const PAYMENT_JOB_NAMES = { + CONFIRMATION_EMAIL: 'payment:confirmation-email', + RECEIPT_GENERATION: 'payment:receipt-generation', + CHARGE_SETTLEMENT: 'payment:charge-settlement', +} as const; + +export type PaymentJobName = (typeof PAYMENT_JOB_NAMES)[keyof typeof PAYMENT_JOB_NAMES]; + +export interface PaymentConfirmationPayload { + subscriptionId: string; + subscriberId: string; + merchantId: string; + amount: number; + currency: string; + transactionId: string; +} + +export interface ChargeSettlementPayload { + batchRunId: string; + subscriptionIds: string[]; +} + +export async function enqueuePaymentConfirmation( + queue: PriorityQueue, + payload: PaymentConfirmationPayload, +): Promise> { + return queue.add(PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL, payload); +} + +export async function enqueueChargeSettlement( + queue: PriorityQueue, + payload: ChargeSettlementPayload, +): Promise> { + return queue.add(PAYMENT_JOB_NAMES.CHARGE_SETTLEMENT, payload); +} diff --git a/backend/billing/jobs/paymentJobHandlers.ts b/backend/billing/jobs/paymentJobHandlers.ts new file mode 100644 index 00000000..7ffc2cd5 --- /dev/null +++ b/backend/billing/jobs/paymentJobHandlers.ts @@ -0,0 +1,57 @@ +/** + * Payment job handlers — executed by the WFQ worker loop. + */ + +import type { JobHandlerMap, QueueJob } from '../../shared/queue'; +import { + PAYMENT_JOB_NAMES, + type ChargeSettlementPayload, + type PaymentConfirmationPayload, +} from './paymentConfirmationJob'; + +export interface PaymentConfirmationResult { + transactionId: string; + sentAt: number; + channel: 'email'; +} + +export type PaymentConfirmationHandler = ( + payload: PaymentConfirmationPayload, +) => Promise; + +/** Default handler — logs confirmation; swap for real email service in production. */ +export const defaultPaymentConfirmationHandler: PaymentConfirmationHandler = async (payload) => { + console.info( + `[PaymentJob] Sending confirmation email for tx ${payload.transactionId} ` + + `(subscriber=${payload.subscriberId}, amount=${payload.amount} ${payload.currency})`, + ); + return { + transactionId: payload.transactionId, + sentAt: Date.now(), + channel: 'email', + }; +}; + +async function handleConfirmationEmail( + job: QueueJob, + sendFn: PaymentConfirmationHandler, +): Promise { + await sendFn(job.data); +} + +async function handleChargeSettlement(job: QueueJob): Promise { + console.info( + `[PaymentJob] Settling batch ${job.data.batchRunId} (${job.data.subscriptionIds.length} subscriptions)`, + ); +} + +export function createPaymentJobHandlers( + sendConfirmation: PaymentConfirmationHandler = defaultPaymentConfirmationHandler, +): JobHandlerMap { + return { + [PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: (job) => + handleConfirmationEmail(job as QueueJob, sendConfirmation), + [PAYMENT_JOB_NAMES.CHARGE_SETTLEMENT]: (job) => + handleChargeSettlement(job as QueueJob), + }; +} diff --git a/backend/monitoring/__tests__/queueMetricsExporter.test.ts b/backend/monitoring/__tests__/queueMetricsExporter.test.ts new file mode 100644 index 00000000..6b0e10b3 --- /dev/null +++ b/backend/monitoring/__tests__/queueMetricsExporter.test.ts @@ -0,0 +1,50 @@ +import { createEmptyStats } from '../../shared/queue'; +import { collectQueueMetrics, formatQueuePrometheus } from '../queueMetricsExporter'; + +describe('queueMetricsExporter', () => { + it('formats per-priority depth, wait, and processing metrics', () => { + const stats = createEmptyStats(); + stats.critical.depth = 3; + stats.critical.totalEnqueued = 10; + stats.critical.totalProcessed = 8; + stats.critical.totalWaitTimeMs = 80_000; + stats.critical.totalProcessingTimeMs = 4_000; + stats.critical.lastWaitTimeMs = 12_000; + stats.critical.lastProcessingTimeMs = 500; + stats.critical.sloViolations = 1; + + stats.low.depth = 50; + stats.low.totalEnqueued = 200; + stats.low.paused = true; + + const output = formatQueuePrometheus( + collectQueueMetrics(stats, ['low']), + ); + + expect(output).toContain('subtrackr_queue_depth{priority="critical"} 3'); + expect(output).toContain('subtrackr_queue_depth{priority="low"} 50'); + expect(output).toContain('subtrackr_queue_enqueued_total{priority="critical"} 10'); + expect(output).toContain('subtrackr_queue_processed_total{priority="critical"} 8'); + expect(output).toContain('subtrackr_queue_wait_time_ms{priority="critical"} 12000'); + expect(output).toContain('subtrackr_queue_avg_wait_time_ms{priority="critical"} 10000'); + expect(output).toContain('subtrackr_queue_slo_violations_total{priority="critical"} 1'); + expect(output).toContain('subtrackr_queue_slo_threshold_ms{priority="critical"} 30000'); + expect(output).toContain('subtrackr_queue_slo_threshold_ms{priority="high"} 120000'); + expect(output).toContain('subtrackr_queue_slo_threshold_ms{priority="normal"} 600000'); + expect(output).toContain('subtrackr_queue_paused{priority="low"} 1'); + }); + + it('reports zero averages when no jobs processed', () => { + const stats = createEmptyStats(); + const output = formatQueuePrometheus(collectQueueMetrics(stats)); + + expect(output).toContain('subtrackr_queue_avg_wait_time_ms{priority="high"} 0'); + expect(output).toContain('subtrackr_queue_avg_processing_time_ms{priority="normal"} 0'); + }); + + it('uses -1 for low priority SLO threshold (no SLO)', () => { + const stats = createEmptyStats(); + const output = formatQueuePrometheus(collectQueueMetrics(stats)); + expect(output).toContain('subtrackr_queue_slo_threshold_ms{priority="low"} -1'); + }); +}); diff --git a/backend/monitoring/queueMetricsExporter.ts b/backend/monitoring/queueMetricsExporter.ts new file mode 100644 index 00000000..6ec60f8f --- /dev/null +++ b/backend/monitoring/queueMetricsExporter.ts @@ -0,0 +1,128 @@ +/** + * Job Queue Prometheus Exporter + * + * Exposes per-priority queue depth, wait time, processing time, + * and SLO violation metrics for Prometheus scraping. + */ + +import type { PriorityClass, PriorityStatsMap } from '../shared/queue'; +import { LATENCY_SLO_MS, PRIORITY_ORDER } from '../shared/queue'; + +export interface QueueMetricsSnapshot { + stats: PriorityStatsMap; + schedulerPaused: PriorityClass[]; +} + +export function collectQueueMetrics( + stats: PriorityStatsMap, + schedulerPaused: PriorityClass[] = [], +): QueueMetricsSnapshot { + return { stats, schedulerPaused }; +} + +function avgWaitMs(stat: PriorityStatsMap[PriorityClass]): number { + return stat.totalProcessed > 0 ? stat.totalWaitTimeMs / stat.totalProcessed : 0; +} + +function avgProcessingMs(stat: PriorityStatsMap[PriorityClass]): number { + return stat.totalProcessed > 0 ? stat.totalProcessingTimeMs / stat.totalProcessed : 0; +} + +/** + * Render Prometheus text format for queue monitoring. + * + * Metrics: + * subtrackr_queue_depth{priority="..."} + * subtrackr_queue_paused{priority="..."} + * subtrackr_queue_enqueued_total{priority="..."} + * subtrackr_queue_processed_total{priority="..."} + * subtrackr_queue_wait_time_ms{priority="..."} + * subtrackr_queue_processing_time_ms{priority="..."} + * subtrackr_queue_avg_wait_time_ms{priority="..."} + * subtrackr_queue_avg_processing_time_ms{priority="..."} + * subtrackr_queue_slo_violations_total{priority="..."} + * subtrackr_queue_slo_threshold_ms{priority="..."} + */ +export function formatQueuePrometheus(snapshot: QueueMetricsSnapshot): string { + const lines: string[] = []; + const { stats } = snapshot; + + lines.push('# HELP subtrackr_queue_depth Current number of jobs waiting per priority class'); + lines.push('# TYPE subtrackr_queue_depth gauge'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_depth{priority="${p}"} ${stats[p].depth}`); + } + + lines.push('# HELP subtrackr_queue_paused Whether the priority queue is paused (1=yes, 0=no)'); + lines.push('# TYPE subtrackr_queue_paused gauge'); + for (const p of PRIORITY_ORDER) { + const paused = stats[p].paused || snapshot.schedulerPaused.includes(p) ? 1 : 0; + lines.push(`subtrackr_queue_paused{priority="${p}"} ${paused}`); + } + + lines.push('# HELP subtrackr_queue_enqueued_total Total jobs enqueued per priority class'); + lines.push('# TYPE subtrackr_queue_enqueued_total counter'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_enqueued_total{priority="${p}"} ${stats[p].totalEnqueued}`); + } + + lines.push('# HELP subtrackr_queue_processed_total Total jobs processed per priority class'); + lines.push('# TYPE subtrackr_queue_processed_total counter'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_processed_total{priority="${p}"} ${stats[p].totalProcessed}`); + } + + lines.push('# HELP subtrackr_queue_wait_time_ms Last job wait time in milliseconds'); + lines.push('# TYPE subtrackr_queue_wait_time_ms gauge'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_wait_time_ms{priority="${p}"} ${Math.round(stats[p].lastWaitTimeMs)}`); + } + + lines.push('# HELP subtrackr_queue_processing_time_ms Last job processing time in milliseconds'); + lines.push('# TYPE subtrackr_queue_processing_time_ms gauge'); + for (const p of PRIORITY_ORDER) { + lines.push( + `subtrackr_queue_processing_time_ms{priority="${p}"} ${Math.round(stats[p].lastProcessingTimeMs)}`, + ); + } + + lines.push('# HELP subtrackr_queue_avg_wait_time_ms Average wait time per priority class'); + lines.push('# TYPE subtrackr_queue_avg_wait_time_ms gauge'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_avg_wait_time_ms{priority="${p}"} ${Math.round(avgWaitMs(stats[p]))}`); + } + + lines.push('# HELP subtrackr_queue_avg_processing_time_ms Average processing time per priority class'); + lines.push('# TYPE subtrackr_queue_avg_processing_time_ms gauge'); + for (const p of PRIORITY_ORDER) { + lines.push( + `subtrackr_queue_avg_processing_time_ms{priority="${p}"} ${Math.round(avgProcessingMs(stats[p]))}`, + ); + } + + lines.push('# HELP subtrackr_queue_slo_violations_total Jobs exceeding latency SLO per priority'); + lines.push('# TYPE subtrackr_queue_slo_violations_total counter'); + for (const p of PRIORITY_ORDER) { + lines.push(`subtrackr_queue_slo_violations_total{priority="${p}"} ${stats[p].sloViolations}`); + } + + lines.push('# HELP subtrackr_queue_slo_threshold_ms Latency SLO threshold in milliseconds'); + lines.push('# TYPE subtrackr_queue_slo_threshold_ms gauge'); + for (const p of PRIORITY_ORDER) { + const threshold = Number.isFinite(LATENCY_SLO_MS[p]) ? LATENCY_SLO_MS[p] : -1; + lines.push(`subtrackr_queue_slo_threshold_ms{priority="${p}"} ${threshold}`); + } + + return lines.join('\n'); +} + +export function createQueueMetricsHandler(getSnapshot: () => QueueMetricsSnapshot) { + return function handleQueueMetrics( + _req: unknown, + res: { setHeader(name: string, value: string): void; end(body: string): void }, + ): void { + const body = formatQueuePrometheus(getSnapshot()); + res.setHeader('Content-Type', 'text/plain; version=0.0.4; charset=utf-8'); + res.end(body); + }; +} diff --git a/backend/notification/jobs/notificationDeliveryJob.ts b/backend/notification/jobs/notificationDeliveryJob.ts new file mode 100644 index 00000000..6e407cb5 --- /dev/null +++ b/backend/notification/jobs/notificationDeliveryJob.ts @@ -0,0 +1,49 @@ +/** + * Notification delivery background jobs. + * + * Push, email, and in-app notifications — high priority. + */ + +import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue'; + +export const NOTIFICATION_JOB_PRIORITY: PriorityClass = 'high'; + +export const NOTIFICATION_JOB_NAMES = { + PUSH: 'notification:push', + EMAIL: 'notification:email', + IN_APP: 'notification:in-app', +} as const; + +export type NotificationJobName = (typeof NOTIFICATION_JOB_NAMES)[keyof typeof NOTIFICATION_JOB_NAMES]; + +export interface NotificationDeliveryPayload { + userId: string; + channel: 'push' | 'email' | 'in_app'; + title: string; + body: string; + metadata?: Record; +} + +export async function enqueueNotification( + queue: PriorityQueue, + payload: NotificationDeliveryPayload, +): Promise> { + const name = + payload.channel === 'push' + ? NOTIFICATION_JOB_NAMES.PUSH + : payload.channel === 'email' + ? NOTIFICATION_JOB_NAMES.EMAIL + : NOTIFICATION_JOB_NAMES.IN_APP; + return queue.add(name, payload); +} + +export async function enqueueNotificationBatch( + scheduler: WeightedFairQueue, + payloads: NotificationDeliveryPayload[], +): Promise[]> { + return Promise.all( + payloads.map((payload) => + scheduler.spawnSubJob(NOTIFICATION_JOB_PRIORITY, NOTIFICATION_JOB_NAMES.PUSH, payload), + ), + ); +} diff --git a/backend/shared/queue/__mocks__/bullmq.ts b/backend/shared/queue/__mocks__/bullmq.ts new file mode 100644 index 00000000..541a2b18 --- /dev/null +++ b/backend/shared/queue/__mocks__/bullmq.ts @@ -0,0 +1,45 @@ +/** Lightweight BullMQ mock for unit tests (avoids Redis connections and open handles). */ + +export type ConnectionOptions = Record; +export type JobsOptions = Record; +export type QueueOptions = { connection: ConnectionOptions }; + +const jobStore = new Map(); + +export class Queue { + private name: string; + + constructor(name: string, _opts: QueueOptions) { + this.name = name; + } + + async add(name: string, data: unknown, opts?: JobsOptions): Promise<{ id: string }> { + const id = (opts?.jobId as string) ?? `mock-${this.name}-${jobStore.size + 1}`; + jobStore.set(id, { name, data }); + return { id }; + } + + async getJob(id: string): Promise<{ remove(): Promise } | undefined> { + if (!jobStore.has(id)) return undefined; + return { + remove: async () => { + jobStore.delete(id); + }, + }; + } + + async getWaitingCount(): Promise { + return jobStore.size; + } + + async pause(): Promise {} + + async resume(): Promise {} + + async close(): Promise {} +} + +/** Reset store between tests. */ +export function __resetMockJobStore(): void { + jobStore.clear(); +} diff --git a/backend/shared/queue/__tests__/jobPriorities.test.ts b/backend/shared/queue/__tests__/jobPriorities.test.ts new file mode 100644 index 00000000..776b5ea1 --- /dev/null +++ b/backend/shared/queue/__tests__/jobPriorities.test.ts @@ -0,0 +1,26 @@ +import { DUNNING_JOB_PRIORITY } from '../../../billing/jobs/dunningJob'; +import { PAYMENT_JOB_PRIORITY } from '../../../billing/jobs/paymentConfirmationJob'; +import { ANALYTICS_JOB_PRIORITY } from '../../../analytics/jobs/analyticsAggregationJob'; +import { MV_REFRESH_JOB_PRIORITY } from '../../../analytics/jobs/mvRefreshJob'; +import { NOTIFICATION_JOB_PRIORITY } from '../../../notification/jobs/notificationDeliveryJob'; +import { WEBHOOK_JOB_PRIORITY } from '../../../webhook/jobs/webhookDeliveryJob'; + +describe('job priority assignments', () => { + it('assigns critical priority to billing jobs', () => { + expect(PAYMENT_JOB_PRIORITY).toBe('critical'); + expect(DUNNING_JOB_PRIORITY).toBe('critical'); + }); + + it('assigns high priority to notification jobs', () => { + expect(NOTIFICATION_JOB_PRIORITY).toBe('high'); + }); + + it('assigns normal priority to webhook jobs', () => { + expect(WEBHOOK_JOB_PRIORITY).toBe('normal'); + }); + + it('assigns low priority to analytics jobs', () => { + expect(ANALYTICS_JOB_PRIORITY).toBe('low'); + expect(MV_REFRESH_JOB_PRIORITY).toBe('low'); + }); +}); diff --git a/backend/shared/queue/__tests__/priorityQueue.test.ts b/backend/shared/queue/__tests__/priorityQueue.test.ts new file mode 100644 index 00000000..f312eb1e --- /dev/null +++ b/backend/shared/queue/__tests__/priorityQueue.test.ts @@ -0,0 +1,182 @@ +import type { JobsOptions } from 'bullmq'; +import { PriorityQueue } from '../priorityQueue'; +import type { PriorityClass } from '../types'; + +function makeMockConnection() { + return {} as never; +} + +function makeMockQueueFactory() { + const added: Array<{ name: string; data: unknown; opts?: JobsOptions; id: string }> = []; + const store = new Map(); + + const factory = () => ({ + add: jest.fn(async (name: string, data: unknown, opts?: JobsOptions) => { + const id = (opts?.jobId as string) ?? `job_${added.length + 1}`; + added.push({ name, data, opts, id }); + store.set(id, { name, data }); + return { id }; + }), + getJob: jest.fn(async (id: string) => { + if (!store.has(id)) return undefined; + return { + remove: jest.fn(async () => { + store.delete(id); + }), + }; + }), + getWaitingCount: jest.fn(async () => store.size), + pause: jest.fn(async () => undefined), + resume: jest.fn(async () => undefined), + close: jest.fn(async () => undefined), + }); + + return { factory, added, store }; +} + +describe('PriorityQueue', () => { + it('enqueues with BullMQ priority mapped from class', async () => { + const { factory, added } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'critical', + queueFactory: factory, + }); + + await queue.add('payment:confirm', { id: 'tx_1' }); + + expect(added).toHaveLength(1); + expect(added[0].opts?.priority).toBe(1); + expect(queue.depth).toBe(1); + }); + + it('rejects spawnSubJob when parent priority does not match queue class', async () => { + const { factory } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'low', + queueFactory: factory, + }); + + await expect(queue.spawnSubJob('child', { step: 2 }, 'critical')).rejects.toThrow( + 'Use WeightedFairQueue.spawnSubJob()', + ); + }); + + it('allows spawnSubJob when parent priority matches queue class', async () => { + const { factory, added } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'critical', + queueFactory: factory, + }); + + await queue.spawnSubJob('child', { step: 2 }, 'critical'); + + expect(added[0].opts?.priority).toBe(1); + expect(queue.peek()?.priority).toBe('critical'); + }); + + it('throws when queue is at capacity', async () => { + const { factory } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'normal', + maxSize: 2, + queueFactory: factory, + }); + + await queue.add('a', {}); + await queue.add('b', {}); + await expect(queue.add('c', {})).rejects.toThrow('at capacity'); + }); + + it('rejects enqueue when paused', async () => { + const { factory } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'low', + queueFactory: factory, + }); + + await queue.pause(); + await expect(queue.add('job', {})).rejects.toThrow('paused'); + }); + + it('removes job from BullMQ when dequeued', async () => { + const { factory, store } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'critical', + queueFactory: factory, + }); + + await queue.add('pay', { id: '1' }); + expect(store.size).toBe(1); + + const job = await queue.dequeue(); + expect(job).toBeDefined(); + expect(store.size).toBe(0); + expect(queue.depth).toBe(0); + }); + + it('records SLO violations for critical jobs exceeding 30s wait', () => { + const { factory } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'critical', + queueFactory: factory, + }); + + queue.recordProcessed(35_000, 500, 'critical'); + expect(queue.getStats().sloViolations).toBe(1); + + queue.recordProcessed(10_000, 200, 'critical'); + expect(queue.getStats().sloViolations).toBe(1); + }); + + it('pauses and resumes dequeue', async () => { + const { factory } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: 'low', + queueFactory: factory, + }); + + await queue.add('job', {}); + await queue.pause(); + expect(await queue.dequeue()).toBeUndefined(); + + await queue.resume(); + expect(await queue.dequeue()).toBeDefined(); + }); + + it('maps each priority class to the correct BullMQ value', async () => { + const expected: Record = { + critical: 1, + high: 2, + normal: 3, + low: 4, + }; + + for (const [cls, bullPriority] of Object.entries(expected) as [PriorityClass, number][]) { + const { factory, added } = makeMockQueueFactory(); + const queue = new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority: cls, + queueFactory: factory, + }); + await queue.add('test', {}); + expect(added[0].opts?.priority).toBe(bullPriority); + } + }); +}); diff --git a/backend/shared/queue/__tests__/queueFactory.test.ts b/backend/shared/queue/__tests__/queueFactory.test.ts new file mode 100644 index 00000000..84aef4a0 --- /dev/null +++ b/backend/shared/queue/__tests__/queueFactory.test.ts @@ -0,0 +1,28 @@ +import { createJobQueueSystem } from '../queueFactory'; +import { DEFAULT_PRIORITY_WEIGHTS } from '../types'; + +describe('createJobQueueSystem', () => { + it('wires all four priority queues to a WFQ scheduler', async () => { + const { scheduler, queues } = createJobQueueSystem({ + connection: { host: 'localhost', port: 6379 }, + baseQueueName: 'test', + maxQueueSize: 50, + }); + + expect(queues.critical.priority).toBe('critical'); + expect(queues.high.priority).toBe('high'); + expect(queues.normal.priority).toBe('normal'); + expect(queues.low.priority).toBe('low'); + + await scheduler.enqueue('critical', 'pay', { id: '1' }); + await scheduler.enqueue('low', 'analytics', { id: '2' }); + + const snapshot = scheduler.getSnapshot(); + expect(snapshot.effectiveWeights.critical).toBeGreaterThan(snapshot.effectiveWeights.low); + + const next = await scheduler.scheduleNext(); + expect(next?.priority).toBe('critical'); + + expect(DEFAULT_PRIORITY_WEIGHTS.critical).toBe(50); + }); +}); diff --git a/backend/shared/queue/__tests__/weightedFairQueue.test.ts b/backend/shared/queue/__tests__/weightedFairQueue.test.ts new file mode 100644 index 00000000..5be2ebb9 --- /dev/null +++ b/backend/shared/queue/__tests__/weightedFairQueue.test.ts @@ -0,0 +1,255 @@ +import { PriorityQueue } from '../priorityQueue'; +import { + WeightedFairQueue, + computeEffectiveWeights, + resolveBackpressure, + selectNextPriority, +} from '../weightedFairQueue'; +import { DEFAULT_PRIORITY_WEIGHTS, type PriorityClass } from '../types'; + +function makeMockConnection() { + return {} as never; +} + +function makeQueues(maxSize = 100) { + const store = new Map>(); + + const factory = (priority: PriorityClass) => { + if (!store.has(priority)) store.set(priority, new Map()); + const bucket = store.get(priority)!; + + return () => ({ + add: jest.fn(async (name: string, _data: unknown, opts?: { jobId?: string }) => { + const id = opts?.jobId ?? `${priority}_${bucket.size + 1}`; + bucket.set(id, { name }); + return { id }; + }), + getJob: jest.fn(async (id: string) => { + if (!bucket.has(id)) return undefined; + return { remove: jest.fn(async (): Promise => { bucket.delete(id); }) }; + }), + getWaitingCount: jest.fn(async () => bucket.size), + pause: jest.fn(async () => undefined), + resume: jest.fn(async () => undefined), + close: jest.fn(async () => undefined), + }); + }; + + const make = (priority: PriorityClass) => + new PriorityQueue({ + connection: makeMockConnection(), + baseQueueName: 'subtrackr', + priority, + maxSize, + queueFactory: factory(priority), + }); + + return { + critical: make('critical'), + high: make('high'), + normal: make('normal'), + low: make('low'), + }; +} + +describe('computeEffectiveWeights', () => { + it('returns base weights when all queues have work', () => { + const depths = { critical: 5, high: 3, normal: 2, low: 1 }; + const weights = computeEffectiveWeights(DEFAULT_PRIORITY_WEIGHTS, depths, new Set()); + expect(weights.critical).toBeCloseTo(50, 0); + expect(weights.high).toBeCloseTo(25, 0); + expect(weights.normal).toBeCloseTo(15, 0); + expect(weights.low).toBeCloseTo(10, 0); + }); + + it('redistributes idle critical capacity to active queues', () => { + const depths = { critical: 0, high: 10, normal: 5, low: 3 }; + const weights = computeEffectiveWeights(DEFAULT_PRIORITY_WEIGHTS, depths, new Set()); + + expect(weights.critical).toBe(0); + expect(weights.high).toBeGreaterThan(25); + expect(weights.normal).toBeGreaterThan(15); + expect(weights.low).toBeGreaterThan(10); + expect(weights.high + weights.normal + weights.low).toBeCloseTo(100, 0); + }); + + it('guarantees low priority at least 1% when it has jobs', () => { + const depths = { critical: 100, high: 100, normal: 100, low: 5 }; + const weights = computeEffectiveWeights(DEFAULT_PRIORITY_WEIGHTS, depths, new Set()); + const total = weights.critical + weights.high + weights.normal + weights.low; + expect(weights.low / total).toBeGreaterThanOrEqual(0.01); + }); + + it('zeros paused queues', () => { + const depths = { critical: 10, high: 10, normal: 10, low: 10 }; + const weights = computeEffectiveWeights(DEFAULT_PRIORITY_WEIGHTS, depths, new Set(['low'])); + expect(weights.low).toBe(0); + }); +}); + +describe('selectNextPriority', () => { + it('favours critical jobs under default weights', () => { + const deficits = { critical: 0, high: 0, normal: 0, low: 0 }; + const depths = { critical: 10, high: 10, normal: 10, low: 10 }; + const weights = { ...DEFAULT_PRIORITY_WEIGHTS }; + + const picks: PriorityClass[] = []; + for (let i = 0; i < 20; i++) { + const next = selectNextPriority(deficits, depths, weights, new Set()); + if (next) picks.push(next); + } + + const criticalCount = picks.filter((p) => p === 'critical').length; + const lowCount = picks.filter((p) => p === 'low').length; + expect(criticalCount).toBeGreaterThan(lowCount); + }); + + it('returns null when all queues are empty', () => { + const deficits = { critical: 0, high: 0, normal: 0, low: 0 }; + const depths = { critical: 0, high: 0, normal: 0, low: 0 }; + const result = selectNextPriority(deficits, depths, DEFAULT_PRIORITY_WEIGHTS, new Set()); + expect(result).toBeNull(); + }); + + it('skips paused queues', () => { + const deficits = { critical: 0, high: 0, normal: 0, low: 0 }; + const depths = { critical: 5, high: 0, normal: 0, low: 10 }; + const result = selectNextPriority(deficits, depths, DEFAULT_PRIORITY_WEIGHTS, new Set(['low'])); + expect(result).toBe('critical'); + }); +}); + +describe('resolveBackpressure', () => { + it('pauses low first when all queues are full', () => { + const depths = { critical: 100, high: 100, normal: 100, low: 100 }; + const result = resolveBackpressure(depths, 100, new Set()); + expect(result).toBe('low'); + }); + + it('pauses normal when low is already paused', () => { + const depths = { critical: 100, high: 100, normal: 100, low: 100 }; + const result = resolveBackpressure(depths, 100, new Set(['low'])); + expect(result).toBe('normal'); + }); + + it('returns null when queues are not all full', () => { + const depths = { critical: 100, high: 50, normal: 100, low: 100 }; + const result = resolveBackpressure(depths, 100, new Set()); + expect(result).toBeNull(); + }); + + it('never pauses critical', () => { + const depths = { critical: 100, high: 100, normal: 100, low: 100 }; + const paused = new Set(['low', 'normal', 'high']); + const result = resolveBackpressure(depths, 100, paused); + expect(result).toBeNull(); + }); +}); + +describe('WeightedFairQueue', () => { + it('schedules jobs respecting WFQ weights', async () => { + const queues = makeQueues(); + const wfq = new WeightedFairQueue(queues); + + for (let i = 0; i < 50; i++) await queues.critical.add('c', { i }); + for (let i = 0; i < 50; i++) await queues.low.add('l', { i }); + + const picks: PriorityClass[] = []; + for (let i = 0; i < 60; i++) { + const next = await wfq.scheduleNext(); + if (next) picks.push(next.priority); + } + + const criticalCount = picks.filter((p) => p === 'critical').length; + const lowCount = picks.filter((p) => p === 'low').length; + expect(criticalCount).toBeGreaterThan(lowCount * 2); + }); + + it('routes spawnSubJob to the correct priority queue', async () => { + const queues = makeQueues(); + const wfq = new WeightedFairQueue(queues); + + await wfq.spawnSubJob('critical', 'urgent-child', { id: 1 }); + + expect(queues.critical.depth).toBe(1); + expect(queues.low.depth).toBe(0); + + const next = await wfq.scheduleNext(); + expect(next?.priority).toBe('critical'); + expect(next?.job.name).toBe('urgent-child'); + }); + + it('auto-applies backpressure on enqueue when all queues full', async () => { + const queues = makeQueues(2); + const wfq = new WeightedFairQueue(queues, { maxQueueSize: 2 }); + + await wfq.enqueue('critical', 'c1', {}); + await wfq.enqueue('critical', 'c2', {}); + await wfq.enqueue('high', 'h1', {}); + await wfq.enqueue('high', 'h2', {}); + await wfq.enqueue('normal', 'n1', {}); + await wfq.enqueue('normal', 'n2', {}); + await wfq.enqueue('low', 'l1', {}); + await wfq.enqueue('low', 'l2', {}); + + await expect(wfq.enqueue('low', 'l3', {})).rejects.toThrow('paused'); + expect(queues.low.isPaused).toBe(true); + }); + + it('processNext executes handler and records metrics', async () => { + const queues = makeQueues(); + const wfq = new WeightedFairQueue(queues); + const handler = jest.fn(async () => undefined); + + await wfq.enqueue('critical', 'test:job', { value: 42 }); + + const processed = await wfq.processNext({ 'test:job': handler }); + expect(processed).toBe(true); + expect(handler).toHaveBeenCalledWith(expect.objectContaining({ data: { value: 42 } })); + expect(queues.critical.getStats().totalProcessed).toBe(1); + }); + + it('records completion metrics and SLO violations', async () => { + const queues = makeQueues(); + const wfq = new WeightedFairQueue(queues); + + const job = await queues.critical.add('pay', { id: '1' }); + await wfq.scheduleNext(); + + const enqueuedAt = Date.now() - 35_000; + const staleJob = { ...job, enqueuedAt }; + wfq.recordCompletion('critical', staleJob, 200); + + expect(queues.critical.getStats().sloViolations).toBe(1); + expect(queues.critical.getStats().lastProcessingTimeMs).toBe(200); + }); + + it('exposes scheduler snapshot with effective weights', async () => { + const queues = makeQueues(); + const wfq = new WeightedFairQueue(queues); + + await queues.high.add('h', {}); + const snapshot = wfq.getSnapshot(); + + expect(snapshot.depths.high).toBe(1); + expect(snapshot.depths.critical).toBe(0); + expect(snapshot.effectiveWeights.critical).toBe(0); + expect(snapshot.effectiveWeights.high).toBeGreaterThan(25); + }); + + it('distributes capacity close to target weights over many rounds', () => { + const deficits = { critical: 0, high: 0, normal: 0, low: 0 }; + const depths = { critical: 1000, high: 1000, normal: 1000, low: 1000 }; + const counts = { critical: 0, high: 0, normal: 0, low: 0 }; + + for (let i = 0; i < 1000; i++) { + const w = computeEffectiveWeights(DEFAULT_PRIORITY_WEIGHTS, depths, new Set()); + const p = selectNextPriority(deficits, depths, w, new Set()); + if (p) counts[p]++; + } + + expect(counts.critical / 1000).toBeGreaterThan(0.4); + expect(counts.high / 1000).toBeGreaterThan(0.15); + expect(counts.low / 1000).toBeGreaterThan(0.05); + }); +}); diff --git a/backend/shared/queue/index.ts b/backend/shared/queue/index.ts new file mode 100644 index 00000000..dd68b41c --- /dev/null +++ b/backend/shared/queue/index.ts @@ -0,0 +1,34 @@ +export { + BULLMQ_PRIORITY, + DEFAULT_PRIORITY_WEIGHTS, + LATENCY_SLO_MS, + LOW_PRIORITY_MIN_CAPACITY_PERCENT, + PRIORITY_ORDER, + createEmptyStats, +} from './types'; +export type { + EnqueueOptions, + PriorityClass, + PriorityQueueStats, + PriorityStatsMap, + QueueJob, +} from './types'; + +export { PriorityQueue } from './priorityQueue'; +export type { BullJobLike, BullQueueLike, PriorityQueueConfig } from './priorityQueue'; + +export { + WeightedFairQueue, + computeEffectiveWeights, + selectNextPriority, + resolveBackpressure, +} from './weightedFairQueue'; +export type { + JobHandler, + JobHandlerMap, + SchedulerSnapshot, + WeightedFairQueueConfig, +} from './weightedFairQueue'; + +export { createJobQueueSystem } from './queueFactory'; +export type { JobQueueSystem, JobQueueSystemConfig } from './queueFactory'; diff --git a/backend/shared/queue/priorityQueue.ts b/backend/shared/queue/priorityQueue.ts new file mode 100644 index 00000000..fb549222 --- /dev/null +++ b/backend/shared/queue/priorityQueue.ts @@ -0,0 +1,190 @@ +/** + * PriorityQueue — wraps BullMQ with priority-class semantics. + * + * Scheduling source of truth: the in-memory pending list consumed by + * WeightedFairQueue. BullMQ is the durable persistence layer; jobs are + * removed from Redis when the WFQ scheduler dequeues them. + */ + +import { Queue, type ConnectionOptions, type JobsOptions, type QueueOptions } from 'bullmq'; +import { + BULLMQ_PRIORITY, + LATENCY_SLO_MS, + type EnqueueOptions, + type PriorityClass, + type PriorityQueueStats, + type QueueJob, +} from './types'; + +// ── Adapter for test doubles ────────────────────────────────────────────────── + +export interface BullJobLike { + remove(): Promise; +} + +export interface BullQueueLike { + add(name: string, data: unknown, opts?: JobsOptions): Promise<{ id?: string }>; + getJob(id: string): Promise; + getWaitingCount(): Promise; + pause(): Promise; + resume(): Promise; + close(): Promise; +} + +export interface PriorityQueueConfig { + connection: ConnectionOptions; + baseQueueName: string; + priority: PriorityClass; + maxSize?: number; + /** Inject a mock queue in tests. */ + queueFactory?: (name: string, opts: QueueOptions) => BullQueueLike; +} + +// ── PriorityQueue ───────────────────────────────────────────────────────────── + +export class PriorityQueue { + readonly priority: PriorityClass; + private readonly queue: BullQueueLike; + private readonly maxSize: number; + private readonly pending: QueueJob[] = []; + private paused = false; + private stats: PriorityQueueStats; + + constructor(config: PriorityQueueConfig) { + this.priority = config.priority; + this.maxSize = config.maxSize ?? 10_000; + const queueName = `${config.baseQueueName}:${config.priority}`; + const factory = config.queueFactory ?? ((name, opts) => new Queue(name, opts) as unknown as BullQueueLike); + this.queue = factory(queueName, { connection: config.connection }); + this.stats = { + depth: 0, + paused: false, + maxSize: this.maxSize, + totalEnqueued: 0, + totalProcessed: 0, + totalWaitTimeMs: 0, + totalProcessingTimeMs: 0, + sloViolations: 0, + lastWaitTimeMs: 0, + lastProcessingTimeMs: 0, + }; + } + + get isPaused(): boolean { + return this.paused; + } + + get depth(): number { + return this.pending.length; + } + + get isFull(): boolean { + return this.pending.length >= this.maxSize; + } + + getStats(): PriorityQueueStats { + return { ...this.stats, depth: this.pending.length, paused: this.paused }; + } + + /** + * Enqueue a job at this queue's priority class. + * Persists to BullMQ and adds to the in-memory scheduling buffer. + */ + async add(name: string, data: T, opts: EnqueueOptions = {}): Promise> { + if (this.paused) { + throw new Error(`PriorityQueue [${this.priority}] is paused — cannot accept new jobs`); + } + if (this.isFull) { + throw new Error(`PriorityQueue [${this.priority}] is at capacity (${this.maxSize})`); + } + + const effectivePriority = opts.parentPriority ?? this.priority; + const enqueuedAt = Date.now(); + const bullOpts: JobsOptions = { + priority: BULLMQ_PRIORITY[effectivePriority], + delay: opts.delay, + jobId: opts.jobId, + }; + + const result = await this.queue.add(name, data, bullOpts); + const bullJobId = result.id ?? opts.jobId; + const job: QueueJob = { + id: bullJobId ?? `${name}_${enqueuedAt}`, + name, + data, + priority: effectivePriority, + enqueuedAt, + bullJobId, + }; + + this.pending.push(job); + this.stats.totalEnqueued += 1; + this.stats.depth = this.pending.length; + return job; + } + + /** + * Spawn a sub-job at the same priority as this queue. + * For cross-priority inheritance use WeightedFairQueue.spawnSubJob(). + */ + async spawnSubJob(name: string, data: T, parentPriority: PriorityClass): Promise> { + if (parentPriority !== this.priority) { + throw new Error( + `spawnSubJob: parent priority "${parentPriority}" does not match queue "${this.priority}". ` + + 'Use WeightedFairQueue.spawnSubJob() to route sub-jobs to the correct priority class.', + ); + } + return this.add(name, data, { parentPriority }); + } + + /** Dequeue the next job and remove it from BullMQ (WFQ scheduling). */ + async dequeue(): Promise | undefined> { + if (this.paused || this.pending.length === 0) return undefined; + + const job = this.pending.shift(); + if (!job) return undefined; + + if (job.bullJobId) { + const bullJob = await this.queue.getJob(job.bullJobId); + await bullJob?.remove(); + } + + this.stats.depth = this.pending.length; + return job; + } + + /** Peek without removing. */ + peek(): QueueJob | undefined { + return this.pending[0]; + } + + /** Record processing completion and check SLO. */ + recordProcessed(waitTimeMs: number, processingTimeMs: number, jobPriority: PriorityClass): void { + this.stats.totalProcessed += 1; + this.stats.totalWaitTimeMs += waitTimeMs; + this.stats.totalProcessingTimeMs += processingTimeMs; + this.stats.lastWaitTimeMs = waitTimeMs; + this.stats.lastProcessingTimeMs = processingTimeMs; + + const slo = LATENCY_SLO_MS[jobPriority]; + if (Number.isFinite(slo) && waitTimeMs > slo) { + this.stats.sloViolations += 1; + } + } + + async pause(): Promise { + this.paused = true; + this.stats.paused = true; + await this.queue.pause(); + } + + async resume(): Promise { + this.paused = false; + this.stats.paused = false; + await this.queue.resume(); + } + + async close(): Promise { + await this.queue.close(); + } +} diff --git a/backend/shared/queue/queueFactory.ts b/backend/shared/queue/queueFactory.ts new file mode 100644 index 00000000..d4ebae2e --- /dev/null +++ b/backend/shared/queue/queueFactory.ts @@ -0,0 +1,39 @@ +/** + * Factory for wiring a complete weighted-fair job queue system. + */ + +import type { ConnectionOptions } from 'bullmq'; +import { PriorityQueue } from './priorityQueue'; +import { WeightedFairQueue, type WeightedFairQueueConfig } from './weightedFairQueue'; +import type { PriorityClass } from './types'; + +export interface JobQueueSystemConfig { + connection: ConnectionOptions; + baseQueueName?: string; + maxQueueSize?: number; + weights?: WeightedFairQueueConfig['weights']; +} + +export interface JobQueueSystem { + scheduler: WeightedFairQueue; + queues: Record; +} + +export function createJobQueueSystem(config: JobQueueSystemConfig): JobQueueSystem { + const baseQueueName = config.baseQueueName ?? 'subtrackr:jobs'; + const maxSize = config.maxQueueSize ?? 10_000; + + const queues = { + critical: new PriorityQueue({ connection: config.connection, baseQueueName, priority: 'critical', maxSize }), + high: new PriorityQueue({ connection: config.connection, baseQueueName, priority: 'high', maxSize }), + normal: new PriorityQueue({ connection: config.connection, baseQueueName, priority: 'normal', maxSize }), + low: new PriorityQueue({ connection: config.connection, baseQueueName, priority: 'low', maxSize }), + }; + + const scheduler = new WeightedFairQueue(queues, { + weights: config.weights, + maxQueueSize: maxSize, + }); + + return { scheduler, queues }; +} diff --git a/backend/shared/queue/types.ts b/backend/shared/queue/types.ts new file mode 100644 index 00000000..1ed3a6c6 --- /dev/null +++ b/backend/shared/queue/types.ts @@ -0,0 +1,95 @@ +/** + * Job queue priority types and scheduling constants. + * + * Priority classes map to BullMQ numeric priorities (lower = higher urgency). + * Weights drive weighted fair queuing across worker capacity. + */ + +export type PriorityClass = 'critical' | 'high' | 'normal' | 'low'; + +/** Descending urgency — used for backpressure (pause lowest first). */ +export const PRIORITY_ORDER: readonly PriorityClass[] = ['critical', 'high', 'normal', 'low']; + +/** Default WFQ capacity shares (must sum to 100). */ +export const DEFAULT_PRIORITY_WEIGHTS: Record = { + critical: 50, + high: 25, + normal: 15, + low: 10, +}; + +/** Minimum guaranteed capacity for low-priority jobs under load (1%). */ +export const LOW_PRIORITY_MIN_CAPACITY_PERCENT = 1; + +/** Latency SLO thresholds in milliseconds. */ +export const LATENCY_SLO_MS: Record = { + critical: 30_000, + high: 120_000, + normal: 600_000, + low: Infinity, +}; + +/** + * BullMQ priority values — lower number means higher priority. + * @see https://docs.bullmq.io/guide/jobs/prioritized + */ +export const BULLMQ_PRIORITY: Record = { + critical: 1, + high: 2, + normal: 3, + low: 4, +}; + +export interface EnqueueOptions { + /** Inherit parent job priority when spawning sub-jobs. */ + parentPriority?: PriorityClass; + delay?: number; + jobId?: string; +} + +export interface QueueJob { + id: string; + name: string; + data: T; + priority: PriorityClass; + enqueuedAt: number; + /** BullMQ job id — used to remove from Redis when dequeued by WFQ. */ + bullJobId?: string; +} + +export interface PriorityQueueStats { + depth: number; + paused: boolean; + maxSize: number; + totalEnqueued: number; + totalProcessed: number; + totalWaitTimeMs: number; + totalProcessingTimeMs: number; + sloViolations: number; + lastWaitTimeMs: number; + lastProcessingTimeMs: number; +} + +export type PriorityStatsMap = Record; + +export function createEmptyStats(maxSize = 10_000): PriorityStatsMap { + const empty = (): PriorityQueueStats => ({ + depth: 0, + paused: false, + maxSize, + totalEnqueued: 0, + totalProcessed: 0, + totalWaitTimeMs: 0, + totalProcessingTimeMs: 0, + sloViolations: 0, + lastWaitTimeMs: 0, + lastProcessingTimeMs: 0, + }); + + return { + critical: empty(), + high: empty(), + normal: empty(), + low: empty(), + }; +} diff --git a/backend/shared/queue/weightedFairQueue.ts b/backend/shared/queue/weightedFairQueue.ts new file mode 100644 index 00000000..e293448c --- /dev/null +++ b/backend/shared/queue/weightedFairQueue.ts @@ -0,0 +1,318 @@ +/** + * Weighted Fair Queue scheduler. + * + * Implements deficit round robin across priority classes with: + * - configurable weights (default 50/25/15/10) + * - dynamic reweighting when higher queues are idle + * - starvation prevention (low guaranteed ≥ 1% capacity) + * - backpressure (pause lowest priority first when all queues full) + */ + +import { PriorityQueue } from './priorityQueue'; +import { + DEFAULT_PRIORITY_WEIGHTS, + LOW_PRIORITY_MIN_CAPACITY_PERCENT, + PRIORITY_ORDER, + type EnqueueOptions, + type PriorityClass, + type PriorityStatsMap, + type QueueJob, + createEmptyStats, +} from './types'; + +export interface WeightedFairQueueConfig { + weights?: Record; + maxQueueSize?: number; +} + +export type JobHandler = (job: QueueJob) => Promise; +export type JobHandlerMap = Record; + +export interface SchedulerSnapshot { + effectiveWeights: Record; + deficits: Record; + depths: Record; + paused: PriorityClass[]; +} + +/** + * Pure scheduling functions — exported for unit testing. + */ + +export function computeEffectiveWeights( + baseWeights: Record, + depths: Record, + paused: ReadonlySet, +): Record { + const effective: Record = { ...baseWeights }; + + // Zero out paused or empty queues and collect redistributable capacity. + let redistributable = 0; + for (const p of PRIORITY_ORDER) { + if (paused.has(p) || depths[p] === 0) { + redistributable += effective[p]; + effective[p] = 0; + } + } + + // Dynamic reweighting — share idle capacity among queues that have work. + const active = PRIORITY_ORDER.filter((p) => !paused.has(p) && depths[p] > 0); + if (active.length > 0 && redistributable > 0) { + const activeWeightSum = active.reduce((sum, p) => sum + baseWeights[p], 0); + for (const p of active) { + effective[p] += redistributable * (baseWeights[p] / activeWeightSum); + } + } + + // Starvation prevention — low always gets at least 1% when it has jobs. + if (!paused.has('low') && depths.low > 0) { + const total = PRIORITY_ORDER.reduce((sum, p) => sum + effective[p], 0); + const minLow = (LOW_PRIORITY_MIN_CAPACITY_PERCENT / 100) * Math.max(total, 100); + if (effective.low < minLow) { + const deficit = minLow - effective.low; + const donors = PRIORITY_ORDER.filter( + (p) => p !== 'low' && effective[p] > minLow, + ); + let remaining = deficit; + for (const donor of donors) { + if (remaining <= 0) break; + const take = Math.min(remaining, effective[donor] - minLow); + effective[donor] -= take; + effective.low += take; + remaining -= take; + } + } + } + + return effective; +} + +export function selectNextPriority( + deficits: Record, + depths: Record, + effectiveWeights: Record, + paused: ReadonlySet, +): PriorityClass | null { + let best: PriorityClass | null = null; + let bestDeficit = -Infinity; + + for (const p of PRIORITY_ORDER) { + if (paused.has(p) || depths[p] === 0) continue; + deficits[p] += effectiveWeights[p]; + if (deficits[p] > bestDeficit) { + bestDeficit = deficits[p]; + best = p; + } + } + + if (best === null) return null; + + const weightSum = PRIORITY_ORDER.reduce( + (sum, p) => sum + (depths[p] > 0 && !paused.has(p) ? effectiveWeights[p] : 0), + 0, + ); + deficits[best] -= weightSum > 0 ? weightSum : effectiveWeights[best]; + return best; +} + +export function resolveBackpressure( + depths: Record, + maxSize: number, + currentlyPaused: ReadonlySet, +): PriorityClass | null { + const allFull = PRIORITY_ORDER.every( + (p) => currentlyPaused.has(p) || depths[p] >= maxSize, + ); + if (!allFull) return null; + + // Pause lowest priority first (never pause critical). + for (let i = PRIORITY_ORDER.length - 1; i >= 1; i--) { + const p = PRIORITY_ORDER[i]; + if (!currentlyPaused.has(p)) return p; + } + return null; +} + +// ── WeightedFairQueue ───────────────────────────────────────────────────────── + +export class WeightedFairQueue { + private readonly queues: Record; + private readonly weights: Record; + private readonly maxSize: number; + private readonly deficits: Record = { + critical: 0, + high: 0, + normal: 0, + low: 0, + }; + private readonly paused = new Set(); + + constructor(queues: Record, config: WeightedFairQueueConfig = {}) { + this.queues = queues; + this.weights = { ...DEFAULT_PRIORITY_WEIGHTS, ...config.weights }; + this.maxSize = config.maxQueueSize ?? 10_000; + } + + getQueue(priority: PriorityClass): PriorityQueue { + return this.queues[priority]; + } + + getDepths(): Record { + return { + critical: this.queues.critical.depth, + high: this.queues.high.depth, + normal: this.queues.normal.depth, + low: this.queues.low.depth, + }; + } + + getSnapshot(): SchedulerSnapshot { + const depths = this.getDepths(); + return { + effectiveWeights: computeEffectiveWeights(this.weights, depths, this.paused), + deficits: { ...this.deficits }, + depths, + paused: [...this.paused], + }; + } + + getStats(): PriorityStatsMap { + return { + critical: this.queues.critical.getStats(), + high: this.queues.high.getStats(), + normal: this.queues.normal.getStats(), + low: this.queues.low.getStats(), + }; + } + + /** Apply backpressure — pause lowest-priority non-empty queue when all are full. */ + async applyBackpressure(): Promise { + const toPause = resolveBackpressure(this.getDepths(), this.maxSize, this.paused); + if (toPause) { + await this.queues[toPause].pause(); + this.paused.add(toPause); + } + return toPause; + } + + /** + * Enqueue a job with automatic backpressure handling. + * Preferred entry point over calling PriorityQueue.add() directly. + */ + async enqueue( + priority: PriorityClass, + name: string, + data: T, + opts: EnqueueOptions = {}, + ): Promise> { + await this.applyBackpressure(); + if (this.paused.has(priority) || this.queues[priority].isPaused) { + throw new Error(`Cannot enqueue to paused priority class "${priority}"`); + } + return this.queues[priority].add(name, data, opts) as Promise>; + } + + /** + * Priority inheritance — routes sub-jobs to the parent's priority queue. + */ + async spawnSubJob( + parentPriority: PriorityClass, + name: string, + data: T, + ): Promise> { + return this.enqueue(parentPriority, name, data, { parentPriority }); + } + + /** + * Select the next job using weighted fair queuing. + * Returns null when no work is available. + */ + async scheduleNext(): Promise<{ priority: PriorityClass; job: QueueJob } | null> { + const depths = this.getDepths(); + const effectiveWeights = computeEffectiveWeights(this.weights, depths, this.paused); + const priority = selectNextPriority(this.deficits, depths, effectiveWeights, this.paused); + if (!priority) return null; + + const job = await this.queues[priority].dequeue(); + if (!job) return null; + + return { priority, job }; + } + + /** + * Process a single job: schedule → execute handler → record metrics. + * Returns false when no job was available. + */ + async processNext(handlers: JobHandlerMap): Promise { + const next = await this.scheduleNext(); + if (!next) return false; + + const start = Date.now(); + const handler = handlers[next.job.name]; + if (!handler) { + throw new Error(`No handler registered for job "${next.job.name}"`); + } + + try { + await handler(next.job); + } finally { + this.recordCompletion(next.priority, next.job, Date.now() - start); + } + + return true; + } + + private workerTimer: ReturnType | null = null; + private processing = false; + + /** Start a polling worker loop that processes jobs via registered handlers. */ + startProcessing(handlers: JobHandlerMap, intervalMs = 50): void { + if (this.workerTimer) return; + this.workerTimer = setInterval(() => { + if (this.processing) return; + this.processing = true; + void this.processNext(handlers) + .catch((err) => console.error('[WeightedFairQueue] Worker error:', err)) + .finally(() => { + this.processing = false; + }); + }, intervalMs); + } + + stopProcessing(): void { + if (this.workerTimer) { + clearInterval(this.workerTimer); + this.workerTimer = null; + } + } + + isProcessing(): boolean { + return this.workerTimer !== null; + } + + /** + * Record job completion metrics on the originating priority queue. + */ + recordCompletion( + queuePriority: PriorityClass, + job: QueueJob, + processingTimeMs: number, + ): void { + const waitTimeMs = Date.now() - job.enqueuedAt; + this.queues[queuePriority].recordProcessed(waitTimeMs, processingTimeMs, job.priority); + } + + async resumeAll(): Promise { + for (const p of this.paused) { + await this.queues[p].resume(); + } + this.paused.clear(); + } + + async close(): Promise { + this.stopProcessing(); + await Promise.all(PRIORITY_ORDER.map((p) => this.queues[p].close())); + } +} + +export { createEmptyStats }; diff --git a/backend/webhook/jobs/webhookDeliveryJob.ts b/backend/webhook/jobs/webhookDeliveryJob.ts new file mode 100644 index 00000000..0404ee19 --- /dev/null +++ b/backend/webhook/jobs/webhookDeliveryJob.ts @@ -0,0 +1,39 @@ +/** + * Webhook delivery background jobs. + * + * Outbound merchant webhook dispatch — normal priority. + */ + +import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue'; + +export const WEBHOOK_JOB_PRIORITY: PriorityClass = 'normal'; + +export const WEBHOOK_JOB_NAMES = { + DELIVER: 'webhook:deliver', + RETRY: 'webhook:retry', +} as const; + +export type WebhookJobName = (typeof WEBHOOK_JOB_NAMES)[keyof typeof WEBHOOK_JOB_NAMES]; + +export interface WebhookDeliveryPayload { + webhookId: string; + merchantId: string; + url: string; + eventType: string; + payload: Record; + attempt: number; +} + +export async function enqueueWebhookDelivery( + queue: PriorityQueue, + payload: WebhookDeliveryPayload, +): Promise> { + return queue.add(WEBHOOK_JOB_NAMES.DELIVER, payload); +} + +export async function enqueueWebhookRetry( + scheduler: WeightedFairQueue, + payload: WebhookDeliveryPayload, +): Promise> { + return scheduler.spawnSubJob(WEBHOOK_JOB_PRIORITY, WEBHOOK_JOB_NAMES.RETRY, payload); +} diff --git a/jest.backend.config.js b/jest.backend.config.js index 1b948182..33ca8c34 100644 --- a/jest.backend.config.js +++ b/jest.backend.config.js @@ -3,6 +3,9 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', testMatch: ['**/backend/**/__tests__/**/*.test.ts'], + moduleNameMapper: { + '^bullmq$': '/backend/shared/queue/__mocks__/bullmq.ts', + }, transform: { '^.+\\.tsx?$': ['ts-jest', { tsconfig: { strict: false, skipLibCheck: true } }], }, diff --git a/jest.config.js b/jest.config.js index f3a0ae61..4ae69d14 100644 --- a/jest.config.js +++ b/jest.config.js @@ -15,7 +15,9 @@ module.exports = { ], moduleNameMapper: { '^@/(.*)$': '/src/$1', - '^@react-native-community/netinfo$': '/src/__mocks__/@react-native-community/netinfo.js', + '^bullmq$': '/backend/shared/queue/__mocks__/bullmq.ts', + '^@react-native-community/netinfo$': + '/src/__mocks__/@react-native-community/netinfo.js', }, setupFilesAfterEnv: [], testEnvironment: 'node', diff --git a/package-lock.json b/package-lock.json index b59c73b5..41d70313 100644 --- a/package-lock.json +++ b/package-lock.json @@ -23,6 +23,7 @@ "@walletconnect/react-native-compat": "^2.23.9", "@walletconnect/utils": "^2.23.9", "bcryptjs": "^3.0.3", + "bullmq": "^5.79.1", "ethers": "^5.8.0", "expo": "~53.0.20", "expo-application": "~6.1.5", @@ -33,6 +34,7 @@ "expo-notifications": "^0.31.5", "expo-status-bar": "~2.2.3", "i18next": "^26.0.8", + "ioredis": "^5.11.1", "react": "19.2.5", "react-i18next": "^17.0.6", "react-native": "0.85.2", @@ -3563,6 +3565,12 @@ "integrity": "sha512-F0YfUDjvT+Mtt/R4xdl2X0EYCHMMiJqNLdxHD++jDT5ydEFIyqbCHh51Qx2E211dgZprPKhV7sHmnXKpLuvc5g==", "license": "MIT" }, + "node_modules/@ioredis/commands": { + "version": "1.10.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.10.0.tgz", + "integrity": "sha512-UmeW7z4LfctwoQ5wkhVzgq8tXkreED2xZGpX+Bg+zA+WJFZCT6c062AfCK/Dfk81xZnnwdhJCUMkitihRaoC2Q==", + "license": "MIT" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -4428,6 +4436,84 @@ "node": ">= 18" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.4.tgz", + "integrity": "sha512-LCkGo6JDfaBhgST7UpPWgNgLINpcpabaHfyz5OBx75nUYxBsaEPxjnyNjWpeb/xBup/682QnBfRBy2/LvPutZQ==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.4.tgz", + "integrity": "sha512-zExlW9zUJKZH/tOtVMttwjKa4Xm/3KcNjnE3dPN92uCktwavMxpgCA3MoJK/DOnTWsQgo224OaST27/mPNAf+w==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.4.tgz", + "integrity": "sha512-Tg3yX65f5GbtXLkrYEHE5oibZG9epyYWas7FogTTEJeDEF9JlXJzKgXaNhT3UXlTOeA+AfZpYZYZ0uPj7Cfquw==", + "cpu": [ + "arm" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.4.tgz", + "integrity": "sha512-dgX0P/9wGPJeHFBG+ZmhgE6bmtMt7NP5CRBGyyktpopdk/mW4POnrpQsSLtKI1dwpc+pPLuXHDh6vvskyQE/sw==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.4.tgz", + "integrity": "sha512-8TNXMEjJc3QEy7R/x1INhgiU+XakDAFUzBhaz7+Rbrs8NH5UQeHQxxmzsSBJGyV6I1jW79undiQm8tOI+D+8FQ==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.4.tgz", + "integrity": "sha512-CmCXPQrkbwExx3j946/PtHWHbYJiCRBRDl4BlkRQcJB/YOwQxJRTpoo7aTsortjgoJ1x7opzTSxn7C+ASSLVjQ==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@napi-rs/wasm-runtime": { "version": "0.2.12", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-0.2.12.tgz", @@ -13383,6 +13469,79 @@ "node": ">=6.14.2" } }, + "node_modules/bullmq": { + "version": "5.79.1", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.79.1.tgz", + "integrity": "sha512-cteoHRr1FGOTUgzFrnMyBNGtQhNeVR8Ej6nImNSHQDJi4tj6GMD0p9ZG65ZsTnvR9RVf18dhRxWu4kFl634QGA==", + "license": "MIT", + "dependencies": { + "cron-parser": "4.9.0", + "ioredis": "5.10.1", + "msgpackr": "2.0.2", + "node-abort-controller": "3.1.1", + "semver": "7.8.1", + "tslib": "2.8.1" + }, + "engines": { + "node": ">=12.22.0" + }, + "peerDependencies": { + "redis": ">=5.0.0" + }, + "peerDependenciesMeta": { + "redis": { + "optional": true + } + } + }, + "node_modules/bullmq/node_modules/@ioredis/commands": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.5.1.tgz", + "integrity": "sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==", + "license": "MIT" + }, + "node_modules/bullmq/node_modules/ioredis": { + "version": "5.10.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", + "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.5.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/bullmq/node_modules/semver": { + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.8.1.tgz", + "integrity": "sha512-rkVq3IXh+4FDGch+KwzX3aV9W3kO54GyEgpvBzSyctDA6Xtd7RJQV1xmXbeQp5v7+VzLOfVqiutSE6GICgPFvg==", + "license": "ISC", + "bin": { + "semver": "bin/semver.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/bullmq/node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, "node_modules/bunyamin": { "version": "1.6.3", "resolved": "https://registry.npmjs.org/bunyamin/-/bunyamin-1.6.3.tgz", @@ -14200,6 +14359,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.1.tgz", + "integrity": "sha512-rwHwUfXL40Chm1r08yrhU3qpUvdVlgkKNeyeGPOxnW8/SyVDvgRaed/Uz54AqWNaTCAThlj6QAs3TZcKI0xDEw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -14884,6 +15052,18 @@ "node": "^14.15.0 || ^16.10.0 || >=18.0.0" } }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "license": "MIT", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/cross-env": { "version": "10.1.0", "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-10.1.0.tgz", @@ -15396,6 +15576,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -20527,6 +20716,28 @@ "fp-ts": "^1.0.0" } }, + "node_modules/ioredis": { + "version": "5.11.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.11.1.tgz", + "integrity": "sha512-ehuGcf94bQXhfagULNXrJdfnWO38v070jxSx/qE87Kjzmu2fU7ro5EFAb+OPituLqgfyuQaym5DlrNydW2sJ9A==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.10.0", + "cluster-key-slot": "1.1.1", + "debug": "4.4.3", + "denque": "2.1.0", + "redis-errors": "1.2.0", + "redis-parser": "3.0.0", + "standard-as-callback": "2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -24270,6 +24481,12 @@ "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, "node_modules/lodash.escaperegexp": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.escaperegexp/-/lodash.escaperegexp-4.1.2.tgz", @@ -24277,6 +24494,12 @@ "dev": true, "license": "MIT" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.isplainobject": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz", @@ -24815,6 +25038,15 @@ "yallist": "^3.0.2" } }, + "node_modules/luxon": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.7.2.tgz", + "integrity": "sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==", + "license": "MIT", + "engines": { + "node": ">=12" + } + }, "node_modules/make-asynchronous": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/make-asynchronous/-/make-asynchronous-1.1.0.tgz", @@ -25840,6 +26072,37 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "license": "MIT" }, + "node_modules/msgpackr": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-2.0.2.tgz", + "integrity": "sha512-c5hYOXFbP79Slh6Dzd2wzk+jnV7mX1UxfMYtilnY1NmalXPqG8DGb5cYCMBrW4AsH3zekBBZd4QrKz9NhtvYLQ==", + "license": "MIT", + "optionalDependencies": { + "msgpackr-extract": "^3.0.4" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.4.tgz", + "integrity": "sha512-4kmO/MdyUIkLIvTPr8VHLil4AtoKIoniWPIEk5+CDy0xnWC84azhSFmuJ7PxZdsYtiP5kEeQsORAVIeMgxT+Hw==", + "hasInstallScript": true, + "license": "MIT", + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.2.2" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.4", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.4", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.4", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.4", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.4", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.4" + } + }, "node_modules/multi-sort-stream": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/multi-sort-stream/-/multi-sort-stream-1.0.4.tgz", @@ -26190,6 +26453,12 @@ "node": ">=12.0.0" } }, + "node_modules/node-abort-controller": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz", + "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==", + "license": "MIT" + }, "node_modules/node-addon-api": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-2.0.2.tgz", @@ -26276,6 +26545,31 @@ "node-gyp-build-test": "build-test.js" } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", + "integrity": "sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==", + "license": "MIT", + "optional": true, + "dependencies": { + "detect-libc": "^2.0.1" + }, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, + "node_modules/node-gyp-build-optional-packages/node_modules/detect-libc": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.1.2.tgz", + "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==", + "license": "Apache-2.0", + "optional": true, + "engines": { + "node": ">=8" + } + }, "node_modules/node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -31831,6 +32125,27 @@ "node": ">=8" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reduce-flatten": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/reduce-flatten/-/reduce-flatten-2.0.0.tgz", @@ -34109,6 +34424,12 @@ "node": ">=8" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.5.0.tgz", diff --git a/package.json b/package.json index 6c4a0d73..9bdbc3b2 100644 --- a/package.json +++ b/package.json @@ -75,6 +75,7 @@ "@walletconnect/react-native-compat": "^2.23.9", "@walletconnect/utils": "^2.23.9", "bcryptjs": "^3.0.3", + "bullmq": "^5.79.1", "ethers": "^5.8.0", "expo": "~53.0.20", "expo-application": "~6.1.5", @@ -85,6 +86,7 @@ "expo-notifications": "^0.31.5", "expo-status-bar": "~2.2.3", "i18next": "^26.0.8", + "ioredis": "^5.11.1", "react": "19.2.5", "react-i18next": "^17.0.6", "react-native": "0.85.2", @@ -101,8 +103,6 @@ }, "devDependencies": { "@babel/core": "^7.29.0", - "babel-plugin-module-resolver": "^5.0.2", - "babel-plugin-transform-remove-console": "^6.3.0", "@commitlint/cli": "^20.5.2", "@commitlint/config-conventional": "^20.5.0", "@config-plugins/detox": "^11.0.0", @@ -125,6 +125,8 @@ "@types/react-dom": "^19.2.3", "@typescript-eslint/eslint-plugin": "^7.0.0", "@typescript-eslint/parser": "^7.0.0", + "babel-plugin-module-resolver": "^5.0.2", + "babel-plugin-transform-remove-console": "^6.3.0", "cross-env": "^10.1.0", "detox": "^20.51.0", "eslint": "^8.57.0",