Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions backend/analytics/jobs/analyticsAggregationJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Analytics aggregation background jobs.
*
* Heavy aggregation and reporting — low priority.
*/

import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue';

export const ANALYTICS_JOB_PRIORITY: PriorityClass = 'low';

export const ANALYTICS_JOB_NAMES = {
AGGREGATE_DAILY: 'analytics:aggregate-daily',
AGGREGATE_HOURLY: 'analytics:aggregate-hourly',
MAINTENANCE_CLEANUP: 'analytics:maintenance-cleanup',
} as const;

export type AnalyticsJobName = (typeof ANALYTICS_JOB_NAMES)[keyof typeof ANALYTICS_JOB_NAMES];

export interface AnalyticsAggregationPayload {
reportType: string;
merchantId?: string;
dateRange: { start: string; end: string };
}

export async function enqueueAnalyticsAggregation(
queue: PriorityQueue<AnalyticsAggregationPayload>,
payload: AnalyticsAggregationPayload,
): Promise<QueueJob<AnalyticsAggregationPayload>> {
return queue.add(ANALYTICS_JOB_NAMES.AGGREGATE_DAILY, payload);
}

export async function enqueueMaintenanceCleanup(
scheduler: WeightedFairQueue,
olderThanDays: number,
): Promise<QueueJob<{ olderThanDays: number }>> {
return scheduler.spawnSubJob(ANALYTICS_JOB_PRIORITY, ANALYTICS_JOB_NAMES.MAINTENANCE_CLEANUP, {
olderThanDays,
});
}
6 changes: 6 additions & 0 deletions backend/analytics/jobs/mvRefreshJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
*
* Runs on a configurable interval (default 60 s for real-time views).
* Exposes a Prometheus-style metric for view freshness monitoring.
*
* Queue priority: low (analytics / maintenance class).
*/

import { QueryClient } from '../../../backend/shared/query/queryRouter';
import type { PriorityClass } from '../../shared/queue';

/** Background queue priority for MV refresh work. */
export const MV_REFRESH_JOB_PRIORITY: PriorityClass = 'low';

// ── View definitions ──────────────────────────────────────────────────────────

Expand Down
75 changes: 75 additions & 0 deletions backend/billing/jobs/__tests__/billingJobQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { BillingJobQueue } from '../billingJobQueue';
import {
PAYMENT_JOB_NAMES,
type PaymentConfirmationPayload,
} from '../paymentConfirmationJob';
import type { QueueJob } from '../../../shared/queue';

