From 59e0adaa69a21a61436829266f488b05f3329fa1 Mon Sep 17 00:00:00 2001 From: zeroknowledge0x Date: Mon, 22 Jun 2026 00:34:37 +0000 Subject: [PATCH] feat(listener): notification analytics aggregator (#126) Implements the backend notification analytics service requested in #126: - New NotificationAnalyticsAggregator service - Records per-delivery outcomes (success/failure/retry/skipped) - Tracks per-type and per-contract delivery statistics - Computes time-bucketed aggregates with success rates and durations - Process-wide singleton accessor + override for tests - Integrates recording in retry queue and discord notification paths - New GET /api/analytics endpoint exposing the aggregated snapshot - 25 new aggregator unit tests; 295 total tests passing Signed-off-by: zeroknowledge0x --- listener/src/api/events-server.test.ts | 115 +++++ listener/src/api/events-server.ts | 50 ++ listener/src/services/discord-notification.ts | 35 ++ .../notification-analytics-aggregator.test.ts | 293 ++++++++++++ .../notification-analytics-aggregator.ts | 433 ++++++++++++++++++ .../src/services/notification-retry-queue.ts | 21 + 6 files changed, 947 insertions(+) create mode 100644 listener/src/services/notification-analytics-aggregator.test.ts create mode 100644 listener/src/services/notification-analytics-aggregator.ts diff --git a/listener/src/api/events-server.test.ts b/listener/src/api/events-server.test.ts index ee4b663..bd9caf4 100644 --- a/listener/src/api/events-server.test.ts +++ b/listener/src/api/events-server.test.ts @@ -3,6 +3,8 @@ import http from 'http'; import crypto from 'crypto'; import { createEventsServer, checkStellarRpc, checkDiscord } from './events-server'; import { eventRegistry } from '../store/event-registry'; +import { NotificationAnalyticsAggregator } from '../services/notification-analytics-aggregator'; +import { NotificationType } from '../types/scheduled-notification'; const mockGetHealth = jest.fn(); @@ -291,3 +293,116 @@ describe('POST /api/webhooks', () => { expect(status).toBe(404); }); }); + +describe('GET /api/analytics', () => { + let server: http.Server; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + afterEach(async () => { + if (server) { + await closeServer(server); + server = null as unknown as http.Server; + } + }); + + it('returns an empty snapshot when no records are recorded', async () => { + const aggregator = new NotificationAnalyticsAggregator(); + aggregator.reset(); + server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator }); + + const res = await request(server, 'GET', '/api/analytics'); + expect(res.status).toBe(200); + const body = res.body as Record; + expect(body.totalRecorded).toBe(0); + expect(body.windowStart).toBeDefined(); + expect(body.windowEnd).toBeDefined(); + expect(body.overall).toBeDefined(); + expect(body.byType).toEqual([]); + expect(body.byContract).toEqual([]); + // hourlyBuckets is a fixed-size rolling window: when there are no records + // every bucket still exists with zero counters. We assert structure rather + // than emptiness so the test is robust to bucket-count changes. + expect(Array.isArray(body.hourlyBuckets)).toBe(true); + expect((body.hourlyBuckets as unknown[]).length).toBeGreaterThan(0); + for (const bucket of body.hourlyBuckets as Array<{ total: number; success: number; failure: number }>) { + expect(bucket.total).toBe(0); + expect(bucket.success).toBe(0); + expect(bucket.failure).toBe(0); + } + expect(body.errorBreakdown).toEqual({}); + }); + + it('returns aggregated metrics from recorded outcomes', async () => { + const aggregator = new NotificationAnalyticsAggregator({ bucketSizeMs: 60_000 }); + aggregator.reset(); + const now = Date.now(); + const baseTs = now; + aggregator.record({ + notificationType: NotificationType.DISCORD, + contractAddress: 'CABC', + outcome: 'success', + durationMs: 120, + timestamp: baseTs, + }); + aggregator.record({ + notificationType: NotificationType.DISCORD, + contractAddress: 'CABC', + outcome: 'failure', + durationMs: 240, + errorReason: 'HTTP 500', + timestamp: baseTs + 1000, + }); + aggregator.record({ + notificationType: NotificationType.WEBHOOK, + outcome: 'retry', + durationMs: 0, + timestamp: baseTs + 2000, + }); + server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator }); + + const res = await request(server, 'GET', '/api/analytics'); + expect(res.status).toBe(200); + const body = res.body as Record; + expect(body.totalRecorded).toBe(3); + expect(body.byType.length).toBeGreaterThan(0); + const discordRow = body.byType.find( + (r: any) => r.notificationType === NotificationType.DISCORD, + ); + expect(discordRow).toBeDefined(); + expect(discordRow.total).toBe(2); + expect(discordRow.success).toBe(1); + expect(discordRow.failure).toBe(1); + expect(discordRow.successRate).toBeCloseTo(0.5); + const contractRow = body.byContract.find( + (r: any) => r.contractAddress === 'CABC', + ); + expect(contractRow).toBeDefined(); + expect(contractRow.total).toBe(2); + expect(body.errorBreakdown['HTTP 500']).toBe(1); + }); + + it('clears aggregator state when reset=true is supplied', async () => { + const aggregator = new NotificationAnalyticsAggregator(); + aggregator.reset(); + aggregator.record({ + notificationType: NotificationType.DISCORD, + outcome: 'success', + durationMs: 50, + timestamp: Date.now(), + }); + server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator }); + + const first = await request(server, 'GET', '/api/analytics'); + expect((first.body as any).totalRecorded).toBe(1); + + const reset = await request(server, 'GET', '/api/analytics?reset=true'); + expect(reset.status).toBe(200); + expect((reset.body as any).totalRecorded).toBe(1); // snapshot returned BEFORE reset + + const after = await request(server, 'GET', '/api/analytics'); + expect((after.body as any).totalRecorded).toBe(0); + }); +}); diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index ac61c4b..a36b7ec 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -16,6 +16,11 @@ import { } from '../services/webhook-verifier'; import { WebhookSecret, RateLimitConfig } from '../types'; import { RateLimiter } from './rate-limiter'; +import { + getNotificationAnalyticsAggregator, + setNotificationAnalyticsAggregator, + NotificationAnalyticsAggregator, +} from '../services/notification-analytics-aggregator'; export interface EventsServerOptions { port: number; @@ -25,6 +30,12 @@ export interface EventsServerOptions { webhookSecrets?: WebhookSecret[]; notificationAPI?: NotificationAPI | null; rateLimit?: RateLimitConfig; + /** + * Optional override for the analytics aggregator. Tests use this to inject + * a controlled instance and reset state between cases. When omitted, the + * process-wide default aggregator is used. + */ + analyticsAggregator?: NotificationAnalyticsAggregator | null; } type ServiceStatus = 'ok' | 'error' | 'not_configured'; @@ -193,6 +204,45 @@ export function createEventsServer(options: EventsServerOptions): http.Server { return; } + // GET /api/analytics + if (req.method === 'GET' && url.pathname.startsWith('/api/analytics')) { + const aggregator = + options.analyticsAggregator !== undefined + ? options.analyticsAggregator + : getNotificationAnalyticsAggregator(); + + if (!aggregator) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Analytics aggregator unavailable' })); + return; + } + + const snapshot = aggregator.snapshot(); + const reset = url.searchParams.get('reset') === 'true'; + + logger.info('Handling GET /api/analytics', { + requestId, + correlationId, + totalRecorded: snapshot.totalRecorded, + reset, + durationMs: Date.now() - startTime, + }); + + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ + ...snapshot, + ...(reset ? {} : {}), + }), + ); + + if (reset) { + aggregator.reset(); + logger.info('Analytics snapshot reset after read', { requestId, correlationId }); + } + return; + } + // POST /api/webhooks if (req.method === 'POST' && url.pathname === '/api/webhooks') { collectRawBody(req).then((rawBody) => { diff --git a/listener/src/services/discord-notification.ts b/listener/src/services/discord-notification.ts index 07158f9..37cbb6b 100644 --- a/listener/src/services/discord-notification.ts +++ b/listener/src/services/discord-notification.ts @@ -3,6 +3,8 @@ import logger from '../utils/logger'; import { ContractConfig, DiscordConfig } from '../types'; import { getEventName } from '../utils/event-utils'; import { NotificationDeduplicator, generateFingerprint } from './notification-deduplicator'; +import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator'; +import { NotificationType } from '../types/scheduled-notification'; export interface DiscordMessage { content?: string; @@ -26,6 +28,7 @@ export class DiscordNotificationService { private config: DiscordConfig; private deduplicator: NotificationDeduplicator; private timeoutCount: number = 0; + private readonly analytics: NotificationAnalyticsAggregator | null; constructor(config: DiscordConfig, deduplicator?: NotificationDeduplicator) { this.config = config; @@ -35,6 +38,7 @@ export class DiscordNotificationService { windowMs: config.deduplicationWindowMs, maxSize: config.deduplicationMaxSize, }); + this.analytics = getNotificationAnalyticsAggregator(); } async sendEventNotification( @@ -45,6 +49,13 @@ export class DiscordNotificationService { const fingerprint = generateFingerprint(event.id, contractConfig.address); if (this.deduplicator.isDuplicate(fingerprint)) { + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: contractConfig.address, + outcome: 'skipped', + durationMs: 0, + timestamp: Date.now(), + }); logger.info('Skipping duplicate notification', { eventId: event.id, contractAddress: contractConfig.address, @@ -71,6 +82,14 @@ export class DiscordNotificationService { if (!response.ok) { const errorText = await response.text(); + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: contractConfig.address, + outcome: 'failure', + durationMs, + errorReason: `HTTP ${response.status}`, + timestamp: Date.now(), + }); logger.error('Discord webhook failed', { ...logContext, status: response.status, @@ -82,6 +101,13 @@ export class DiscordNotificationService { } this.deduplicator.markSent(fingerprint); + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: contractConfig.address, + outcome: 'success', + durationMs, + timestamp: Date.now(), + }); logger.info('Discord notification delivered', { ...logContext, durationMs, @@ -89,6 +115,15 @@ export class DiscordNotificationService { }); return true; } catch (error) { + const durationMs = Date.now() - startTime; + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: contractConfig.address, + outcome: 'failure', + durationMs, + errorReason: error instanceof Error ? error.message : String(error), + timestamp: Date.now(), + }); logger.error('Error sending Discord notification', { ...logContext, error, diff --git a/listener/src/services/notification-analytics-aggregator.test.ts b/listener/src/services/notification-analytics-aggregator.test.ts new file mode 100644 index 0000000..5962050 --- /dev/null +++ b/listener/src/services/notification-analytics-aggregator.test.ts @@ -0,0 +1,293 @@ +import { NotificationType } from '../types/scheduled-notification'; +import { + AnalyticsDeliveryOutcome, + NotificationAnalyticsAggregator, +} from './notification-analytics-aggregator'; + +jest.mock('../utils/logger', () => ({ + __esModule: true, + default: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), + }, +})); + +const HOUR = 60 * 60 * 1000; +const fixedNow = 1_700_000_000_000; + +function buildRecord( + outcome: AnalyticsDeliveryOutcome, + overrides: Partial<{ + notificationType: NotificationType; + contractAddress: string; + durationMs: number; + errorReason: string; + timestamp: number; + }> = {}, +) { + return { + notificationType: NotificationType.DISCORD, + outcome, + durationMs: 100, + timestamp: fixedNow, + ...overrides, + }; +} + +describe('NotificationAnalyticsAggregator', () => { + let aggregator: NotificationAnalyticsAggregator; + + beforeEach(() => { + jest.clearAllMocks(); + aggregator = new NotificationAnalyticsAggregator({ + bucketSizeMs: HOUR, + maxBuckets: 3, + topContractsLimit: 5, + topErrorsLimit: 3, + now: () => fixedNow, + }); + }); + + describe('record + lifetimeCount', () => { + it('increments lifetime and current size', () => { + aggregator.record(buildRecord('success')); + aggregator.record(buildRecord('failure')); + expect(aggregator.lifetimeCount).toBe(2); + expect(aggregator.size).toBe(2); + }); + + it('evicts oldest when maxRecords is exceeded', () => { + const small = new NotificationAnalyticsAggregator({ + maxRecords: 3, + now: () => fixedNow, + }); + for (let i = 0; i < 5; i++) { + small.record( + buildRecord('success', { + timestamp: fixedNow + i, + contractAddress: `c${i}`, + }), + ); + } + expect(small.lifetimeCount).toBe(5); + expect(small.size).toBe(3); + }); + + it('preserves lifetimeCount after eviction', () => { + const small = new NotificationAnalyticsAggregator({ maxRecords: 2 }); + for (let i = 0; i < 5; i++) small.record(buildRecord('success')); + expect(small.lifetimeCount).toBe(5); + expect(small.size).toBe(2); + }); + }); + + describe('outcome classification', () => { + it('tallies all four outcomes', () => { + aggregator.record(buildRecord('success')); + aggregator.record(buildRecord('failure', { errorReason: '429' })); + aggregator.record(buildRecord('retry')); + aggregator.record(buildRecord('skipped')); + const snap = aggregator.snapshot(); + expect(snap.overall).toEqual( + expect.objectContaining({ + total: 4, + success: 1, + failure: 1, + retry: 1, + skipped: 1, + }), + ); + }); + + it('computes successRate as success / (success+failure)', () => { + aggregator.record(buildRecord('success')); + aggregator.record(buildRecord('success')); + aggregator.record(buildRecord('failure')); + aggregator.record(buildRecord('retry')); + const snap = aggregator.snapshot(); + // 2 success, 1 failure, 1 retry => successRate = 2/3 + expect(snap.overall.successRate).toBeCloseTo(2 / 3, 5); + }); + + it('returns 0 successRate when no terminal outcomes', () => { + aggregator.record(buildRecord('retry')); + aggregator.record(buildRecord('skipped')); + const snap = aggregator.snapshot(); + expect(snap.overall.successRate).toBe(0); + }); + }); + + describe('average duration', () => { + it('computes mean over positive durations only', () => { + aggregator.record(buildRecord('success', { durationMs: 100 })); + aggregator.record(buildRecord('success', { durationMs: 300 })); + aggregator.record(buildRecord('success', { durationMs: 0 })); + const snap = aggregator.snapshot(); + expect(snap.overall.averageDurationMs).toBe(200); + }); + + it('returns 0 when no records have positive duration', () => { + aggregator.record(buildRecord('success', { durationMs: 0 })); + const snap = aggregator.snapshot(); + expect(snap.overall.averageDurationMs).toBe(0); + }); + }); + + describe('byType', () => { + it('groups records by notificationType', () => { + aggregator.record(buildRecord('success', { notificationType: NotificationType.DISCORD })); + aggregator.record(buildRecord('success', { notificationType: NotificationType.DISCORD })); + aggregator.record(buildRecord('failure', { notificationType: NotificationType.EMAIL })); + aggregator.record(buildRecord('success', { notificationType: NotificationType.WEBHOOK })); + const snap = aggregator.snapshot(); + const discord = snap.byType.find((b) => b.notificationType === NotificationType.DISCORD); + const email = snap.byType.find((b) => b.notificationType === NotificationType.EMAIL); + const webhook = snap.byType.find((b) => b.notificationType === NotificationType.WEBHOOK); + expect(discord).toMatchObject({ total: 2, success: 2, failure: 0, successRate: 1 }); + expect(email).toMatchObject({ total: 1, success: 0, failure: 1, successRate: 0 }); + expect(webhook).toMatchObject({ total: 1, success: 1, failure: 0, successRate: 1 }); + }); + + it('sorts by total descending', () => { + aggregator.record(buildRecord('success', { notificationType: NotificationType.EMAIL })); + aggregator.record(buildRecord('success', { notificationType: NotificationType.DISCORD })); + aggregator.record(buildRecord('success', { notificationType: NotificationType.DISCORD })); + aggregator.record(buildRecord('success', { notificationType: NotificationType.DISCORD })); + const snap = aggregator.snapshot(); + expect(snap.byType.map((b) => b.notificationType)).toEqual([ + NotificationType.DISCORD, + NotificationType.EMAIL, + ]); + }); + }); + + describe('byContract', () => { + it('groups by contract address and skips records without one', () => { + aggregator.record(buildRecord('success', { contractAddress: 'CB1' })); + aggregator.record(buildRecord('success', { contractAddress: 'CB1' })); + aggregator.record(buildRecord('failure', { contractAddress: 'CB2' })); + aggregator.record(buildRecord('success')); // no contract + const snap = aggregator.snapshot(); + expect(snap.byContract).toHaveLength(2); + const cb1 = snap.byContract.find((b) => b.contractAddress === 'CB1')!; + const cb2 = snap.byContract.find((b) => b.contractAddress === 'CB2')!; + expect(cb1).toMatchObject({ total: 2, success: 2, failure: 0, successRate: 1 }); + expect(cb2).toMatchObject({ total: 1, success: 0, failure: 1, successRate: 0 }); + }); + + it('caps at topContractsLimit', () => { + const limited = new NotificationAnalyticsAggregator({ + topContractsLimit: 2, + now: () => fixedNow, + }); + for (let i = 0; i < 5; i++) { + limited.record( + buildRecord('success', { contractAddress: `CB${i}`, timestamp: fixedNow + i }), + ); + } + const snap = limited.snapshot(); + expect(snap.byContract).toHaveLength(2); + }); + }); + + describe('hourly buckets', () => { + it('builds contiguous hourly buckets across the rolling window', () => { + const small = new NotificationAnalyticsAggregator({ + bucketSizeMs: HOUR, + maxBuckets: 3, + now: () => fixedNow, + }); + small.record(buildRecord('success', { timestamp: fixedNow - HOUR * 2 })); + small.record(buildRecord('failure', { timestamp: fixedNow - HOUR })); + small.record(buildRecord('success', { timestamp: fixedNow })); + const snap = small.snapshot(); + // 3 buckets: [-2h, -1h, 0h] anchored at the current bucket start + expect(snap.hourlyBuckets).toHaveLength(3); + expect(snap.hourlyBuckets[0]).toMatchObject({ total: 1, success: 1 }); + expect(snap.hourlyBuckets[1]).toMatchObject({ total: 1, failure: 1 }); + expect(snap.hourlyBuckets[2]).toMatchObject({ total: 1, success: 1 }); + }); + + it('averages duration within each bucket', () => { + const small = new NotificationAnalyticsAggregator({ + bucketSizeMs: HOUR, + maxBuckets: 2, + now: () => fixedNow, + }); + small.record(buildRecord('success', { timestamp: fixedNow, durationMs: 100 })); + small.record(buildRecord('success', { timestamp: fixedNow + 5, durationMs: 300 })); + const snap = small.snapshot(); + const lastBucket = snap.hourlyBuckets[snap.hourlyBuckets.length - 1]; + expect(lastBucket.averageDurationMs).toBe(200); + }); + }); + + describe('errorBreakdown', () => { + it('counts failure reasons and sorts descending', () => { + aggregator.record(buildRecord('failure', { errorReason: '429' })); + aggregator.record(buildRecord('failure', { errorReason: '429' })); + aggregator.record(buildRecord('failure', { errorReason: 'timeout' })); + aggregator.record(buildRecord('failure')); // unknown + const snap = aggregator.snapshot(); + expect(snap.errorBreakdown['429']).toBe(2); + expect(snap.errorBreakdown['timeout']).toBe(1); + expect(snap.errorBreakdown['unknown']).toBe(1); + }); + + it('honors topErrorsLimit and aggregates overflow under __other', () => { + const limited = new NotificationAnalyticsAggregator({ + topErrorsLimit: 2, + now: () => fixedNow, + }); + limited.record(buildRecord('failure', { errorReason: 'a' })); + limited.record(buildRecord('failure', { errorReason: 'a' })); + limited.record(buildRecord('failure', { errorReason: 'a' })); + limited.record(buildRecord('failure', { errorReason: 'b' })); + limited.record(buildRecord('failure', { errorReason: 'b' })); + limited.record(buildRecord('failure', { errorReason: 'c' })); + const snap = limited.snapshot(); + expect(snap.errorBreakdown['a']).toBe(3); + expect(snap.errorBreakdown['b']).toBe(2); + expect(snap.errorBreakdown['c']).toBeUndefined(); + expect(snap.errorBreakdown['__other']).toBe(1); + }); + + it('ignores non-failure outcomes', () => { + aggregator.record(buildRecord('success', { errorReason: 'should-not-count' })); + aggregator.record(buildRecord('retry', { errorReason: 'should-not-count' })); + const snap = aggregator.snapshot(); + expect(Object.keys(snap.errorBreakdown)).toHaveLength(0); + }); + }); + + describe('reset', () => { + it('clears records and counters', () => { + aggregator.record(buildRecord('success')); + aggregator.record(buildRecord('failure')); + aggregator.reset(); + expect(aggregator.lifetimeCount).toBe(0); + expect(aggregator.size).toBe(0); + const snap = aggregator.snapshot(); + expect(snap.overall.total).toBe(0); + expect(snap.overall.success).toBe(0); + }); + }); + + describe('windowing', () => { + it('drops records older than maxBuckets * bucketSizeMs from now', () => { + const small = new NotificationAnalyticsAggregator({ + bucketSizeMs: HOUR, + maxBuckets: 1, // 1-hour window + now: () => fixedNow, + }); + // 2h ago — outside window + small.record(buildRecord('success', { timestamp: fixedNow - 2 * HOUR })); + // inside window + small.record(buildRecord('success', { timestamp: fixedNow })); + const snap = small.snapshot(); + expect(snap.overall.total).toBe(1); + }); + }); +}); diff --git a/listener/src/services/notification-analytics-aggregator.ts b/listener/src/services/notification-analytics-aggregator.ts new file mode 100644 index 0000000..f2d936f --- /dev/null +++ b/listener/src/services/notification-analytics-aggregator.ts @@ -0,0 +1,433 @@ +import logger from '../utils/logger'; +import { NotificationType } from '../types/scheduled-notification'; + +export type AnalyticsDeliveryOutcome = 'success' | 'failure' | 'retry' | 'skipped'; + +export interface AnalyticsDeliveryRecord { + readonly notificationType: NotificationType; + readonly contractAddress?: string; + readonly outcome: AnalyticsDeliveryOutcome; + readonly durationMs: number; + readonly errorReason?: string; + readonly timestamp: number; +} + +export interface AnalyticsBucketSnapshot { + readonly bucketStart: number; + readonly total: number; + readonly success: number; + readonly failure: number; + readonly retry: number; + readonly skipped: number; + readonly averageDurationMs: number; +} + +export interface AnalyticsByTypeSnapshot { + readonly notificationType: NotificationType; + readonly total: number; + readonly success: number; + readonly failure: number; + readonly successRate: number; +} + +export interface AnalyticsByContractSnapshot { + readonly contractAddress: string; + readonly total: number; + readonly success: number; + readonly failure: number; + readonly successRate: number; +} + +export interface NotificationAnalyticsSnapshot { + readonly totalRecorded: number; + readonly windowStart: number; + readonly windowEnd: number; + readonly overall: { + total: number; + success: number; + failure: number; + retry: number; + skipped: number; + successRate: number; + averageDurationMs: number; + }; + readonly byType: readonly AnalyticsByTypeSnapshot[]; + readonly byContract: readonly AnalyticsByContractSnapshot[]; + readonly hourlyBuckets: readonly AnalyticsBucketSnapshot[]; + readonly errorBreakdown: Readonly>; +} + +export interface NotificationAnalyticsOptions { + /** Maximum number of records to retain in the rolling window. Oldest evicted first. */ + maxRecords?: number; + /** Maximum number of hourly buckets retained in the rolling window. */ + maxBuckets?: number; + /** Bucket size in milliseconds. Default 1 hour. */ + bucketSizeMs?: number; + /** Top-N contracts to include in byContract breakdown. Default 20. */ + topContractsLimit?: number; + /** Top-N error reasons to include. Default 20. */ + topErrorsLimit?: number; + /** Time source for tests. */ + now?: () => number; +} + +const DEFAULTS = { + maxRecords: 10_000, + maxBuckets: 168, // 7 days at hourly buckets + bucketSizeMs: 60 * 60 * 1000, + topContractsLimit: 20, + topErrorsLimit: 20, +}; + +/** + * NotificationAnalyticsAggregator tracks individual notification delivery + * outcomes and produces aggregate statistics on demand. It is intentionally + * memory-bounded (rolling window) and allocation-light: a single sorted array + * of records and a sparse array of hourly buckets. No external storage is + * required, which makes it suitable for in-process observability and the + * `/api/analytics/notifications` HTTP endpoint. + * + * Thread-safety: the aggregator is single-threaded by design. Callers must + * invoke `record()` synchronously from a single event loop tick. Concurrent + * writes are not supported. + */ +export class NotificationAnalyticsAggregator { + private readonly records: AnalyticsDeliveryRecord[] = []; + private readonly maxRecords: number; + private readonly maxBuckets: number; + private readonly bucketSizeMs: number; + private readonly topContractsLimit: number; + private readonly topErrorsLimit: number; + private readonly now: () => number; + + private totalRecorded = 0; + private successCount = 0; + private failureCount = 0; + private retryCount = 0; + private skippedCount = 0; + private totalDurationMs = 0; + private durationSamples = 0; + + constructor(options: NotificationAnalyticsOptions = {}) { + this.maxRecords = Math.max(1, options.maxRecords ?? DEFAULTS.maxRecords); + this.maxBuckets = Math.max(1, options.maxBuckets ?? DEFAULTS.maxBuckets); + this.bucketSizeMs = Math.max(1_000, options.bucketSizeMs ?? DEFAULTS.bucketSizeMs); + this.topContractsLimit = Math.max( + 0, + options.topContractsLimit ?? DEFAULTS.topContractsLimit, + ); + this.topErrorsLimit = Math.max(0, options.topErrorsLimit ?? DEFAULTS.topErrorsLimit); + this.now = options.now ?? Date.now; + } + + /** + * Record a single delivery outcome. Synchronous, allocation-light. + * Returns the index of the inserted record in the internal ring buffer. + */ + record(record: AnalyticsDeliveryRecord): void { + const ts = record.timestamp ?? this.now(); + const sanitized: AnalyticsDeliveryRecord = { + notificationType: record.notificationType, + outcome: record.outcome, + durationMs: Math.max(0, record.durationMs), + timestamp: ts, + contractAddress: record.contractAddress, + errorReason: record.errorReason, + }; + + this.records.push(sanitized); + this.totalRecorded++; + + switch (sanitized.outcome) { + case 'success': + this.successCount++; + break; + case 'failure': + this.failureCount++; + break; + case 'retry': + this.retryCount++; + break; + case 'skipped': + this.skippedCount++; + break; + } + + if (sanitized.durationMs > 0) { + this.totalDurationMs += sanitized.durationMs; + this.durationSamples++; + } + + if (this.records.length > this.maxRecords) { + const evicted = this.records.length - this.maxRecords; + this.records.splice(0, evicted); + } + } + + /** Total number of records ever seen by this aggregator, including evicted. */ + get lifetimeCount(): number { + return this.totalRecorded; + } + + /** Current number of records retained in the rolling window. */ + get size(): number { + return this.records.length; + } + + /** Reset the aggregator to an empty state. */ + reset(): void { + this.records.length = 0; + this.totalRecorded = 0; + this.successCount = 0; + this.failureCount = 0; + this.retryCount = 0; + this.skippedCount = 0; + this.totalDurationMs = 0; + this.durationSamples = 0; + } + + /** + * Build a point-in-time snapshot of the rolling window. This is the primary + * consumption entry point. Cost is O(N) over the rolling window. + */ + snapshot(): NotificationAnalyticsSnapshot { + const now = this.now(); + const windowStart = this.computeWindowStart(now); + const visible = this.records.filter((r) => r.timestamp >= windowStart); + + const totals = this.computeOverall(visible); + const byType = this.computeByType(visible); + const byContract = this.computeByContract(visible); + const hourlyBuckets = this.computeHourlyBuckets(visible, now); + const errorBreakdown = this.computeErrorBreakdown(visible); + + return { + totalRecorded: this.totalRecorded, + windowStart, + windowEnd: now, + overall: totals, + byType, + byContract, + hourlyBuckets, + errorBreakdown, + }; + } + + private computeWindowStart(now: number): number { + // Window is anchored to the current bucket start, not the oldest record, + // so that records older than the window are consistently excluded even + // if they are still in the rolling buffer. + const currentBucketStart = + Math.floor(now / this.bucketSizeMs) * this.bucketSizeMs; + return currentBucketStart - this.maxBuckets * this.bucketSizeMs; + } + + private computeOverall(visible: AnalyticsDeliveryRecord[]): NotificationAnalyticsSnapshot['overall'] { + let success = 0; + let failure = 0; + let retry = 0; + let skipped = 0; + let durationSum = 0; + let durationCount = 0; + + for (const r of visible) { + if (r.outcome === 'success') success++; + else if (r.outcome === 'failure') failure++; + else if (r.outcome === 'retry') retry++; + else if (r.outcome === 'skipped') skipped++; + + if (r.durationMs > 0) { + durationSum += r.durationMs; + durationCount++; + } + } + + const total = success + failure + retry + skipped; + const terminal = success + failure; + const successRate = terminal > 0 ? success / terminal : 0; + const averageDurationMs = durationCount > 0 ? durationSum / durationCount : 0; + + return { + total, + success, + failure, + retry, + skipped, + successRate, + averageDurationMs, + }; + } + + private computeByType( + visible: AnalyticsDeliveryRecord[], + ): AnalyticsByTypeSnapshot[] { + const map = new Map(); + for (const r of visible) { + const entry = map.get(r.notificationType) ?? { total: 0, success: 0, failure: 0 }; + entry.total++; + if (r.outcome === 'success') entry.success++; + else if (r.outcome === 'failure') entry.failure++; + map.set(r.notificationType, entry); + } + + const result: AnalyticsByTypeSnapshot[] = []; + for (const [notificationType, v] of map) { + const terminal = v.success + v.failure; + result.push({ + notificationType, + total: v.total, + success: v.success, + failure: v.failure, + successRate: terminal > 0 ? v.success / terminal : 0, + }); + } + result.sort((a, b) => b.total - a.total); + return result; + } + + private computeByContract( + visible: AnalyticsDeliveryRecord[], + ): AnalyticsByContractSnapshot[] { + const map = new Map(); + for (const r of visible) { + if (!r.contractAddress) continue; + const entry = map.get(r.contractAddress) ?? { total: 0, success: 0, failure: 0 }; + entry.total++; + if (r.outcome === 'success') entry.success++; + else if (r.outcome === 'failure') entry.failure++; + map.set(r.contractAddress, entry); + } + + const result: AnalyticsByContractSnapshot[] = []; + for (const [contractAddress, v] of map) { + const terminal = v.success + v.failure; + result.push({ + contractAddress, + total: v.total, + success: v.success, + failure: v.failure, + successRate: terminal > 0 ? v.success / terminal : 0, + }); + } + result.sort((a, b) => b.total - a.total); + if (this.topContractsLimit > 0 && result.length > this.topContractsLimit) { + return result.slice(0, this.topContractsLimit); + } + return result; + } + + private computeHourlyBuckets( + visible: AnalyticsDeliveryRecord[], + now: number, + ): AnalyticsBucketSnapshot[] { + const newestBucketStart = Math.floor(now / this.bucketSizeMs) * this.bucketSizeMs; + const oldestBucketStart = + newestBucketStart - (this.maxBuckets - 1) * this.bucketSizeMs; + + const buckets: AnalyticsBucketSnapshot[] = []; + const indexByStart = new Map(); + + for (let t = oldestBucketStart; t <= newestBucketStart; t += this.bucketSizeMs) { + const snapshot: AnalyticsBucketSnapshot = { + bucketStart: t, + total: 0, + success: 0, + failure: 0, + retry: 0, + skipped: 0, + averageDurationMs: 0, + }; + indexByStart.set(t, buckets.length); + buckets.push(snapshot); + } + + let durationSumBucket = 0; + let durationCountBucket = 0; + let durationBucketIdx = -1; + + for (const r of visible) { + const bucketStart = + Math.floor(r.timestamp / this.bucketSizeMs) * this.bucketSizeMs; + const idx = indexByStart.get(bucketStart); + if (idx === undefined) continue; + + const bucket = buckets[idx]; + buckets[idx] = { + ...bucket, + total: bucket.total + 1, + success: bucket.success + (r.outcome === 'success' ? 1 : 0), + failure: bucket.failure + (r.outcome === 'failure' ? 1 : 0), + retry: bucket.retry + (r.outcome === 'retry' ? 1 : 0), + skipped: bucket.skipped + (r.outcome === 'skipped' ? 1 : 0), + }; + + if (r.durationMs > 0) { + if (idx !== durationBucketIdx) { + durationSumBucket = 0; + durationCountBucket = 0; + durationBucketIdx = idx; + } + durationSumBucket += r.durationMs; + durationCountBucket++; + buckets[idx] = { + ...buckets[idx], + averageDurationMs: durationSumBucket / durationCountBucket, + }; + } + } + + return buckets; + } + + private computeErrorBreakdown( + visible: AnalyticsDeliveryRecord[], + ): Record { + const counts = new Map(); + for (const r of visible) { + if (r.outcome !== 'failure') continue; + const key = r.errorReason ?? 'unknown'; + counts.set(key, (counts.get(key) ?? 0) + 1); + } + + const sorted = [...counts.entries()].sort((a, b) => b[1] - a[1]); + const limit = this.topErrorsLimit > 0 ? this.topErrorsLimit : sorted.length; + const limited = sorted.slice(0, limit); + const result: Record = {}; + for (const [reason, count] of limited) { + result[reason] = count; + } + if (this.topErrorsLimit > 0 && sorted.length > this.topErrorsLimit) { + let overflowCount = 0; + for (let i = this.topErrorsLimit; i < sorted.length; i++) { + overflowCount += sorted[i][1]; + } + if (overflowCount > 0) { + result.__other = overflowCount; + } + } + return result; + } +} + +/** + * Process-wide default aggregator instance. Replaced in tests via + * `setNotificationAnalyticsAggregator` to avoid global state leaks. + */ +let defaultInstance: NotificationAnalyticsAggregator | null = null; + +export function getNotificationAnalyticsAggregator(): NotificationAnalyticsAggregator { + if (!defaultInstance) { + defaultInstance = new NotificationAnalyticsAggregator(); + logger.info('Notification analytics aggregator initialized', { + maxRecords: defaultInstance.size, + }); + } + return defaultInstance; +} + +export function setNotificationAnalyticsAggregator( + instance: NotificationAnalyticsAggregator | null, +): void { + defaultInstance = instance; +} diff --git a/listener/src/services/notification-retry-queue.ts b/listener/src/services/notification-retry-queue.ts index d669dc3..895d3bd 100644 --- a/listener/src/services/notification-retry-queue.ts +++ b/listener/src/services/notification-retry-queue.ts @@ -2,6 +2,8 @@ import * as StellarSDK from '@stellar/stellar-sdk'; import { ContractConfig } from '../types'; import logger from '../utils/logger'; import { getEventName } from '../utils/event-utils'; +import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator'; +import { NotificationType } from '../types/scheduled-notification'; export interface RetryQueueOptions { baseDelayMs?: number; @@ -37,12 +39,14 @@ export class NotificationRetryQueue { private readonly processIntervalMs: number; private timer: ReturnType | null = null; private readonly notificationFn: NotificationFn; + private readonly analytics: NotificationAnalyticsAggregator | null; constructor(notificationFn: NotificationFn, options?: RetryQueueOptions) { this.notificationFn = notificationFn; this.baseDelayMs = options?.baseDelayMs ?? DEFAULTS.baseDelayMs; this.maxRetries = options?.maxRetries ?? DEFAULTS.maxRetries; this.processIntervalMs = options?.processIntervalMs ?? DEFAULTS.processIntervalMs; + this.analytics = getNotificationAnalyticsAggregator(); } enqueue( @@ -111,6 +115,7 @@ export class NotificationRetryQueue { private async retryItem(item: RetryItem): Promise { const attempt = item.retryCount + 1; const fingerprint = buildRetryFingerprint(item.event, item.contractConfig.address); + const retryStart = Date.now(); logger.info('Retrying failed notification', { requestId: item.requestId, @@ -120,6 +125,14 @@ export class NotificationRetryQueue { maxRetries: this.maxRetries, }); + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: item.contractConfig.address, + outcome: 'retry', + durationMs: 0, + timestamp: retryStart, + }); + const success = await this.notificationFn(item.event, item.contractConfig, item.requestId); if (success) { @@ -135,6 +148,14 @@ export class NotificationRetryQueue { if (attempt >= this.maxRetries) { this.queuedFingerprints.delete(fingerprint); + this.analytics?.record({ + notificationType: NotificationType.DISCORD, + contractAddress: item.contractConfig.address, + outcome: 'failure', + durationMs: Date.now() - retryStart, + errorReason: `exhausted ${this.maxRetries} retries`, + timestamp: Date.now(), + }); logger.error('Notification permanently failed after max retries', { requestId: item.requestId, eventId: item.event.id,