Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions listener/src/api/events-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import http from 'http';
import crypto from 'crypto';
import { createEventsServer, checkStellarRpc, checkDiscord } from './events-server';
import { eventRegistry } from '../store/event-registry';
import { NotificationAnalyticsAggregator } from '../services/notification-analytics-aggregator';
import { NotificationType } from '../types/scheduled-notification';

const mockGetHealth = jest.fn();

Expand Down Expand Up @@ -291,3 +293,116 @@ describe('POST /api/webhooks', () => {
expect(status).toBe(404);
});
});

describe('GET /api/analytics', () => {
let server: http.Server;

beforeEach(() => {
jest.clearAllMocks();
});

afterEach(async () => {
if (server) {
await closeServer(server);
server = null as unknown as http.Server;
}
});

it('returns an empty snapshot when no records are recorded', async () => {
const aggregator = new NotificationAnalyticsAggregator();
aggregator.reset();
server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator });

const res = await request(server, 'GET', '/api/analytics');
expect(res.status).toBe(200);
const body = res.body as Record<string, unknown>;
expect(body.totalRecorded).toBe(0);
expect(body.windowStart).toBeDefined();
expect(body.windowEnd).toBeDefined();
expect(body.overall).toBeDefined();
expect(body.byType).toEqual([]);
expect(body.byContract).toEqual([]);
// hourlyBuckets is a fixed-size rolling window: when there are no records
// every bucket still exists with zero counters. We assert structure rather
// than emptiness so the test is robust to bucket-count changes.
expect(Array.isArray(body.hourlyBuckets)).toBe(true);
expect((body.hourlyBuckets as unknown[]).length).toBeGreaterThan(0);
for (const bucket of body.hourlyBuckets as Array<{ total: number; success: number; failure: number }>) {
expect(bucket.total).toBe(0);
expect(bucket.success).toBe(0);
expect(bucket.failure).toBe(0);
}
expect(body.errorBreakdown).toEqual({});
});

it('returns aggregated metrics from recorded outcomes', async () => {
const aggregator = new NotificationAnalyticsAggregator({ bucketSizeMs: 60_000 });
aggregator.reset();
const now = Date.now();
const baseTs = now;
aggregator.record({
notificationType: NotificationType.DISCORD,
contractAddress: 'CABC',
outcome: 'success',
durationMs: 120,
timestamp: baseTs,
});
aggregator.record({
notificationType: NotificationType.DISCORD,
contractAddress: 'CABC',
outcome: 'failure',
durationMs: 240,
errorReason: 'HTTP 500',
timestamp: baseTs + 1000,
});
aggregator.record({
notificationType: NotificationType.WEBHOOK,
outcome: 'retry',
durationMs: 0,
timestamp: baseTs + 2000,
});
server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator });

const res = await request(server, 'GET', '/api/analytics');
expect(res.status).toBe(200);
const body = res.body as Record<string, any>;
expect(body.totalRecorded).toBe(3);
expect(body.byType.length).toBeGreaterThan(0);
const discordRow = body.byType.find(
(r: any) => r.notificationType === NotificationType.DISCORD,
);
expect(discordRow).toBeDefined();
expect(discordRow.total).toBe(2);
expect(discordRow.success).toBe(1);
expect(discordRow.failure).toBe(1);
expect(discordRow.successRate).toBeCloseTo(0.5);
const contractRow = body.byContract.find(
(r: any) => r.contractAddress === 'CABC',
);
expect(contractRow).toBeDefined();
expect(contractRow.total).toBe(2);
expect(body.errorBreakdown['HTTP 500']).toBe(1);
});

it('clears aggregator state when reset=true is supplied', async () => {
const aggregator = new NotificationAnalyticsAggregator();
aggregator.reset();
aggregator.record({
notificationType: NotificationType.DISCORD,
outcome: 'success',
durationMs: 50,
timestamp: Date.now(),
});
server = await startServer({ ...BASE_OPTIONS, analyticsAggregator: aggregator });

const first = await request(server, 'GET', '/api/analytics');
expect((first.body as any).totalRecorded).toBe(1);

const reset = await request(server, 'GET', '/api/analytics?reset=true');
expect(reset.status).toBe(200);
expect((reset.body as any).totalRecorded).toBe(1); // snapshot returned BEFORE reset

const after = await request(server, 'GET', '/api/analytics');
expect((after.body as any).totalRecorded).toBe(0);
});
});
50 changes: 50 additions & 0 deletions listener/src/api/events-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import {
} from '../services/webhook-verifier';
import { WebhookSecret, RateLimitConfig } from '../types';
import { RateLimiter } from './rate-limiter';
import {
getNotificationAnalyticsAggregator,
setNotificationAnalyticsAggregator,
NotificationAnalyticsAggregator,
} from '../services/notification-analytics-aggregator';