describe('BillingJobQueue', () => {
it('enqueues and processes payment confirmation end-to-end', async () => {
const sent: string[] = [];
const queue = new BillingJobQueue({
connection: { host: 'localhost', port: 6379 },
paymentHandlers: {
[PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob<PaymentConfirmationPayload>) => {
sent.push(job.data.transactionId);
},
},
});

await queue.schedulePaymentConfirmation({
subscriptionId: 'sub_1',
subscriberId: 'user_1',
merchantId: 'merch_1',
amount: 29.99,
currency: 'USD',
transactionId: 'tx_abc123',
});

const processed = await queue.processNext();
expect(processed).toBe(true);
expect(sent).toEqual(['tx_abc123']);
expect(queue.scheduler.getStats().critical.totalProcessed).toBe(1);

await queue.close();
});

it('processes critical payment before low analytics under load', async () => {
const order: string[] = [];
const queue = new BillingJobQueue({
connection: { host: 'localhost', port: 6379 },
paymentHandlers: {
[PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob<PaymentConfirmationPayload>) => {
order.push(`pay:${job.data.transactionId}`);
},
},
});

const scheduler = queue.scheduler;

for (let i = 0; i < 5; i++) {
await scheduler.enqueue('low', 'analytics:aggregate', { i });
}
await queue.schedulePaymentConfirmation({
subscriptionId: 'sub_1',
subscriberId: 'user_1',
merchantId: 'merch_1',
amount: 10,
currency: 'USD',
transactionId: 'tx_urgent',
});

await scheduler.processNext({
[PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: async (job: QueueJob<PaymentConfirmationPayload>) => {
order.push(`pay:${job.data.transactionId}`);
},
'analytics:aggregate': async (job: QueueJob<{ i: number }>) => {
order.push(`analytics:${job.data.i}`);
},
});

expect(order[0]).toBe('pay:tx_urgent');

await queue.close();
});
});
84 changes: 84 additions & 0 deletions backend/billing/jobs/billingJobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Billing job queue — wires payment/dunning jobs to the WFQ scheduler.
*/

import type { ConnectionOptions } from 'bullmq';
import {
createJobQueueSystem,
type JobHandlerMap,
type JobQueueSystem,
type QueueJob,
type WeightedFairQueue,
} from '../../shared/queue';
import { createPaymentJobHandlers } from './paymentJobHandlers';
import {
PAYMENT_JOB_PRIORITY,
type ChargeSettlementPayload,
type PaymentConfirmationPayload,
enqueueChargeSettlement,
enqueuePaymentConfirmation,
} from './paymentConfirmationJob';

export interface BillingJobQueueConfig {
connection: ConnectionOptions;
baseQueueName?: string;
maxQueueSize?: number;
paymentHandlers?: JobHandlerMap;
}

export class BillingJobQueue {
private readonly system: JobQueueSystem;
private readonly handlers: JobHandlerMap;

constructor(config: BillingJobQueueConfig) {
this.system = createJobQueueSystem({
connection: config.connection,
baseQueueName: config.baseQueueName ?? 'subtrackr:billing',
maxQueueSize: config.maxQueueSize,
});
this.handlers = config.paymentHandlers ?? createPaymentJobHandlers();
}

get scheduler(): WeightedFairQueue {
return this.system.scheduler;
}

/** Enqueue a payment confirmation at critical priority (with auto backpressure). */
async schedulePaymentConfirmation(
payload: PaymentConfirmationPayload,
): Promise<QueueJob<PaymentConfirmationPayload>> {
return this.system.scheduler.enqueue(
PAYMENT_JOB_PRIORITY,
'payment:confirmation-email',
payload,
);
}

/** Enqueue charge settlement at critical priority. */
async scheduleChargeSettlement(
payload: ChargeSettlementPayload,
): Promise<QueueJob<ChargeSettlementPayload>> {
return this.system.scheduler.enqueue(PAYMENT_JOB_PRIORITY, 'payment:charge-settlement', payload);
}

/** Process one job from the queue. */
async processNext(): Promise<boolean> {
return this.system.scheduler.processNext(this.handlers);
}

/** Start the background worker loop. */
start(intervalMs = 50): void {
this.system.scheduler.startProcessing(this.handlers, intervalMs);
}

stop(): void {
this.system.scheduler.stopProcessing();
}

async close(): Promise<void> {
await this.system.scheduler.close();
}
}

// Re-export convenience enqueue functions that use WFQ
export { enqueuePaymentConfirmation, enqueueChargeSettlement };
40 changes: 40 additions & 0 deletions backend/billing/jobs/dunningJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Dunning background jobs.
*
* Payment recovery and dunning communications — critical priority.
*/

import type { PriorityClass, PriorityQueue, QueueJob, WeightedFairQueue } from '../../shared/queue';

export const DUNNING_JOB_PRIORITY: PriorityClass = 'critical';

export const DUNNING_JOB_NAMES = {
SEND_REMINDER: 'dunning:send-reminder',
ESCALATE_STAGE: 'dunning:escalate-stage',
SUSPEND_SUBSCRIPTION: 'dunning:suspend-subscription',
} as const;

export type DunningJobName = (typeof DUNNING_JOB_NAMES)[keyof typeof DUNNING_JOB_NAMES];

export interface DunningReminderPayload {
subscriptionId: string;
subscriberId: string;
merchantId: string;
stage: string;
failedAttempts: number;
}

export async function enqueueDunningReminder(
queue: PriorityQueue<DunningReminderPayload>,
payload: DunningReminderPayload,
): Promise<QueueJob<DunningReminderPayload>> {
return queue.add(DUNNING_JOB_NAMES.SEND_REMINDER, payload);
}

export async function enqueueDunningEscalation(
scheduler: WeightedFairQueue,
payload: DunningReminderPayload,
parentPriority: PriorityClass = DUNNING_JOB_PRIORITY,
): Promise<QueueJob<DunningReminderPayload>> {
return scheduler.spawnSubJob(parentPriority, DUNNING_JOB_NAMES.ESCALATE_STAGE, payload);
}
20 changes: 20 additions & 0 deletions backend/billing/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export {
PAYMENT_JOB_PRIORITY,
PAYMENT_JOB_NAMES,
enqueuePaymentConfirmation,
enqueueChargeSettlement,
} from './paymentConfirmationJob';
export type {
PaymentConfirmationPayload,
ChargeSettlementPayload,
PaymentJobName,
} from './paymentConfirmationJob';

export { DUNNING_JOB_PRIORITY, DUNNING_JOB_NAMES, enqueueDunningReminder, enqueueDunningEscalation } from './dunningJob';
export type { DunningReminderPayload, DunningJobName } from './dunningJob';

export { createPaymentJobHandlers, defaultPaymentConfirmationHandler } from './paymentJobHandlers';
export type { PaymentConfirmationHandler, PaymentConfirmationResult } from './paymentJobHandlers';

export { BillingJobQueue } from './billingJobQueue';
export type { BillingJobQueueConfig } from './billingJobQueue';
45 changes: 45 additions & 0 deletions backend/billing/jobs/paymentConfirmationJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Payment confirmation background jobs.
*
* Time-sensitive billing notifications — scheduled at critical priority.
*/

import type { PriorityClass, PriorityQueue, QueueJob } from '../../shared/queue';

export const PAYMENT_JOB_PRIORITY: PriorityClass = 'critical';

export const PAYMENT_JOB_NAMES = {
CONFIRMATION_EMAIL: 'payment:confirmation-email',
RECEIPT_GENERATION: 'payment:receipt-generation',
CHARGE_SETTLEMENT: 'payment:charge-settlement',
} as const;

export type PaymentJobName = (typeof PAYMENT_JOB_NAMES)[keyof typeof PAYMENT_JOB_NAMES];

export interface PaymentConfirmationPayload {
subscriptionId: string;
subscriberId: string;
merchantId: string;
amount: number;
currency: string;
transactionId: string;
}

export interface ChargeSettlementPayload {
batchRunId: string;
subscriptionIds: string[];
}

export async function enqueuePaymentConfirmation(
queue: PriorityQueue<PaymentConfirmationPayload>,
payload: PaymentConfirmationPayload,
): Promise<QueueJob<PaymentConfirmationPayload>> {
return queue.add(PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL, payload);
}

export async function enqueueChargeSettlement(
queue: PriorityQueue<ChargeSettlementPayload>,
payload: ChargeSettlementPayload,
): Promise<QueueJob<ChargeSettlementPayload>> {
return queue.add(PAYMENT_JOB_NAMES.CHARGE_SETTLEMENT, payload);
}
57 changes: 57 additions & 0 deletions backend/billing/jobs/paymentJobHandlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Payment job handlers — executed by the WFQ worker loop.
*/

import type { JobHandlerMap, QueueJob } from '../../shared/queue';
import {
PAYMENT_JOB_NAMES,
type ChargeSettlementPayload,
type PaymentConfirmationPayload,
} from './paymentConfirmationJob';

export interface PaymentConfirmationResult {
transactionId: string;
sentAt: number;
channel: 'email';
}

export type PaymentConfirmationHandler = (
payload: PaymentConfirmationPayload,
) => Promise<PaymentConfirmationResult>;

/** Default handler — logs confirmation; swap for real email service in production. */
export const defaultPaymentConfirmationHandler: PaymentConfirmationHandler = async (payload) => {
console.info(
`[PaymentJob] Sending confirmation email for tx ${payload.transactionId} ` +
`(subscriber=${payload.subscriberId}, amount=${payload.amount} ${payload.currency})`,
);
return {
transactionId: payload.transactionId,
sentAt: Date.now(),
channel: 'email',
};
};

async function handleConfirmationEmail(
job: QueueJob<PaymentConfirmationPayload>,
sendFn: PaymentConfirmationHandler,
): Promise<void> {
await sendFn(job.data);
}

async function handleChargeSettlement(job: QueueJob<ChargeSettlementPayload>): Promise<void> {
console.info(
`[PaymentJob] Settling batch ${job.data.batchRunId} (${job.data.subscriptionIds.length} subscriptions)`,
);
}

export function createPaymentJobHandlers(
sendConfirmation: PaymentConfirmationHandler = defaultPaymentConfirmationHandler,
): JobHandlerMap {
return {
[PAYMENT_JOB_NAMES.CONFIRMATION_EMAIL]: (job) =>
handleConfirmationEmail(job as QueueJob<PaymentConfirmationPayload>, sendConfirmation),
[PAYMENT_JOB_NAMES.CHARGE_SETTLEMENT]: (job) =>
handleChargeSettlement(job as QueueJob<ChargeSettlementPayload>),
};
}
Loading
Loading