export interface EventsServerOptions {
port: number;
Expand All @@ -25,6 +30,12 @@ export interface EventsServerOptions {
webhookSecrets?: WebhookSecret[];
notificationAPI?: NotificationAPI | null;
rateLimit?: RateLimitConfig;
/**
* Optional override for the analytics aggregator. Tests use this to inject
* a controlled instance and reset state between cases. When omitted, the
* process-wide default aggregator is used.
*/
analyticsAggregator?: NotificationAnalyticsAggregator | null;
}

type ServiceStatus = 'ok' | 'error' | 'not_configured';
Expand Down Expand Up @@ -193,6 +204,45 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
return;
}

// GET /api/analytics
if (req.method === 'GET' && url.pathname.startsWith('/api/analytics')) {
const aggregator =
options.analyticsAggregator !== undefined
? options.analyticsAggregator
: getNotificationAnalyticsAggregator();

if (!aggregator) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Analytics aggregator unavailable' }));
return;
}

const snapshot = aggregator.snapshot();
const reset = url.searchParams.get('reset') === 'true';

logger.info('Handling GET /api/analytics', {
requestId,
correlationId,
totalRecorded: snapshot.totalRecorded,
reset,
durationMs: Date.now() - startTime,
});

res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
...snapshot,
...(reset ? {} : {}),
}),
);

if (reset) {
aggregator.reset();
logger.info('Analytics snapshot reset after read', { requestId, correlationId });
}
return;
}

// POST /api/webhooks
if (req.method === 'POST' && url.pathname === '/api/webhooks') {
collectRawBody(req).then((rawBody) => {
Expand Down
35 changes: 35 additions & 0 deletions listener/src/services/discord-notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import logger from '../utils/logger';
import { ContractConfig, DiscordConfig } from '../types';
import { getEventName } from '../utils/event-utils';
import { NotificationDeduplicator, generateFingerprint } from './notification-deduplicator';
import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator';
import { NotificationType } from '../types/scheduled-notification';

export interface DiscordMessage {
content?: string;
Expand All @@ -26,6 +28,7 @@ export class DiscordNotificationService {
private config: DiscordConfig;
private deduplicator: NotificationDeduplicator;
private timeoutCount: number = 0;
private readonly analytics: NotificationAnalyticsAggregator | null;

constructor(config: DiscordConfig, deduplicator?: NotificationDeduplicator) {
this.config = config;
Expand All @@ -35,6 +38,7 @@ export class DiscordNotificationService {
windowMs: config.deduplicationWindowMs,
maxSize: config.deduplicationMaxSize,
});
this.analytics = getNotificationAnalyticsAggregator();
}

async sendEventNotification(
Expand All @@ -45,6 +49,13 @@ export class DiscordNotificationService {
const fingerprint = generateFingerprint(event.id, contractConfig.address);

if (this.deduplicator.isDuplicate(fingerprint)) {
this.analytics?.record({
notificationType: NotificationType.DISCORD,
contractAddress: contractConfig.address,
outcome: 'skipped',
durationMs: 0,
timestamp: Date.now(),
});
logger.info('Skipping duplicate notification', {
eventId: event.id,
contractAddress: contractConfig.address,
Expand All @@ -71,6 +82,14 @@ export class DiscordNotificationService {

if (!response.ok) {
const errorText = await response.text();
this.analytics?.record({
notificationType: NotificationType.DISCORD,
contractAddress: contractConfig.address,
outcome: 'failure',
durationMs,
errorReason: `HTTP ${response.status}`,
timestamp: Date.now(),
});
logger.error('Discord webhook failed', {
...logContext,
status: response.status,
Expand All @@ -82,13 +101,29 @@ export class DiscordNotificationService {
}

this.deduplicator.markSent(fingerprint);
this.analytics?.record({
notificationType: NotificationType.DISCORD,
contractAddress: contractConfig.address,
outcome: 'success',
durationMs,
timestamp: Date.now(),
});
logger.info('Discord notification delivered', {
...logContext,
durationMs,
deduplication: this.deduplicator.getMetrics(),
});
return true;
} catch (error) {
const durationMs = Date.now() - startTime;
this.analytics?.record({
notificationType: NotificationType.DISCORD,
contractAddress: contractConfig.address,
outcome: 'failure',
durationMs,
errorReason: error instanceof Error ? error.message : String(error),
timestamp: Date.now(),
});
logger.error('Error sending Discord notification', {
...logContext,
error,
Expand Down
Loading
Loading