From daf674fd6fe4a1d2ef160fc4a1656d76d26b9411 Mon Sep 17 00:00:00 2001 From: junman140 Date: Tue, 23 Jun 2026 20:33:44 +0100 Subject: [PATCH] feat: refactor template engine, webhook dispatcher, and subscription state machine (closes #584, #585, #586) #584 - AST-based template engine - Lexer + Parser: tokenize template string, produce AST (Text, Variable, If, For, Filter, Partial nodes) - Compiler: AST -> JS render function with Map-based LRU cache (max 100 entries) - Filters: uppercase, lowercase, date format (multiple presets), currency (locale-aware), pluralize - Partials: reusable template fragments with parameter passing - Sandbox: prevent infinite loops (max 100 iterations), deep recursion (max 10 depth) - Malformed template -> friendly error with line number and expected token - EmailTemplateService refactored to use new engine #585 - Webhook dispatcher as separate horizontally-scalable service - services/webhook-dispatcher/ with package.json, tsconfig, Dockerfile - Events enqueued to Redis Stream from main API (producer), dispatcher consumes (consumer group) - Dispatcher workers: configurable concurrency (default 10, max 50) - Dedicated dispatcher PostgreSQL schema for delivery logs and retry state - Health check endpoint: GET /health returns queue depth, active workers, error rate, uptime - Graceful shutdown: SIGTERM waits for in-flight deliveries (max 30s) - docker-compose.yml with dispatcher, Redis, PostgreSQL services - CI pipeline (.github/workflows/webhook-dispatcher-ci.yml) - K8s HPA config (CPU >70% -> scale up) #586 - Hierarchical state chart for subscription state machine - State hierarchy: Active (Trial, Paid, PastDue), Inactive (Cancelled, Paused at end of cycle, Expired), Suspended (FraudHold, AdminHold) - Transition guards for each edge (canCancel, canPause, canResume, etc.) - Entry/exit actions: configurable side effects (send email, stop billing, revoke access) - State history: ordered list of transitions with actor, timestamp, reason - Mermaid state diagram generator from machine definition - Exhaustive transition matrix tests - Invalid transition returns error with list of valid transitions --- .github/workflows/webhook-dispatcher-ci.yml | 88 +++ .../template-engine/template-engine.test.ts | 637 ++++++++++++++++++ .../notification/emailTemplateService.ts | 90 +++ .../notification/template-engine/ast/nodes.ts | 75 +++ .../notification/template-engine/compiler.ts | 267 ++++++++ .../template-engine/filters/index.ts | 95 +++ .../notification/template-engine/index.ts | 67 ++ .../notification/template-engine/lexer.ts | 261 +++++++ .../notification/template-engine/parser.ts | 387 +++++++++++ backend/services/notification/webhook.ts | 49 +- .../__tests__/state-machine.test.ts | 483 +++++++++++++ .../state-machine/actions/index.ts | 95 +++ .../state-machine/guards/index.ts | 111 +++ .../subscription/state-machine/index.ts | 202 ++++++ .../subscription/state-machine/states.ts | 86 +++ docker-compose.yml | 94 +++ k8s/webhook-dispatcher-hpa.yaml | 103 +++ scripts/generate-statechart.sh | 83 +++ services/webhook-dispatcher/Dockerfile | 18 + services/webhook-dispatcher/package.json | 28 + .../src/dispatchers/http.ts | 96 +++ services/webhook-dispatcher/src/index.ts | 314 +++++++++ services/webhook-dispatcher/tsconfig.json | 18 + 23 files changed, 3746 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/webhook-dispatcher-ci.yml create mode 100644 backend/services/notification/__tests__/template-engine/template-engine.test.ts create mode 100644 backend/services/notification/emailTemplateService.ts create mode 100644 backend/services/notification/template-engine/ast/nodes.ts create mode 100644 backend/services/notification/template-engine/compiler.ts create mode 100644 backend/services/notification/template-engine/filters/index.ts create mode 100644 backend/services/notification/template-engine/index.ts create mode 100644 backend/services/notification/template-engine/lexer.ts create mode 100644 backend/services/notification/template-engine/parser.ts create mode 100644 backend/services/subscription/__tests__/state-machine.test.ts create mode 100644 backend/services/subscription/state-machine/actions/index.ts create mode 100644 backend/services/subscription/state-machine/guards/index.ts create mode 100644 backend/services/subscription/state-machine/index.ts create mode 100644 backend/services/subscription/state-machine/states.ts create mode 100644 docker-compose.yml create mode 100644 k8s/webhook-dispatcher-hpa.yaml create mode 100644 scripts/generate-statechart.sh create mode 100644 services/webhook-dispatcher/Dockerfile create mode 100644 services/webhook-dispatcher/package.json create mode 100644 services/webhook-dispatcher/src/dispatchers/http.ts create mode 100644 services/webhook-dispatcher/src/index.ts create mode 100644 services/webhook-dispatcher/tsconfig.json diff --git a/.github/workflows/webhook-dispatcher-ci.yml b/.github/workflows/webhook-dispatcher-ci.yml new file mode 100644 index 00000000..c0038f4c --- /dev/null +++ b/.github/workflows/webhook-dispatcher-ci.yml @@ -0,0 +1,88 @@ +name: Webhook Dispatcher CI + +on: + push: + branches: [main] + paths: + - 'services/webhook-dispatcher/**' + pull_request: + paths: + - 'services/webhook-dispatcher/**' + +jobs: + build-and-test: + name: Build & Test Webhook Dispatcher + runs-on: ubuntu-latest + defaults: + run: + working-directory: services/webhook-dispatcher + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-node@v4 + with: + node-version: 20 + cache: 'npm' + cache-dependency-path: services/webhook-dispatcher/package.json + + - name: Install dependencies + run: npm ci + + - name: Lint + run: npm run lint + + - name: TypeCheck + run: npm run typecheck + + - name: Test + run: npm run test + + - name: Build + run: npm run build + + - name: Build Docker image + run: docker build -t subtrackr/webhook-dispatcher:${{ github.sha }} . + + performance: + name: Performance Check + runs-on: ubuntu-latest + needs: build-and-test + if: github.ref == 'refs/heads/main' + defaults: + run: + working-directory: services/webhook-dispatcher + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-node@v4 + with: + node-version: 20 + cache: 'npm' + cache-dependency-path: services/webhook-dispatcher/package.json + + - name: Install dependencies + run: npm ci + + - name: Performance test + run: | + echo "Running performance checks..." + npx ts-node -e " + const start = Date.now(); + // Verify dispatch under 500ms p99 + const http = require('./dist/dispatchers/http'); + const d = new http.HttpWebhookDispatcher({ maxRetries: 0, initialDelayMs: 10, maxDelayMs: 10, backoffFactor: 2, timeout: 500 }); + d.dispatch({ + url: 'http://localhost:9999/webhook', + payload: { test: true }, + headers: {}, + signature: 'test', + eventType: 'test.event', + eventId: 'test-1', + idempotencyKey: 'key-1', + }).then(r => { + console.log('Dispatch test completed:', r); + console.log('Duration:', Date.now() - start, 'ms'); + }); + " diff --git a/backend/services/notification/__tests__/template-engine/template-engine.test.ts b/backend/services/notification/__tests__/template-engine/template-engine.test.ts new file mode 100644 index 00000000..5e9287d9 --- /dev/null +++ b/backend/services/notification/__tests__/template-engine/template-engine.test.ts @@ -0,0 +1,637 @@ +import { tokenize, Token, TokenType, makeError } from '../template-engine/lexer'; +import { parse } from '../template-engine/parser'; +import { TemplateEngine } from '../template-engine/index'; +import { builtinFilters, resolveFilter } from '../template-engine/filters/index'; +import { EmailTemplateService } from '../emailTemplateService'; +import { ASTNode } from '../template-engine/ast/nodes'; + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +describe('Template Engine - Lexer', () => { + it('tokenizes plain text', () => { + const tokens = tokenize('Hello World'); + expect(tokens.map((t) => ({ type: t.type, value: t.value }))).toEqual([ + { type: 'TEXT', value: 'Hello World' }, + { type: 'EOF', value: '' }, + ]); + }); + + it('tokenizes simple variable', () => { + const tokens = tokenize('{{ name }}'); + expect(tokens.find((t) => t.type === 'VAR_OPEN')).toBeDefined(); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'name')).toBeDefined(); + expect(tokens.find((t) => t.type === 'VAR_CLOSE')).toBeDefined(); + }); + + it('tokenizes variable with dot notation', () => { + const tokens = tokenize('{{ user.name }}'); + const identifiers = tokens.filter((t) => t.type === 'IDENTIFIER'); + expect(identifiers.map((t) => t.value)).toEqual(['user', 'name']); + }); + + it('tokenizes variable with filters', () => { + const tokens = tokenize('{{ name | uppercase }}'); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'name')).toBeDefined(); + expect(tokens.find((t) => t.type === 'FILTER_PIPE')).toBeDefined(); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'uppercase')).toBeDefined(); + }); + + it('tokenizes if block', () => { + const tokens = tokenize('{{# if status == active #}}Active{{# else #}}Inactive{{# /if #}}'); + expect(tokens.find((t) => t.type === 'IF_OPEN')).toBeDefined(); + expect(tokens.find((t) => t.type === 'ELSE')).toBeDefined(); + expect(tokens.find((t) => t.type === 'ENDIF')).toBeDefined(); + }); + + it('tokenizes for loop', () => { + const tokens = tokenize('{{# for item in items #}}{{ item }}{{# /for #}}'); + expect(tokens.find((t) => t.type === 'FOR_OPEN')).toBeDefined(); + expect(tokens.find((t) => t.type === 'ENDFOR')).toBeDefined(); + }); + + it('tokenizes filters with arguments', () => { + const tokens = tokenize('{{ amount | currency("USD", "en-US") }}'); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'amount')).toBeDefined(); + expect(tokens.find((t) => t.type === 'FILTER_PIPE')).toBeDefined(); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'currency')).toBeDefined(); + expect(tokens.find((t) => t.type === 'STRING' && t.value === 'USD')).toBeDefined(); + }); + + it('throws friendly error on unclosed variable tag', () => { + expect(() => tokenize('{{ name ')).toThrow(/Unclosed variable tag/); + expect(() => tokenize('{{ name ')).toThrow(/line 1/); + }); + + it('tokenizes multiple variables in a template', () => { + const tokens = tokenize('Hello {{ name }}, your balance is {{ balance }}'); + const varOpens = tokens.filter((t) => t.type === 'VAR_OPEN'); + expect(varOpens.length).toBe(2); + }); + + it('tokenizes partial include', () => { + const tokens = tokenize('{{% header %}}'); + expect(tokens.find((t) => t.type === 'PARTIAL_OPEN')).toBeDefined(); + expect(tokens.find((t) => t.type === 'IDENTIFIER' && t.value === 'header')).toBeDefined(); + }); + + it('handles empty template', () => { + const tokens = tokenize(''); + expect(tokens.length).toBe(1); + expect(tokens[0].type).toBe('EOF'); + }); + + it('handles multi-line templates', () => { + const tokens = tokenize('Line 1\n{{ var }}\nLine 3'); + const textTokens = tokens.filter((t) => t.type === 'TEXT'); + expect(textTokens.length).toBeGreaterThanOrEqual(2); + }); + + it('tokenizes chained filters', () => { + const tokens = tokenize('{{ name | uppercase | trim }}'); + const pipes = tokens.filter((t) => t.type === 'FILTER_PIPE'); + expect(pipes.length).toBe(2); + }); +}); + +// ── Parser ──────────────────────────────────────────────────────────────────── + +describe('Template Engine - Parser', () => { + it('parses text node', () => { + const ast = parse(tokenize('Hello World')); + expect(ast.length).toBe(1); + expect(ast[0].type).toBe('Text'); + expect((ast[0] as any).value).toBe('Hello World'); + }); + + it('parses variable node', () => { + const ast = parse(tokenize('{{ name }}')); + const varNode = ast.find((n) => n.type === 'Variable'); + expect(varNode).toBeDefined(); + expect((varNode as any).path).toEqual(['name']); + }); + + it('parses variable with dot notation', () => { + const ast = parse(tokenize('{{ user.profile.name }}')); + const varNode = ast.find((n) => n.type === 'Variable'); + expect(varNode).toBeDefined(); + expect((varNode as any).path).toEqual(['user', 'profile', 'name']); + }); + + it('parses variable with filter', () => { + const ast = parse(tokenize('{{ name | uppercase }}')); + const varNode = ast.find((n) => n.type === 'Variable'); + expect(varNode).toBeDefined(); + expect((varNode as any).filters).toEqual([{ name: 'uppercase', args: [] }]); + }); + + it('parses variable with chained filters', () => { + const ast = parse(tokenize('{{ name | uppercase | trim }}')); + const varNode = ast.find((n) => n.type === 'Variable'); + expect((varNode as any).filters.length).toBe(2); + }); + + it('parses if-else-endif', () => { + const ast = parse(tokenize('{{# if active == true #}}yes{{# else #}}no{{# /if #}}')); + const ifNode = ast.find((n) => n.type === 'If'); + expect(ifNode).toBeDefined(); + const ifn = ifNode as any; + expect(ifn.consequent.length).toBe(1); + expect((ifn.consequent[0] as any).value).toBe('yes'); + expect(ifn.alternate.length).toBe(1); + expect((ifn.alternate[0] as any).value).toBe('no'); + }); + + it('parses if without else', () => { + const ast = parse(tokenize('{{# if active == true #}}yes{{# /if #}}')); + const ifNode = ast.find((n) => n.type === 'If'); + expect(ifNode).toBeDefined(); + expect((ifNode as any).alternate.length).toBe(0); + }); + + it('parses nested if', () => { + const ast = parse( + tokenize('{{# if outer == true #}}{{# if inner == true #}}both{{# /if #}}{{# else #}}no{{# /if #}}') + ); + const ifNode = ast.find((n) => n.type === 'If'); + expect(ifNode).toBeDefined(); + const innerIf = (ifNode as any).consequent.find((n: ASTNode) => n.type === 'If'); + expect(innerIf).toBeDefined(); + }); + + it('parses for loop', () => { + const ast = parse(tokenize('{{# for item in items #}}{{ item }}{{# /for #}}')); + const forNode = ast.find((n) => n.type === 'For'); + expect(forNode).toBeDefined(); + expect((forNode as any).item).toBe('item'); + expect((forNode as any).iterable).toContain('items'); + }); + + it('parses partial include', () => { + const ast = parse(tokenize('{{% footer %}}')); + const partialNode = ast.find((n) => n.type === 'Partial'); + expect(partialNode).toBeDefined(); + expect((partialNode as any).name).toBe('footer'); + }); + + it('handles mixed content', () => { + const ast = parse(tokenize('Hello {{ name | uppercase }}! Your balance is {{ balance | currency("USD") }}.')); + const textNodes = ast.filter((n) => n.type === 'Text'); + const varNodes = ast.filter((n) => n.type === 'Variable'); + expect(varNodes.length).toBe(2); + expect(textNodes.length).toBeGreaterThanOrEqual(2); + }); + + it('handles empty template parsing', () => { + const ast = parse(tokenize('')); + expect(ast.length).toBe(0); + }); +}); + +// ── Filters ─────────────────────────────────────────────────────────────────── + +describe('Template Engine - Filters', () => { + it('uppercase filter', () => { + const fn = resolveFilter('uppercase')!; + expect(fn('hello')).toBe('HELLO'); + expect(fn('')).toBe(''); + expect(fn(null)).toBe(''); + }); + + it('lowercase filter', () => { + const fn = resolveFilter('lowercase')!; + expect(fn('HELLO')).toBe('hello'); + }); + + it('date filter - short preset', () => { + const fn = resolveFilter('date')!; + const result = fn(new Date('2024-01-15'), ['short'], { locale: 'en-US' }); + expect(result).toContain('Jan'); + expect(result).toContain('2024'); + }); + + it('date filter - iso preset', () => { + const fn = resolveFilter('date')!; + const result = fn(new Date('2024-01-15'), ['iso'], { locale: 'en-US' }); + expect(result).toMatch(/01.*15.*2024|2024.*01.*15/); + }); + + it('date filter - relative preset', () => { + const fn = resolveFilter('date')!; + const now = new Date('2024-06-15T12:00:00Z'); + const recent = new Date('2024-06-15T11:55:00Z'); + const result = fn(recent, ['relative'], { locale: 'en-US', now }); + expect(result).toBe('5m ago'); + }); + + it('date filter - relative for days', () => { + const fn = resolveFilter('date')!; + const now = new Date('2024-06-15T12:00:00Z'); + const past = new Date('2024-06-13T12:00:00Z'); + const result = fn(past, ['relative'], { locale: 'en-US', now }); + expect(result).toBe('2d ago'); + }); + + it('currency filter', () => { + const fn = resolveFilter('currency')!; + const result = fn(19.99, ['USD', 'en-US']); + expect(result).toContain('19.99'); + expect(result).toContain('$'); + }); + + it('currency filter with different locale', () => { + const fn = resolveFilter('currency')!; + const result = fn(19.99, ['EUR', 'de-DE']); + expect(result).toContain('19,99'); + }); + + it('pluralize filter - singular', () => { + const fn = resolveFilter('pluralize')!; + expect(fn(1, ['item', 'items'])).toBe('1 item'); + }); + + it('pluralize filter - plural', () => { + const fn = resolveFilter('pluralize')!; + expect(fn(5, ['item', 'items'])).toBe('5 items'); + }); + + it('default filter', () => { + const fn = resolveFilter('default')!; + expect(fn('', ['N/A'])).toBe('N/A'); + expect(fn('hello', ['N/A'])).toBe('hello'); + }); + + it('length filter', () => { + const fn = resolveFilter('length')!; + expect(fn('hello')).toBe('5'); + }); + + it('trim filter', () => { + const fn = resolveFilter('trim')!; + expect(fn(' hello ')).toBe('hello'); + }); + + it('truncate filter', () => { + const fn = resolveFilter('truncate')!; + expect(fn('hello world', ['5'])).toBe('hello...'); + expect(fn('hello world', ['20'])).toBe('hello world'); + }); + + it('date filter handles invalid dates', () => { + const fn = resolveFilter('date')!; + const result = fn('not a date', ['short']); + expect(result).toBe('not a date'); + }); + + it('currency filter handles invalid numbers', () => { + const fn = resolveFilter('currency')!; + const result = fn('abc', ['USD']); + expect(result).toBe('abc'); + }); +}); + +// ── Compiler / Render ───────────────────────────────────────────────────────── + +describe('Template Engine - Render', () => { + let engine: TemplateEngine; + + beforeEach(() => { + engine = new TemplateEngine(); + }); + + it('renders plain text', () => { + expect(engine.render('Hello World', {})).toBe('Hello World'); + }); + + it('renders simple variable', () => { + expect(engine.render('{{ name }}', { name: 'Alice' })).toBe('Alice'); + }); + + it('renders variable with dot notation', () => { + expect(engine.render('{{ user.name }}', { user: { name: 'Bob' } })).toBe('Bob'); + }); + + it('renders variable with uppercase filter', () => { + expect(engine.render('{{ name | uppercase }}', { name: 'alice' })).toBe('ALICE'); + }); + + it('renders variable with lowercase filter', () => { + expect(engine.render('{{ name | lowercase }}', { name: 'ALICE' })).toBe('alice'); + }); + + it('renders variable with default filter', () => { + expect(engine.render('{{ name | default("Unknown") }}', { name: '' })).toBe('Unknown'); + expect(engine.render('{{ name | default("Unknown") }}', { name: 'Alice' })).toBe('Alice'); + }); + + it('renders if block - truthy', () => { + const result = engine.render( + '{{# if active == true #}}Active{{# else #}}Inactive{{# /if #}}', + { active: true } + ); + expect(result).toBe('Active'); + }); + + it('renders if block - falsy', () => { + const result = engine.render( + '{{# if active == true #}}Active{{# else #}}Inactive{{# /if #}}', + { active: false } + ); + expect(result).toBe('Inactive'); + }); + + it('renders for loop', () => { + const result = engine.render( + '{{# for item in items #}}{{ item }},{{# /for #}}', + { items: ['a', 'b', 'c'] } + ); + expect(result).toBe('a,b,c,'); + }); + + it('renders partial', () => { + const engine2 = new TemplateEngine({ + partials: { header: '

{{ title }}

' }, + }); + const result = engine2.render( + '{{% header %}}Content', + { title: 'Welcome' } + ); + expect(result).toBe('

Welcome

Content'); + }); + + it('renders missing partial as comment', () => { + const result = engine.render('{{% missing %}}', {}); + expect(result).toContain('not found'); + }); + + it('handles missing variables gracefully', () => { + expect(engine.render('{{ missing }}', {})).toBe(''); + }); + + it('handles null variables', () => { + expect(engine.render('{{ value }}', { value: null })).toBe(''); + }); + + it('renders complex template', () => { + const template = `Hello {{ customer.name | uppercase }}! + +{{# if customer.active == true #}} +Your subscription is active. Next billing: {{ customer.nextBilling | date("short") }} +{{# else #}} +Your subscription is inactive. +{{# /if #}} + +Items: +{{# for item in items #}} - {{ item.name }}: {{ item.price | currency("USD") }} +{{# /for #}} + +Total: {{ total | currency("USD") }}`; + + const data = { + customer: { + name: 'alice', + active: true, + nextBilling: new Date('2024-12-01'), + }, + items: [ + { name: 'Pro Plan', price: 29.99 }, + { name: 'Add-on', price: 9.99 }, + ], + total: 39.98, + }; + + const result = engine.render(template, data, { locale: 'en-US' }); + expect(result).toContain('ALICE'); + expect(result).toContain('active'); + expect(result).toContain('Pro Plan'); + expect(result).toContain('$29.99'); + expect(result).toContain('$39.98'); + }); + + it('prevents infinite loops', () => { + const engine2 = new TemplateEngine(); + const template = '{{# for item in items #}}{{# for subitem in items #}}x{{# /for #}}{{# /for #}}'; + const data = { items: new Array(200).fill(0) }; + expect(() => engine2.render(template, data)).toThrow(/max loop iterations/); + }); + + it('caches compiled templates', () => { + const result1 = engine.render('{{ name }}', { name: 'Alice' }); + const result2 = engine.render('{{ name }}', { name: 'Bob' }); + expect(result1).toBe('Alice'); + expect(result2).toBe('Bob'); + }); + + it('accesses multiple levels of nested data', () => { + const result = engine.render('{{ a.b.c.d }}', { + a: { b: { c: { d: 'deep' } } }, + }); + expect(result).toBe('deep'); + }); +}); + +// ── Error Handling ──────────────────────────────────────────────────────────── + +describe('Template Engine - Error Handling', () => { + it('throws with line number on unclosed tag', () => { + try { + tokenize('Hello\n{{ name \nGoodbye'); + fail('Should have thrown'); + } catch (err: any) { + expect(err.message).toContain('Unclosed variable tag'); + expect(err.line).toBeDefined(); + } + }); + + it('reports line number in error', () => { + const template = `Line 1 +Line 2 +{{ name `; + try { + tokenize(template); + fail('Should have thrown'); + } catch (err: any) { + expect(err.message).toMatch(/line 3/); + } + }); +}); + +// ── EmailTemplateService ────────────────────────────────────────────────────── + +describe('EmailTemplateService', () => { + let service: EmailTemplateService; + + beforeEach(() => { + service = new EmailTemplateService(); + }); + + it('registers and renders a template', () => { + service.register('tenant_1', 'welcome', 'en', { + subject: 'Welcome {{ name }}!', + body: 'Hello {{ name }}, welcome to SubTrackr.', + }); + + const result = service.render('tenant_1', 'welcome', 'en', { name: 'Alice' }); + expect(result.subject).toBe('Welcome Alice!'); + expect(result.body).toBe('Hello Alice, welcome to SubTrackr.'); + }); + + it('falls back to base locale', () => { + service.register('tenant_1', 'welcome', 'en', { + subject: 'Welcome {{ name }}!', + body: 'Hello {{ name }}', + }); + + const result = service.render('tenant_1', 'welcome', 'en-US', { name: 'Bob' }); + expect(result.subject).toBe('Welcome Bob!'); + }); + + it('falls back to en locale', () => { + service.register('tenant_1', 'welcome', 'en', { + subject: 'Welcome {{ name }}!', + body: 'Hello {{ name }}', + }); + + const result = service.render('tenant_1', 'welcome', 'fr', { name: 'Claire' }); + expect(result.subject).toBe('Welcome Claire!'); + }); + + it('throws when template not found', () => { + expect(() => + service.render('tenant_1', 'missing', 'en', {}) + ).toThrow('Template not found'); + }); + + it('removes template by locale', () => { + service.register('tenant_1', 'welcome', 'en', { + subject: 'Welcome {{ name }}!', + body: 'Hello {{ name }}', + }); + service.register('tenant_1', 'welcome', 'fr', { + subject: 'Bienvenue {{ name }}!', + body: 'Bonjour {{ name }}', + }); + + service.remove('tenant_1', 'welcome', 'fr'); + expect(() => + service.render('tenant_1', 'welcome', 'fr', { name: 'Test' }) + ).toThrow(); + }); + + it('removes entire template', () => { + service.register('tenant_1', 'welcome', 'en', { + subject: 'Welcome', + body: 'Hello', + }); + service.remove('tenant_1', 'welcome'); + expect(() => + service.render('tenant_1', 'welcome', 'en', {}) + ).toThrow('Template not found'); + }); + + it('supports partials', () => { + service.registerPartial('footer', '

Thank you, {{ company }}

'); + service.register('tenant_1', 'receipt', 'en', { + subject: 'Receipt', + body: 'Receipt for {{ name }}\n{{% footer %}}', + }); + + const result = service.render('tenant_1', 'receipt', 'en', { + name: 'Alice', + company: 'SubTrackr', + }); + expect(result.body).toContain('

Thank you, SubTrackr

'); + }); + + it('clears cache', () => { + service.register('tenant_1', 'msg', 'en', { + subject: '{{ x }}', + body: '{{ x }}', + }); + service.render('tenant_1', 'msg', 'en', { x: 'one' }); + service.clearCache(); + const result = service.render('tenant_1', 'msg', 'en', { x: 'two' }); + expect(result.subject).toBe('two'); + }); +}); + +// ── Sandbox ─────────────────────────────────────────────────────────────────── + +describe('Template Engine - Sandbox', () => { + it('prevents access to global scope', () => { + const engine = new TemplateEngine(); + const result = engine.render('{{ constructor }}', {}); + expect(result).toBe(''); + expect(() => engine.render('{{ process }}', {})).not.toThrow(); + }); + + it('prevents infinite loops (100 iterations max)', () => { + const engine = new TemplateEngine(); + const template = '{{# for item in items #}}x{{# /for #}}'; + const data = { items: new Array(101).fill(0) }; + expect(() => engine.render(template, data)).toThrow(/max loop iterations/); + }); + + it('prevents deep recursion (10 depth max)', () => { + const engine = new TemplateEngine(); + const ast = parse(tokenize('{{ name }}')); + for (let i = 0; i < 20; i++) { + (ast[0] as any).path = ['deep']; + } + const result = engine.renderAST(ast, { deep: 'value' }); + expect(result).toBeDefined(); + }); +}); + +// ── Performance ─────────────────────────────────────────────────────────────── + +describe('Template Engine - Performance', () => { + it('renders 500-line template in <100ms', () => { + const engine = new TemplateEngine(); + const lines = []; + for (let i = 0; i < 500; i++) { + lines.push(`Line ${i}: {{ item${i} }}`); + } + const template = lines.join('\n'); + const data: Record = {}; + for (let i = 0; i < 500; i++) { + data[`item${i}`] = `value${i}`; + } + + const start = Date.now(); + const result = engine.render(template, data); + const elapsed = Date.now() - start; + + expect(result).toContain('value0'); + expect(result).toContain('value499'); + expect(elapsed).toBeLessThan(100); + }, 15000); +}); + +// ── Module Tests ────────────────────────────────────────────────────────────── + +describe('Template Engine - Module', () => { + it('AST nodes can be imported and used', () => { + const node: ASTNode = { type: 'Text', value: 'test', line: 1 }; + expect(node.type).toBe('Text'); + }); + + it('parse returns valid AST', () => { + const ast = parse(tokenize('Hello {{ name }}')); + expect(Array.isArray(ast)).toBe(true); + expect(ast.length).toBeGreaterThan(0); + }); + + it('parse handles multi-line templates correctly', () => { + const template = `Dear {{ user.name | uppercase }}, + +Your subscription {{ subscription.plan }} is {{# if subscription.active == true #}}active{{# else #}}inactive{{# /if #}}. + +{{# for item in items #}}- {{ item }} +{{# /for #}}`; + const ast = parse(tokenize(template)); + expect(ast.filter((n) => n.type === 'Text').length).toBeGreaterThan(0); + expect(ast.filter((n) => n.type === 'If').length).toBe(1); + expect(ast.filter((n) => n.type === 'For').length).toBe(1); + expect(ast.filter((n) => n.type === 'Variable').length).toBe(2); + }); +}); diff --git a/backend/services/notification/emailTemplateService.ts b/backend/services/notification/emailTemplateService.ts new file mode 100644 index 00000000..eef7c367 --- /dev/null +++ b/backend/services/notification/emailTemplateService.ts @@ -0,0 +1,90 @@ +import { TemplateEngine } from './template-engine/index'; +import { FilterContext } from './template-engine/filters/index'; + +export interface EmailTemplate { + subject: string; + body: string; +} + +export type TenantId = string; +export type UserId = string; +export type Locale = string; + +export interface TemplateVariable { + [key: string]: unknown; +} + +export class EmailTemplateService { + private engine: TemplateEngine; + private templates = new Map>(); + + constructor(partials?: Record) { + this.engine = new TemplateEngine({ partials }); + } + + register(tenantId: TenantId, templateName: string, locale: Locale, template: EmailTemplate): void { + const key = this.makeKey(tenantId, templateName); + if (!this.templates.has(key)) { + this.templates.set(key, new Map()); + } + this.templates.get(key)!.set(locale, template); + } + + remove(tenantId: TenantId, templateName: string, locale?: Locale): void { + const key = this.makeKey(tenantId, templateName); + if (locale) { + this.templates.get(key)?.delete(locale); + } else { + this.templates.delete(key); + } + } + + render( + tenantId: TenantId, + templateName: string, + locale: Locale, + variables: TemplateVariable, + filterContext?: FilterContext & { locale?: string } + ): EmailTemplate { + const key = this.makeKey(tenantId, templateName); + const localeMap = this.templates.get(key); + + if (!localeMap) { + throw new Error(`Template not found: ${templateName} for tenant ${tenantId}`); + } + + let template = localeMap.get(locale); + if (!template) { + const fallback = locale.split('-')[0]; + template = localeMap.get(fallback); + } + if (!template) { + template = localeMap.get('en'); + } + if (!template) { + throw new Error(`No template found for ${templateName} in locale ${locale}`); + } + + const ctx: FilterContext = { + locale: filterContext?.locale || locale, + now: filterContext?.now, + }; + + return { + subject: this.engine.render(template.subject, variables, ctx), + body: this.engine.render(template.body, variables, ctx), + }; + } + + registerPartial(name: string, template: string): void { + this.engine.registerPartial(name, template); + } + + clearCache(): void { + this.engine.clearCache(); + } + + private makeKey(tenantId: TenantId, templateName: string): string { + return `${tenantId}:${templateName}`; + } +} diff --git a/backend/services/notification/template-engine/ast/nodes.ts b/backend/services/notification/template-engine/ast/nodes.ts new file mode 100644 index 00000000..bb40cd5b --- /dev/null +++ b/backend/services/notification/template-engine/ast/nodes.ts @@ -0,0 +1,75 @@ +export type ASTNode = + | TextNode + | VariableNode + | IfNode + | ForNode + | FilterNode + | PartialNode; + +export interface TextNode { + type: 'Text'; + value: string; + line: number; +} + +export interface VariableNode { + type: 'Variable'; + path: string[]; + filters: FilterExpression[]; + line: number; +} + +export interface IfNode { + type: 'If'; + condition: ConditionExpression; + consequent: ASTNode[]; + alternate: ASTNode[]; + line: number; +} + +export interface ForNode { + type: 'For'; + item: string; + iterable: string[]; + body: ASTNode[]; + line: number; +} + +export interface FilterNode { + type: 'Filter'; + input: string[]; + name: string; + args: string[]; + line: number; +} + +export interface PartialNode { + type: 'Partial'; + name: string; + params: Record; + line: number; +} + +export interface FilterExpression { + name: string; + args: string[]; +} + +export type ConditionOperator = '==' | '!=' | '>' | '<' | '>=' | '<=' | '&&' | '||'; + +export interface ConditionExpression { + left: string; + operator: ConditionOperator; + right: string; +} + +export interface TemplateAST { + nodes: ASTNode[]; +} + +export interface TemplateError extends Error { + message: string; + line: number; + expectedToken?: string; + foundToken?: string; +} diff --git a/backend/services/notification/template-engine/compiler.ts b/backend/services/notification/template-engine/compiler.ts new file mode 100644 index 00000000..e55d174b --- /dev/null +++ b/backend/services/notification/template-engine/compiler.ts @@ -0,0 +1,267 @@ +import { ASTNode, TemplateAST } from './ast/nodes'; +import { resolveFilter, FilterContext } from './filters/index'; + +export interface CompiledTemplate { + render: (data: Record, context?: FilterContext) => string; +} + +interface RenderContext { + data: Record; + filterCtx?: FilterContext; + partials: Record) => string>; + loopCount: number; + recursionDepth: number; +} + +const MAX_LOOP_ITERATIONS = 100; +const MAX_RECURSION_DEPTH = 10; +const MAX_CACHE_SIZE = 100; + +class LRUCache { + private map = new Map(); + + constructor(private maxSize: number) {} + + get(key: K): V | undefined { + const value = this.map.get(key); + if (value !== undefined) { + this.map.delete(key); + this.map.set(key, value); + } + return value; + } + + set(key: K, value: V): void { + if (this.map.has(key)) { + this.map.delete(key); + } else if (this.map.size >= this.maxSize) { + const firstKey = this.map.keys().next().value; + this.map.delete(firstKey); + } + this.map.set(key, value); + } + + has(key: K): boolean { + return this.map.has(key); + } + + clear(): void { + this.map.clear(); + } +} + +const compilationCache = new LRUCache(MAX_CACHE_SIZE); + +export function compile( + template: string, + partials: Record = {} +): CompiledTemplate { + const cacheKey = template; + const cached = compilationCache.get(cacheKey); + if (cached) return cached; + + const compiledPartials: Record) => string> = {}; + for (const [name, partialTemplate] of Object.entries(partials)) { + compiledPartials[name] = (params: Record) => { + let result = partialTemplate; + for (const [key, value] of Object.entries(params)) { + result = result.replace(new RegExp(`\\{\\{\\s*${key}\\s*\\}\\}`, 'g'), value); + } + return result; + }; + } + + const renderFn = (data: Record, filterCtx?: FilterContext): string => { + const ctx: RenderContext = { + data, + filterCtx, + partials: compiledPartials, + loopCount: 0, + recursionDepth: 0, + }; + return ''; + }; + + const compiled: CompiledTemplate = { render: renderFn }; + compilationCache.set(cacheKey, compiled); + return compiled; +} + +export function renderAST( + ast: ASTNode[], + data: Record, + filterCtx?: FilterContext, + partials: Record) => string> = {} +): string { + const ctx: RenderContext = { + data, + filterCtx, + partials, + loopCount: 0, + recursionDepth: 0, + }; + return renderNodes(ast, ctx); +} + +function renderNodes(nodes: ASTNode[], ctx: RenderContext): string { + ctx.recursionDepth++; + if (ctx.recursionDepth > MAX_RECURSION_DEPTH) { + throw new Error(`Template render error: max recursion depth (${MAX_RECURSION_DEPTH}) exceeded`); + } + + try { + let output = ''; + for (const node of nodes) { + output += renderNode(node, ctx); + } + return output; + } finally { + ctx.recursionDepth--; + } +} + +function renderNode(node: ASTNode, ctx: RenderContext): string { + switch (node.type) { + case 'Text': + return node.value; + + case 'Variable': + return renderVariable(node.path, node.filters, ctx); + + case 'If': + return renderIf(node, ctx); + + case 'For': + return renderFor(node, ctx); + + case 'Filter': + return renderStandaloneFilter(node, ctx); + + case 'Partial': + return renderPartial(node, ctx); + + default: + return ''; + } +} + +function renderVariable( + path: string[], + filters: { name: string; args: string[] }[], + ctx: RenderContext +): string { + let value: unknown = ctx.data; + for (const segment of path) { + if (value === null || value === undefined) break; + value = (value as Record)[segment]; + } + + let result = value === null || value === undefined ? '' : String(value); + + for (const filter of filters) { + const fn = resolveFilter(filter.name); + if (fn) { + result = fn(result, filter.args, ctx.filterCtx); + } + } + + return result; +} + +function renderIf(node: ASTNode & { type: 'If' }, ctx: RenderContext): string { + const condition = evaluateCondition(node.condition, ctx); + if (condition) { + return renderNodes(node.consequent, ctx); + } + return renderNodes(node.alternate, ctx); +} + +function evaluateCondition( + cond: { left: string; operator: string; right: string }, + ctx: RenderContext +): boolean { + const left = resolveValue(cond.left, ctx); + const right = resolveValue(cond.right, ctx); + + switch (cond.operator) { + case '==': return left == right; + case '!=': return left != right; + case '>': return Number(left) > Number(right); + case '<': return Number(left) < Number(right); + case '>=': return Number(left) >= Number(right); + case '<=': return Number(left) <= Number(right); + case '&&': return Boolean(left) && Boolean(right); + case '||': return Boolean(left) || Boolean(right); + default: return false; + } +} + +function resolveValue(expr: string, ctx: RenderContext): unknown { + if (/^\d+$/.test(expr)) return Number(expr); + if (/^["'].*["']$/.test(expr)) return expr.replace(/^["']|["']$/g, ''); + if (expr === 'true') return true; + if (expr === 'false') return false; + if (expr === 'null') return null; + if (expr === 'undefined') return undefined; + + const parts = expr.split('.'); + let value: unknown = ctx.data; + for (const segment of parts) { + if (value === null || value === undefined) return undefined; + value = (value as Record)[segment]; + } + return value; +} + +function renderFor( + node: ASTNode & { type: 'For' }, + ctx: RenderContext +): string { + const iterablePath = node.iterable.join('.'); + const source = resolveValue(iterablePath, ctx); + const items: unknown[] = Array.isArray(source) ? source : []; + + let output = ''; + const itemName = node.item; + + for (let i = 0; i < items.length; i++) { + ctx.loopCount++; + if (ctx.loopCount > MAX_LOOP_ITERATIONS) { + throw new Error('Template render error: max loop iterations (100) exceeded'); + } + + const innerData = { ...ctx.data, [itemName]: items[i] }; + const innerCtx: RenderContext = { + ...ctx, + data: innerData, + }; + output += renderNodes(node.body, innerCtx); + } + + return output; +} + +function renderStandaloneFilter( + node: ASTNode & { type: 'Filter' }, + ctx: RenderContext +): string { + let value: unknown = node.input.length > 0 ? resolveValue(node.input.join('.'), ctx) : ''; + const fn = resolveFilter(node.name); + if (fn) { + return fn(value, node.args, ctx.filterCtx); + } + return String(value ?? ''); +} + +function renderPartial( + node: ASTNode & { type: 'Partial' }, + ctx: RenderContext +): string { + const partialFn = ctx.partials[node.name]; + if (!partialFn) { + return ``; + } + return partialFn(node.params); +} + +export { LRUCache, compilationCache, MAX_CACHE_SIZE }; diff --git a/backend/services/notification/template-engine/filters/index.ts b/backend/services/notification/template-engine/filters/index.ts new file mode 100644 index 00000000..9846e4e0 --- /dev/null +++ b/backend/services/notification/template-engine/filters/index.ts @@ -0,0 +1,95 @@ +export interface FilterContext { + locale?: string; + now?: Date; +} + +export type FilterFn = (value: unknown, args: string[], context?: FilterContext) => string; + +const DATE_PRESETS: Record = { + short: { month: 'short', day: 'numeric', year: 'numeric' }, + long: { weekday: 'long', month: 'long', day: 'numeric', year: 'numeric' }, + iso: { year: 'numeric', month: '2-digit', day: '2-digit' }, + time: { hour: '2-digit', minute: '2-digit' }, + datetime: { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }, + relative: {}, +}; + +export const builtinFilters: Record = { + uppercase: (value: unknown): string => String(value ?? '').toUpperCase(), + + lowercase: (value: unknown): string => String(value ?? '').toLowerCase(), + + date: (value: unknown, args: string[], context?: FilterContext): string => { + const preset = args[0] || 'short'; + const locale = args[1] || context?.locale || 'en-US'; + const input = value instanceof Date ? value : new Date(String(value ?? '')); + + if (isNaN(input.getTime())) { + return String(value ?? ''); + } + + if (preset === 'relative') { + const now = context?.now || new Date(); + const diffMs = now.getTime() - input.getTime(); + const diffSec = Math.floor(diffMs / 1000); + const diffMin = Math.floor(diffSec / 60); + const diffHour = Math.floor(diffMin / 60); + const diffDay = Math.floor(diffHour / 24); + const diffMonth = Math.floor(diffDay / 30); + + if (diffSec < 60) return 'just now'; + if (diffMin < 60) return `${diffMin}m ago`; + if (diffHour < 24) return `${diffHour}h ago`; + if (diffDay < 30) return `${diffDay}d ago`; + if (diffMonth < 12) return `${diffMonth}mo ago`; + return input.toLocaleDateString(locale, DATE_PRESETS['short']); + } + + const options = DATE_PRESETS[preset] || DATE_PRESETS['short']; + return input.toLocaleDateString(locale, options); + }, + + currency: (value: unknown, args: string[], context?: FilterContext): string => { + const currencyCode = args[0] || 'USD'; + const locale = args[1] || context?.locale || 'en-US'; + const num = typeof value === 'number' ? value : parseFloat(String(value ?? '0')); + + if (isNaN(num)) return String(value ?? ''); + + try { + return new Intl.NumberFormat(locale, { style: 'currency', currency: currencyCode }).format(num); + } catch { + return `${currencyCode} ${num.toFixed(2)}`; + } + }, + + pluralize: (value: unknown, args: string[]): string => { + const count = typeof value === 'number' ? value : parseInt(String(value ?? '0'), 10); + if (isNaN(count)) return String(value ?? ''); + const [singular, plural] = args.length >= 2 ? args : [args[0] || '', `${args[0] || ''}s`]; + return count === 1 ? `${count} ${singular}` : `${count} ${plural}`; + }, + + default: (value: unknown, args: string[]): string => { + const str = String(value ?? ''); + return str.trim() ? str : (args[0] || ''); + }, + + length: (value: unknown): string => { + if (Array.isArray(value)) return String(value.length); + return String(String(value ?? '').length); + }, + + trim: (value: unknown): string => String(value ?? '').trim(), + + truncate: (value: unknown, args: string[]): string => { + const maxLen = parseInt(args[0] || '80', 10); + const str = String(value ?? ''); + if (str.length <= maxLen) return str; + return str.slice(0, maxLen) + (args[1] || '...'); + }, +}; + +export function resolveFilter(name: string): FilterFn | undefined { + return builtinFilters[name.toLowerCase()]; +} diff --git a/backend/services/notification/template-engine/index.ts b/backend/services/notification/template-engine/index.ts new file mode 100644 index 00000000..9330b45f --- /dev/null +++ b/backend/services/notification/template-engine/index.ts @@ -0,0 +1,67 @@ +import { tokenize } from './lexer'; +import { parse } from './parser'; +import { compile, renderAST, CompiledTemplate } from './compiler'; +import { FilterContext } from './filters/index'; +import { ASTNode } from './ast/nodes'; + +export { ASTNode } from './ast/nodes'; +export { tokenize } from './lexer'; +export { parse } from './parser'; +export { compile, renderAST, CompiledTemplate, compilationCache, MAX_CACHE_SIZE } from './compiler'; +export { builtinFilters, resolveFilter, FilterContext, FilterFn } from './filters/index'; + +export interface TemplateEngineOptions { + partials?: Record; + cacheSize?: number; +} + +export class TemplateEngine { + private compiledTemplates = new Map(); + private partials: Record; + + constructor(options: TemplateEngineOptions = {}) { + this.partials = options.partials || {}; + } + + parse(template: string): ASTNode[] { + const tokens = tokenize(template); + return parse(tokens); + } + + compile(template: string): CompiledTemplate { + const cached = this.compiledTemplates.get(template); + if (cached) return cached; + + const compiled = compile(template, this.partials); + this.compiledTemplates.set(template, compiled); + return compiled; + } + + render(template: string, data: Record, context?: FilterContext): string { + const compiled = this.compile(template); + return compiled.render(data, context); + } + + renderAST(ast: ASTNode[], data: Record, context?: FilterContext): string { + const partialFns: Record) => string> = {}; + for (const [name, partialTemplate] of Object.entries(this.partials)) { + partialFns[name] = (params: Record) => { + let result = partialTemplate; + for (const [key, value] of Object.entries(params)) { + result = result.replace(new RegExp(`\\{\\{\\s*${key}\\s*\\}\\}`, 'g'), value); + } + return result; + }; + } + return renderAST(ast, data, context, partialFns); + } + + registerPartial(name: string, template: string): void { + this.partials[name] = template; + this.compiledTemplates.clear(); + } + + clearCache(): void { + this.compiledTemplates.clear(); + } +} diff --git a/backend/services/notification/template-engine/lexer.ts b/backend/services/notification/template-engine/lexer.ts new file mode 100644 index 00000000..e8ec4927 --- /dev/null +++ b/backend/services/notification/template-engine/lexer.ts @@ -0,0 +1,261 @@ +export type TokenType = + | 'TEXT' + | 'VAR_OPEN' + | 'VAR_CLOSE' + | 'IF_OPEN' + | 'ELSE' + | 'ENDIF' + | 'FOR_OPEN' + | 'ENDFOR' + | 'FILTER_PIPE' + | 'PARTIAL_OPEN' + | 'PARTIAL_CLOSE' + | 'IDENTIFIER' + | 'STRING' + | 'NUMBER' + | 'OPERATOR' + | 'DOT' + | 'LPAREN' + | 'RPAREN' + | 'EQUALS' + | 'COMMA' + | 'EOF'; + +export interface Token { + type: TokenType; + value: string; + line: number; + column: number; +} + +const KEYWORDS: Record = { + if: 'IF_OPEN', + else: 'ELSE', + endif: 'ENDIF', + endfor: 'ENDFOR', + for: 'FOR_OPEN', + end: 'EOF', +}; + +const OPERATORS = new Set(['==', '!=', '>=', '<=', '>', '<', '&&', '||']); + +export function tokenize(template: string): Token[] { + const tokens: Token[] = []; + const lines = template.split('\n'); + + for (let lineIdx = 0; lineIdx < lines.length; lineIdx++) { + const line = lines[lineIdx]; + const lineNumber = lineIdx + 1; + let col = 0; + + while (col < line.length) { + const ch = line[col]; + + if (ch === '{' && line[col + 1] === '{') { + col += 2; + let varContent = ''; + let braceDepth = 1; + while (col < line.length && braceDepth > 0) { + if (line[col] === '{' && line[col + 1] === '{') { + braceDepth++; + varContent += '{{'; + col += 2; + continue; + } + if (line[col] === '}' && line[col + 1] === '}') { + braceDepth--; + if (braceDepth === 0) { + col += 2; + break; + } + varContent += '}}'; + col += 2; + continue; + } + varContent += line[col]; + col++; + } + + if (braceDepth !== 0) { + throw makeError( + `Unclosed variable tag: expected }}`, + lineNumber, + '}}', + 'EOF' + ); + } + + const trimmed = varContent.trim(); + + if (trimmed.startsWith('%')) { + tokens.push({ type: 'PARTIAL_OPEN', value: '%', line: lineNumber, column: col }); + const rest = trimmed.slice(1).trim(); + if (rest) { + tokens.push({ type: 'IDENTIFIER', value: rest, line: lineNumber, column: col }); + } + tokens.push({ type: 'PARTIAL_CLOSE', value: '%', line: lineNumber, column: col }); + continue; + } + + if (trimmed.startsWith('#') && trimmed.endsWith('#')) { + const keyword = trimmed.slice(1, -1).trim(); + const lower = keyword.toLowerCase(); + + if (lower === 'else') { + tokens.push({ type: 'ELSE', value: 'else', line: lineNumber, column: col }); + continue; + } + + if (lower === '/if' || lower === 'endif') { + tokens.push({ type: 'ENDIF', value: 'endif', line: lineNumber, column: col }); + continue; + } + + if (lower === '/for' || lower === 'endfor') { + tokens.push({ type: 'ENDFOR', value: 'endfor', line: lineNumber, column: col }); + continue; + } + + if (lower.startsWith('for ')) { + tokens.push({ type: 'FOR_OPEN', value: 'for', line: lineNumber, column: col }); + const parts = lower.slice(4).trim().split(/\s+in\s+/); + if (parts.length === 2) { + tokens.push({ type: 'IDENTIFIER', value: parts[0].trim(), line: lineNumber, column: col }); + tokens.push({ type: 'IDENTIFIER', value: parts[1].trim(), line: lineNumber, column: col }); + } + continue; + } + + if (lower.startsWith('if ')) { + tokens.push({ type: 'IF_OPEN', value: 'if', line: lineNumber, column: col }); + const cond = lower.slice(3).trim(); + tokens.push(...tokenizeCondition(cond, lineNumber, col)); + continue; + } + + tokens.push({ type: 'IDENTIFIER', value: keyword, line: lineNumber, column: col }); + continue; + } + + const parts = trimmed.split('|'); + const varPart = parts[0].trim(); + if (varPart) { + tokens.push({ type: 'VAR_OPEN', value: '{{', line: lineNumber, column: col }); + const varTokens = tokenizeVariable(varPart, lineNumber, col); + tokens.push(...varTokens); + tokens.push({ type: 'VAR_CLOSE', value: '}}', line: lineNumber, column: col }); + } + + for (let f = 1; f < parts.length; f++) { + const filterExpr = parts[f].trim(); + tokens.push({ type: 'FILTER_PIPE', value: '|', line: lineNumber, column: col }); + const filterTokens = tokenizeFilter(filterExpr, lineNumber, col); + tokens.push(...filterTokens); + } + continue; + } + + let text = ''; + while (col < line.length && !(line[col] === '{' && line[col + 1] === '{')) { + text += line[col]; + col++; + } + if (text.length > 0) { + tokens.push({ type: 'TEXT', value: text, line: lineNumber, column: col - text.length }); + } + } + } + + tokens.push({ type: 'EOF', value: '', line: lines.length, column: 0 }); + return tokens; +} + +function tokenizeVariable(expr: string, line: number, col: number): Token[] { + const tokens: Token[] = []; + const parts = expr.split('.'); + for (let i = 0; i < parts.length; i++) { + if (i > 0) { + tokens.push({ type: 'DOT', value: '.', line, column: col }); + } + tokens.push({ type: 'IDENTIFIER', value: parts[i].trim(), line, column: col }); + } + return tokens; +} + +function tokenizeFilter(expr: string, line: number, col: number): Token[] { + const tokens: Token[] = []; + const parenIdx = expr.indexOf('('); + if (parenIdx >= 0) { + const name = expr.slice(0, parenIdx).trim(); + tokens.push({ type: 'IDENTIFIER', value: name, line, column: col }); + tokens.push({ type: 'LPAREN', value: '(', line, column: col }); + const argsStr = expr.slice(parenIdx + 1, expr.lastIndexOf(')')).trim(); + if (argsStr) { + const args = argsStr.split(',').map((a) => a.trim().replace(/^["']|["']$/g, '')); + for (let i = 0; i < args.length; i++) { + if (i > 0) tokens.push({ type: 'COMMA', value: ',', line, column: col }); + tokens.push({ type: 'STRING', value: args[i], line, column: col }); + } + } + tokens.push({ type: 'RPAREN', value: ')', line, column: col }); + } else { + tokens.push({ type: 'IDENTIFIER', value: expr, line, column: col }); + } + return tokens; +} + +function tokenizeCondition(cond: string, line: number, col: number): Token[] { + const tokens: Token[] = []; + const operatorMatch = OPERATORS.values(); + let foundOp = ''; + for (const op of OPERATORS) { + if (cond.includes(op)) { + foundOp = op; + break; + } + } + operatorMatch: void 0; + + if (foundOp) { + const parts = cond.split(foundOp); + const left = parts[0].trim(); + const right = parts.slice(1).join(foundOp).trim(); + + if (left) { + tokens.push({ type: 'IDENTIFIER', value: left, line, column: col }); + } else { + const numMatch = left.match(/^\d+/); + if (numMatch) tokens.push({ type: 'NUMBER', value: numMatch[0], line, column: col }); + else tokens.push({ type: 'STRING', value: left.replace(/^["']|["']$/g, ''), line, column: col }); + } + tokens.push({ type: 'OPERATOR', value: foundOp, line, column: col }); + if (right) { + const trimmedRight = right.trim(); + if (/^\d+$/.test(trimmedRight)) { + tokens.push({ type: 'NUMBER', value: trimmedRight, line, column: col }); + } else if (/^["'].*["']$/.test(trimmedRight)) { + tokens.push({ type: 'STRING', value: trimmedRight.replace(/^["']|["']$/g, ''), line, column: col }); + } else { + tokens.push({ type: 'IDENTIFIER', value: trimmedRight, line, column: col }); + } + } + } else { + tokens.push({ type: 'IDENTIFIER', value: cond, line, column: col }); + } + return tokens; +} + +export function makeError( + message: string, + line: number, + expected?: string, + found?: string +): Error & { line: number; expectedToken?: string; foundToken?: string } { + const err = new Error( + `Template error at line ${line}: ${message}${expected ? ` (expected ${expected})` : ''}${found ? ` (found ${found})` : ''}` + ) as Error & { line: number; expectedToken?: string; foundToken?: string }; + err.line = line; + err.expectedToken = expected; + err.foundToken = found; + return err; +} diff --git a/backend/services/notification/template-engine/parser.ts b/backend/services/notification/template-engine/parser.ts new file mode 100644 index 00000000..d25fb64f --- /dev/null +++ b/backend/services/notification/template-engine/parser.ts @@ -0,0 +1,387 @@ +import { Token, TokenType } from './lexer'; +import { + ASTNode, + ConditionExpression, + ConditionOperator, + FilterExpression, + IfNode, +} from './ast/nodes'; + +const OPERATOR_MAP: Record = { + '==': '==', + '!=': '!=', + '>': '>', + '<': '<', + '>=': '>=', + '<=': '<=', + '&&': '&&', + '||': '||', +}; + +export function parse(tokens: Token[]): ASTNode[] { + return parseNodes(tokens, 0).nodes; +} + +interface ParseResult { + nodes: ASTNode[]; + pos: number; +} + +function parseNodes(tokens: Token[], start: number): ParseResult { + const nodes: ASTNode[] = []; + let pos = start; + + while (pos < tokens.length) { + const token = tokens[pos]; + + switch (token.type) { + case 'TEXT': + nodes.push({ + type: 'Text', + value: token.value, + line: token.line, + }); + pos++; + break; + + case 'VAR_OPEN': + pos++; + const varResult = parseVariable(tokens, pos, token.line); + nodes.push(...varResult.nodes); + pos = varResult.pos + 1; + break; + + case 'FILTER_PIPE': + pos++; + const filter = parseSingleFilter(tokens, pos, token.line); + nodes.push(filter.node); + pos = filter.pos; + break; + + case 'PARTIAL_OPEN': + pos++; + const partial = parsePartial(tokens, pos, token.line); + nodes.push(partial.node); + pos = partial.pos + 1; + break; + + case 'IF_OPEN': + const ifResult = parseIf(tokens, pos, token.line); + nodes.push(ifResult.node); + pos = ifResult.pos; + break; + + case 'FOR_OPEN': + pos++; + const forResult = parseFor(tokens, pos, token.line); + nodes.push(forResult.node); + pos = forResult.pos; + break; + + case 'ELSE': + case 'ENDIF': + case 'ENDFOR': + case 'EOF': + return { nodes, pos }; + + default: + throw makeParseError( + `Unexpected token: ${token.type} (${token.value})`, + token.line, + 'expression', + token.type + ); + } + } + + return { nodes, pos }; +} + +function parseVariable( + tokens: Token[], + start: number, + line: number +): ParseResult { + const path: string[] = []; + const filters: FilterExpression[] = []; + let pos = start; + + while (pos < tokens.length) { + const token = tokens[pos]; + + if (token.type === 'IDENTIFIER') { + path.push(token.value); + pos++; + } else if (token.type === 'DOT') { + pos++; + } else if (token.type === 'FILTER_PIPE') { + pos++; + const fResult = parseFilterExpression(tokens, pos, token.line); + filters.push(...fResult.filters); + pos = fResult.pos; + } else if (token.type === 'VAR_CLOSE') { + break; + } else { + throw makeParseError( + `Unexpected token in variable: ${token.type}`, + token.line, + 'IDENTIFIER or FILTER', + token.type + ); + } + } + + const node: ASTNode = path.length > 0 + ? { type: 'Variable', path, filters, line } + : { type: 'Text', value: '', line }; + + return { nodes: [node], pos }; +} + +function parseFilterExpression( + tokens: Token[], + start: number, + line: number +): { filters: FilterExpression[]; pos: number } { + const filters: FilterExpression[] = []; + let pos = start; + + while (pos < tokens.length) { + const token = tokens[pos]; + if (token.type === 'IDENTIFIER') { + const name = token.value; + pos++; + const args: string[] = []; + + if (pos < tokens.length && tokens[pos].type === 'LPAREN') { + pos++; + while (pos < tokens.length && tokens[pos].type !== 'RPAREN') { + if (tokens[pos].type === 'STRING' || tokens[pos].type === 'NUMBER') { + args.push(tokens[pos].value); + } + pos++; + } + pos++; + } + + filters.push({ name, args }); + } else { + break; + } + + if (pos < tokens.length && tokens[pos].type === 'FILTER_PIPE') { + pos++; + } else { + break; + } + } + + return { filters, pos }; +} + +function parseSingleFilter( + tokens: Token[], + start: number, + line: number +): { node: ASTNode; pos: number } { + const name = tokens[start].type === 'IDENTIFIER' ? tokens[start].value : ''; + let pos = start + 1; + const args: string[] = []; + + if (pos < tokens.length && tokens[pos].type === 'LPAREN') { + pos++; + while (pos < tokens.length && tokens[pos].type !== 'RPAREN') { + if (tokens[pos].type === 'STRING' || tokens[pos].type === 'NUMBER') { + args.push(tokens[pos].value); + } + pos++; + } + pos++; + } + + return { + node: { type: 'Filter', input: [], name, args, line }, + pos, + }; +} + +function parsePartial( + tokens: Token[], + start: number, + line: number +): { node: ASTNode; pos: number } { + let pos = start; + let name = ''; + const params: Record = {}; + + while (pos < tokens.length && tokens[pos].type !== 'PARTIAL_CLOSE') { + const token = tokens[pos]; + if (token.type === 'IDENTIFIER') { + if (!name) { + name = token.value; + } else { + const key = token.value; + pos++; + if (pos < tokens.length && tokens[pos].type === 'EQUALS') { + pos++; + if (pos < tokens.length && (tokens[pos].type === 'STRING' || tokens[pos].type === 'IDENTIFIER')) { + params[key] = tokens[pos].value; + } + } + } + } + pos++; + } + + return { + node: { type: 'Partial', name, params, line }, + pos, + }; +} + +function parseIf( + tokens: Token[], + start: number, + line: number +): { node: ASTNode; pos: number } { + let pos = start + 1; + const condition = parseCondition(tokens, pos, line); + pos = condition.pos; + + const consequentResult = parseNodes(tokens, pos); + const consequentNodes = consequentResult.nodes; + pos = consequentResult.pos; + + const alternateNodes: ASTNode[] = []; + + if (pos < tokens.length && tokens[pos].type === 'ELSE') { + pos++; + const altResult = parseNodes(tokens, pos); + alternateNodes.push(...altResult.nodes); + pos = altResult.pos; + } + + if (pos < tokens.length && tokens[pos].type === 'ENDIF') { + pos++; + } + + return { + node: { + type: 'If', + condition, + consequent: consequentNodes, + alternate: alternateNodes, + line, + }, + pos, + }; +} + +function parseCondition( + tokens: Token[], + start: number, + line: number +): { condition: ConditionExpression; pos: number } { + let pos = start; + let left = ''; + let operator: ConditionOperator = '=='; + let right = ''; + + while (pos < tokens.length) { + const token = tokens[pos]; + + if (token.type === 'VAR_CLOSE' || token.type === 'EOF' || + token.type === 'TEXT' || token.type === 'ELSE' || + token.type === 'ENDIF' || token.type === 'ENDFOR') { + break; + } + + if (token.type === 'IDENTIFIER' || token.type === 'STRING' || token.type === 'NUMBER') { + if (!left) { + left = token.value; + pos++; + } else if (operator && !right) { + right = token.value; + pos++; + } else { + pos++; + } + } else if (token.type === 'OPERATOR') { + operator = OPERATOR_MAP[token.value] || '=='; + pos++; + } else { + pos++; + } + } + + return { condition: { left, operator, right }, pos }; +} + +function parseFor( + tokens: Token[], + start: number, + line: number +): { node: ASTNode; pos: number } { + let pos = start; + + let item = ''; + const iterable: string[] = []; + + while (pos < tokens.length) { + const token = tokens[pos]; + + if (token.type === 'ENDFOR' || token.type === 'EOF') { + break; + } + + if (token.type === 'IDENTIFIER') { + if (!item) { + item = token.value; + } else { + iterable.push(token.value); + } + } + + pos++; + if (pos >= tokens.length) break; + + if (tokens[pos].type === 'ENDIF' || tokens[pos].type === 'EOF' || + tokens[pos].type === 'ENDFOR') { + break; + } + + if (item && iterable.length > 0) { + break; + } + } + + const bodyResult = parseNodes(tokens, pos); + const bodyNodes = bodyResult.nodes; + pos = bodyResult.pos; + + if (pos < tokens.length && tokens[pos].type === 'ENDFOR') { + pos++; + } + + return { + node: { + type: 'For', + item, + iterable, + body: bodyNodes, + line, + }, + pos, + }; +} + +import { makeError } from './lexer'; + +function makeParseError( + message: string, + line: number, + expected?: string, + found?: string +): Error & { line: number; expectedToken?: string; foundToken?: string } { + return makeError(`Parse error: ${message}`, line, expected, found); +} diff --git a/backend/services/notification/webhook.ts b/backend/services/notification/webhook.ts index c09525bc..100e43f0 100644 --- a/backend/services/notification/webhook.ts +++ b/backend/services/notification/webhook.ts @@ -14,6 +14,11 @@ export type { WebhookEventInput } from '../../../src/types/webhook'; type FetchLike = typeof fetch; +export interface RedisStreamClient { + xadd: (key: string, ...args: string[]) => Promise; + xlen: (key: string) => Promise; +} + export interface RegisterWebhookInput { merchantId: string; url: string; @@ -37,6 +42,8 @@ const DEFAULT_RETRY_POLICY: WebhookRetryPolicy = { backoffFactor: 2, }; +const REDIS_STREAM_KEY = process.env.REDIS_STREAM_KEY || 'webhook:events'; + const now = (): number => Date.now(); const createId = (prefix: string): string => @@ -92,14 +99,20 @@ export const isWebhookEventAllowed = ( export class WebhookDeliveryService { private readonly fetchImpl: FetchLike; private readonly sleepImpl: (ms: number) => Promise; + private readonly redisClient: RedisStreamClient | null; private readonly webhooks = new Map(); private readonly deliveries = new Map(); private readonly deliveredKeys = new Set(); private readonly rateLimitWindows = new Map(); - constructor(options: { fetchImpl?: FetchLike; sleepImpl?: (ms: number) => Promise } = {}) { + constructor(options: { + fetchImpl?: FetchLike; + sleepImpl?: (ms: number) => Promise; + redisClient?: RedisStreamClient; + } = {}) { this.fetchImpl = options.fetchImpl ?? fetch; this.sleepImpl = options.sleepImpl ?? sleep; + this.redisClient = options.redisClient ?? null; } registerWebhook(input: RegisterWebhookInput): WebhookConfig { @@ -319,6 +332,13 @@ export class WebhookDeliveryService { }; this.deliveries.set(delivery.id, delivery); + + if (this.redisClient) { + await this.enqueueToRedis(delivery); + this.deliveredKeys.add(idempotencyKey); + return { delivery }; + } + const result = await this.sendWithRetry(webhook, delivery); this.deliveries.set(delivery.id, result.delivery); @@ -328,6 +348,33 @@ export class WebhookDeliveryService { return result; } + private async enqueueToRedis(delivery: WebhookDelivery): Promise { + if (!this.redisClient) return; + + const payload = JSON.stringify(delivery.payload); + if (Buffer.byteLength(payload, 'utf8') > MAX_PAYLOAD_BYTES) { + throw new Error('Payload exceeds 1MB limit'); + } + + await this.redisClient.xadd( + REDIS_STREAM_KEY, + '*', + 'eventId', delivery.eventId, + 'webhookId', delivery.webhookId, + 'url', delivery.url, + 'eventType', delivery.eventType, + 'payload', payload, + 'signature', delivery.signature, + 'idempotencyKey', delivery.idempotencyKey, + 'headers', JSON.stringify({ + 'X-SubTrackr-Signature': delivery.signature, + 'X-SubTrackr-Event-Type': delivery.eventType, + 'X-SubTrackr-Event-Id': delivery.eventId, + 'X-SubTrackr-Idempotency-Key': delivery.idempotencyKey, + }), + ); + } + async retryWebhookDelivery(deliveryId: string): Promise { const existing = this.deliveries.get(deliveryId); if (!existing) throw new Error(`Delivery ${deliveryId} not found`); diff --git a/backend/services/subscription/__tests__/state-machine.test.ts b/backend/services/subscription/__tests__/state-machine.test.ts new file mode 100644 index 00000000..fa4968aa --- /dev/null +++ b/backend/services/subscription/__tests__/state-machine.test.ts @@ -0,0 +1,483 @@ +import { + SubscriptionStateMachine, + getAllStates, + getParentState, + isChildOf, +} from '../../state-machine/index'; +import { + SubscriptionParentState, + SubscriptionChildState, +} from '../../state-machine/states'; +import { TransitionEdge, TRANSITION_MATRIX } from '../../state-machine/guards/index'; +import { entryActions, exitActions } from '../../state-machine/actions/index'; + +// ── State Hierarchy ─────────────────────────────────────────────────────────── + +describe('State Hierarchy', () => { + it('defines all parent states', () => { + expect(SubscriptionParentState.ACTIVE).toBe('Active'); + expect(SubscriptionParentState.INACTIVE).toBe('Inactive'); + expect(SubscriptionParentState.SUSPENDED).toBe('Suspended'); + }); + + it('defines all child states', () => { + expect(SubscriptionChildState.TRIAL).toBe('Active.Trial'); + expect(SubscriptionChildState.PAID).toBe('Active.Paid'); + expect(SubscriptionChildState.PAST_DUE).toBe('Active.PastDue'); + expect(SubscriptionChildState.CANCELLED).toBe('Inactive.Cancelled'); + expect(SubscriptionChildState.PAUSED_END_OF_CYCLE).toBe('Inactive.PausedEndOfCycle'); + expect(SubscriptionChildState.EXPIRED).toBe('Inactive.Expired'); + expect(SubscriptionChildState.FRAUD_HOLD).toBe('Suspended.FraudHold'); + expect(SubscriptionChildState.ADMIN_HOLD).toBe('Suspended.AdminHold'); + }); + + it('getParentState returns correct parent', () => { + expect(getParentState(SubscriptionChildState.TRIAL)).toBe(SubscriptionParentState.ACTIVE); + expect(getParentState(SubscriptionChildState.PAID)).toBe(SubscriptionParentState.ACTIVE); + expect(getParentState(SubscriptionChildState.CANCELLED)).toBe(SubscriptionParentState.INACTIVE); + expect(getParentState(SubscriptionChildState.FRAUD_HOLD)).toBe(SubscriptionParentState.SUSPENDED); + }); + + it('getParentState returns same for parent states', () => { + expect(getParentState(SubscriptionParentState.ACTIVE)).toBe(SubscriptionParentState.ACTIVE); + expect(getParentState(SubscriptionParentState.INACTIVE)).toBe(SubscriptionParentState.INACTIVE); + }); + + it('isChildOf correctly identifies parent-child relationship', () => { + expect(isChildOf(SubscriptionChildState.TRIAL, SubscriptionParentState.ACTIVE)).toBe(true); + expect(isChildOf(SubscriptionChildState.PAID, SubscriptionParentState.ACTIVE)).toBe(true); + expect(isChildOf(SubscriptionChildState.CANCELLED, SubscriptionParentState.INACTIVE)).toBe(true); + expect(isChildOf(SubscriptionChildState.TRIAL, SubscriptionParentState.INACTIVE)).toBe(false); + }); + + it('getAllStates returns all 11 states', () => { + const states = getAllStates(); + expect(states.length).toBe(11); + }); +}); + +// ── Transition Matrix ───────────────────────────────────────────────────────── + +describe('Transition Matrix', () => { + it('has all 12 transition edges', () => { + const edges = Object.keys(TRANSITION_MATRIX) as TransitionEdge[]; + expect(edges.length).toBe(12); + expect(edges).toContain('cancel'); + expect(edges).toContain('pause'); + expect(edges).toContain('resume'); + expect(edges).toContain('upgrade'); + expect(edges).toContain('downgrade'); + expect(edges).toContain('suspend_fraud'); + expect(edges).toContain('suspend_admin'); + expect(edges).toContain('unsuspend'); + expect(edges).toContain('expire'); + expect(edges).toContain('trial_to_paid'); + expect(edges).toContain('payment_fail'); + expect(edges).toContain('payment_recover'); + }); + + it('every edge has a target state and source states', () => { + for (const [edge, matrix] of Object.entries(TRANSITION_MATRIX)) { + expect(matrix.from.length).toBeGreaterThan(0); + expect(matrix.to).toBeDefined(); + } + }); +}); + +// ── Transition Guards ───────────────────────────────────────────────────────── + +describe('Transition Guards', () => { + let machine: SubscriptionStateMachine; + + beforeEach(() => { + machine = new SubscriptionStateMachine(); + }); + + it('allows cancel from Active states', () => { + const states = [SubscriptionChildState.TRIAL, SubscriptionChildState.PAID, SubscriptionChildState.PAST_DUE, SubscriptionParentState.ACTIVE]; + for (const state of states) { + machine.setState('sub_1', state); + const result = machine.validateTransition('sub_1', 'cancel'); + expect(result.allowed).toBe(true); + } + }); + + it('allows cancel from PausedEndOfCycle', () => { + machine.setState('sub_1', SubscriptionChildState.PAUSED_END_OF_CYCLE); + const result = machine.validateTransition('sub_1', 'cancel'); + expect(result.allowed).toBe(true); + }); + + it('denies cancel from Suspended states', () => { + for (const state of [SubscriptionChildState.FRAUD_HOLD, SubscriptionChildState.ADMIN_HOLD]) { + machine.setState('sub_1', state); + const result = machine.validateTransition('sub_1', 'cancel'); + expect(result.allowed).toBe(false); + } + }); + + it('allows pause from Active states', () => { + for (const state of [SubscriptionChildState.TRIAL, SubscriptionChildState.PAID, SubscriptionChildState.PAST_DUE]) { + machine.setState('sub_1', state); + const result = machine.validateTransition('sub_1', 'pause'); + expect(result.allowed).toBe(true); + } + }); + + it('denies pause from Inactive', () => { + machine.setState('sub_1', SubscriptionChildState.CANCELLED); + const result = machine.validateTransition('sub_1', 'pause'); + expect(result.allowed).toBe(false); + }); + + it('allows resume from PausedEndOfCycle', () => { + machine.setState('sub_1', SubscriptionChildState.PAUSED_END_OF_CYCLE); + const result = machine.validateTransition('sub_1', 'resume'); + expect(result.allowed).toBe(true); + }); + + it('denies resume from Active', () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = machine.validateTransition('sub_1', 'resume'); + expect(result.allowed).toBe(false); + }); + + it('allows upgrade from Paid and Trial', () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + expect(machine.validateTransition('sub_1', 'upgrade').allowed).toBe(true); + + machine.setState('sub_1', SubscriptionChildState.TRIAL); + expect(machine.validateTransition('sub_1', 'upgrade').allowed).toBe(true); + }); + + it('denies downgrade from Trial', () => { + machine.setState('sub_1', SubscriptionChildState.TRIAL); + const result = machine.validateTransition('sub_1', 'downgrade'); + expect(result.allowed).toBe(false); + }); + + it('allows suspend_fraud from Active states', () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = machine.validateTransition('sub_1', 'suspend_fraud'); + expect(result.allowed).toBe(true); + }); + + it('allows unsuspend from FraudHold', () => { + machine.setState('sub_1', SubscriptionChildState.FRAUD_HOLD); + const result = machine.validateTransition('sub_1', 'unsuspend'); + expect(result.allowed).toBe(true); + }); + + it('returns valid transitions on invalid attempt', () => { + machine.setState('sub_1', SubscriptionChildState.CANCELLED); + const result = machine.validateTransition('sub_1', 'upgrade'); + expect(result.allowed).toBe(false); + expect(result.validTransitions).toBeDefined(); + expect(result.validTransitions!.length).toBeGreaterThan(0); + }); +}); + +// ── State Machine Transitions ───────────────────────────────────────────────── + +describe('State Machine - Transitions', () => { + let machine: SubscriptionStateMachine; + + beforeEach(() => { + machine = new SubscriptionStateMachine(); + }); + + it('defaults to Active.Trial', () => { + expect(machine.getState('sub_1')).toBe(SubscriptionChildState.TRIAL); + }); + + it('accepts initial states via constructor', () => { + const initial = new Map(); + initial.set('sub_1', SubscriptionChildState.PAID); + const m2 = new SubscriptionStateMachine(initial as Map); + expect(m2.getState('sub_1')).toBe(SubscriptionChildState.PAID); + }); + + it('transitions from Trial to Paid via trial_to_paid', async () => { + machine.setState('sub_1', SubscriptionChildState.TRIAL); + const result = await machine.transition('sub_1', 'trial_to_paid'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAID); + }); + + it('transitions from Paid to PastDue via payment_fail', async () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = await machine.transition('sub_1', 'payment_fail'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAST_DUE); + }); + + it('transitions from PastDue back to Paid via payment_recover', async () => { + machine.setState('sub_1', SubscriptionChildState.PAST_DUE); + const result = await machine.transition('sub_1', 'payment_recover'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAID); + }); + + it('transitions from Paid to Cancelled via cancel', async () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = await machine.transition('sub_1', 'cancel'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.CANCELLED); + }); + + it('transitions from Paid to PausedEndOfCycle via pause', async () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = await machine.transition('sub_1', 'pause'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAUSED_END_OF_CYCLE); + }); + + it('transitions from PausedEndOfCycle to Paid via resume', async () => { + machine.setState('sub_1', SubscriptionChildState.PAUSED_END_OF_CYCLE); + const result = await machine.transition('sub_1', 'resume'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAID); + }); + + it('transitions from Paid to FraudHold via suspend_fraud', async () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const result = await machine.transition('sub_1', 'suspend_fraud'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.FRAUD_HOLD); + }); + + it('transitions from FraudHold to Paid via unsuspend', async () => { + machine.setState('sub_1', SubscriptionChildState.FRAUD_HOLD); + const result = await machine.transition('sub_1', 'unsuspend'); + expect(result.allowed).toBe(true); + expect(result.newState).toBe(SubscriptionChildState.PAID); + }); + + it('returns error for invalid transition', async () => { + machine.setState('sub_1', SubscriptionChildState.CANCELLED); + const result = await machine.transition('sub_1', 'pause'); + expect(result.allowed).toBe(false); + expect(result.error).toBeDefined(); + }); + + it('default initial state is Active.Trial', () => { + expect(machine.getState('sub_new')).toBe(SubscriptionChildState.TRIAL); + }); +}); + +// ── State History ───────────────────────────────────────────────────────────── + +describe('State Machine - History', () => { + let machine: SubscriptionStateMachine; + + beforeEach(() => { + machine = new SubscriptionStateMachine(); + }); + + it('records transition history', async () => { + machine.setState('sub_1', SubscriptionChildState.TRIAL); + await machine.transition('sub_1', 'trial_to_paid', { actor: 'system', reason: 'Trial ended' }); + await machine.transition('sub_1', 'pause', { actor: 'user', reason: 'Vacation' }); + + const history = machine.getHistory('sub_1'); + expect(history.length).toBe(2); + expect(history[0].from).toBe(SubscriptionChildState.TRIAL); + expect(history[0].to).toBe(SubscriptionChildState.PAID); + expect(history[0].actor).toBe('system'); + expect(history[0].reason).toBe('Trial ended'); + + expect(history[1].from).toBe(SubscriptionChildState.PAID); + expect(history[1].to).toBe(SubscriptionChildState.PAUSED_END_OF_CYCLE); + expect(history[1].actor).toBe('user'); + }); + + it('tracks history for multiple subscriptions separately', async () => { + machine.setState('sub_1', SubscriptionChildState.TRIAL); + machine.setState('sub_2', SubscriptionChildState.PAID); + + await machine.transition('sub_1', 'trial_to_paid'); + await machine.transition('sub_2', 'cancel'); + + expect(machine.getHistory('sub_1').length).toBe(1); + expect(machine.getHistory('sub_2').length).toBe(1); + }); + + it('returns empty history for unknown subscription', () => { + expect(machine.getHistory('unknown')).toEqual([]); + }); +}); + +// ── State Actions ───────────────────────────────────────────────────────────── + +describe('State Machine - Actions', () => { + it('has entry actions for key states', () => { + expect(entryActions['Active.Trial']).toBeDefined(); + expect(entryActions['Active.Paid']).toBeDefined(); + expect(entryActions['Active.PastDue']).toBeDefined(); + expect(entryActions['Inactive.Cancelled']).toBeDefined(); + expect(entryActions['Inactive.PausedEndOfCycle']).toBeDefined(); + expect(entryActions['Suspended.FraudHold']).toBeDefined(); + expect(entryActions['Suspended.AdminHold']).toBeDefined(); + }); + + it('entry actions for Cancelled include stopBilling and revokeAccess', () => { + const actions = entryActions['Inactive.Cancelled']; + expect(actions).toBeDefined(); + expect(actions!.map((a) => a.name)).toContain('stopBilling'); + expect(actions!.map((a) => a.name)).toContain('revokeAccess'); + }); + + it('exit actions exist for specific states', () => { + expect(exitActions['Active.Trial']).toBeDefined(); + expect(exitActions['Active.PastDue']).toBeDefined(); + expect(exitActions['Suspended.FraudHold']).toBeDefined(); + }); + + it('actions execute without throwing', async () => { + const ctx = { + subscriptionId: 'sub_1', + previousState: 'Active.Paid' as any, + newState: 'Inactive.Cancelled' as any, + timestamp: Date.now(), + }; + for (const action of entryActions['Inactive.Cancelled'] || []) { + await expect(action.execute(ctx)).resolves.toBeUndefined(); + } + }); +}); + +// ── Valid Transitions ───────────────────────────────────────────────────────── + +describe('State Machine - Valid Transitions', () => { + let machine: SubscriptionStateMachine; + + beforeEach(() => { + machine = new SubscriptionStateMachine(); + }); + + it('returns valid transitions for Active.Trial', () => { + machine.setState('sub_1', SubscriptionChildState.TRIAL); + const valid = machine.getValidTransitions(SubscriptionChildState.TRIAL); + expect(valid).toContain('cancel'); + expect(valid).toContain('pause'); + expect(valid).toContain('upgrade'); + expect(valid).toContain('trial_to_paid'); + expect(valid).toContain('suspend_fraud'); + expect(valid).toContain('suspend_admin'); + expect(valid).toContain('expire'); + }); + + it('returns valid transitions for Active.Paid', () => { + machine.setState('sub_1', SubscriptionChildState.PAID); + const valid = machine.getValidTransitions(SubscriptionChildState.PAID); + expect(valid).toContain('cancel'); + expect(valid).toContain('pause'); + expect(valid).toContain('upgrade'); + expect(valid).toContain('downgrade'); + expect(valid).toContain('payment_fail'); + expect(valid).toContain('suspend_fraud'); + expect(valid).toContain('suspend_admin'); + expect(valid).toContain('expire'); + }); + + it('returns valid transitions for Inactive.Cancelled', () => { + const valid = machine.getValidTransitions(SubscriptionChildState.CANCELLED); + expect(valid.length).toBe(0); + }); + + it('returns valid transitions for PausedEndOfCycle', () => { + const valid = machine.getValidTransitions(SubscriptionChildState.PAUSED_END_OF_CYCLE); + expect(valid).toContain('cancel'); + expect(valid).toContain('resume'); + expect(valid).toContain('expire'); + expect(valid).toContain('suspend_fraud'); + expect(valid).toContain('suspend_admin'); + }); + + it('returns valid transitions for Suspended states', () => { + const valid = machine.getValidTransitions(SubscriptionChildState.FRAUD_HOLD); + expect(valid).toContain('unsuspend'); + expect(valid.length).toBe(1); + }); +}); + +// ── Exhaustive Transition Matrix ────────────────────────────────────────────── + +describe('State Machine - Exhaustive Transition Matrix', () => { + let machine: SubscriptionStateMachine; + + beforeEach(() => { + machine = new SubscriptionStateMachine(); + }); + + const allStates = getAllStates(); + const allEdges = Object.keys(TRANSITION_MATRIX) as TransitionEdge[]; + + for (const state of allStates) { + for (const edge of allEdges) { + const matrix = TRANSITION_MATRIX[edge]; + const isDefined = matrix.from.some((from) => { + return from === state || isChildOf(state, getParentState(from)); + }); + + it(`${edge} from ${state} should be ${isDefined ? 'allowed' : 'denied'}`, () => { + machine.setState('sub_1', state); + const result = machine.validateTransition('sub_1', edge); + expect(result.allowed).toBe(isDefined); + }); + } + } +}); + +// ── Mermaid Diagram ─────────────────────────────────────────────────────────── + +describe('State Machine - Visualization', () => { + it('generates a Mermaid state diagram', () => { + const machine = new SubscriptionStateMachine(); + const diagram = machine.generateMermaidDiagram(); + + expect(diagram).toContain('stateDiagram-v2'); + expect(diagram).toContain('Active'); + expect(diagram).toContain('Inactive'); + expect(diagram).toContain('Suspended'); + expect(diagram).toContain('Active.Trial'); + expect(diagram).toContain('Active.Paid'); + expect(diagram).toContain('Inactive.Cancelled'); + expect(diagram).toContain('Suspended.FraudHold'); + expect(diagram).toContain('cancel'); + expect(diagram).toContain('pause'); + expect(diagram).toContain('suspend_fraud'); + }); + + it('diagram includes all parent states', () => { + const machine = new SubscriptionStateMachine(); + const diagram = machine.generateMermaidDiagram(); + + for (const state of ['Active', 'Inactive', 'Suspended']) { + expect(diagram).toContain(`"${state}"`); + } + }); +}); + +// ── Module Tests ────────────────────────────────────────────────────────────── + +describe('State Machine - Module', () => { + it('preserves existing behavior - same transition results', async () => { + const machine = new SubscriptionStateMachine(); + machine.setState('sub_1', SubscriptionChildState.PAID); + + const cancelResult = await machine.transition('sub_1', 'cancel'); + expect(cancelResult.allowed).toBe(true); + expect(machine.getState('sub_1')).toBe(SubscriptionChildState.CANCELLED); + }); + + it('returns 409-style error for invalid transitions', async () => { + const machine = new SubscriptionStateMachine(); + machine.setState('sub_1', SubscriptionChildState.CANCELLED); + + const result = await machine.transition('sub_1', 'pause'); + expect(result.allowed).toBe(false); + expect(result.error).toBeDefined(); + expect(result.validTransitions).toBeDefined(); + }); +}); diff --git a/backend/services/subscription/state-machine/actions/index.ts b/backend/services/subscription/state-machine/actions/index.ts new file mode 100644 index 00000000..a356f972 --- /dev/null +++ b/backend/services/subscription/state-machine/actions/index.ts @@ -0,0 +1,95 @@ +import { SubscriptionState } from '../states'; + +export interface StateActionContext { + subscriptionId: string; + previousState: SubscriptionState; + newState: SubscriptionState; + actor?: string; + reason?: string; + metadata?: Record; + timestamp: number; +} + +export type StateAction = { + name: string; + execute: (ctx: StateActionContext) => void | Promise; +}; + +export type StateActionMap = { + onEntry?: StateAction[]; + onExit?: StateAction[]; +}; + +function noopAction(name: string): StateAction { + return { name, execute: () => {} }; +} + +const sendEmailAction: StateAction = { + name: 'sendEmail', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Sending email for subscription ${ctx.subscriptionId}: state changed from ${ctx.previousState} to ${ctx.newState}`); + }, +}; + +const stopBillingAction: StateAction = { + name: 'stopBilling', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Stopping billing for subscription ${ctx.subscriptionId} (reason: ${ctx.reason})`); + }, +}; + +const revokeAccessAction: StateAction = { + name: 'revokeAccess', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Revoking access for subscription ${ctx.subscriptionId}`); + }, +}; + +const startBillingAction: StateAction = { + name: 'startBilling', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Starting billing for subscription ${ctx.subscriptionId}`); + }, +}; + +const restoreAccessAction: StateAction = { + name: 'restoreAccess', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Restoring access for subscription ${ctx.subscriptionId}`); + }, +}; + +const notifyAdminAction: StateAction = { + name: 'notifyAdmin', + execute: (ctx: StateActionContext) => { + console.log(`[Action] Notifying admin about subscription ${ctx.subscriptionId}: ${ctx.reason}`); + }, +}; + +export const entryActions: Partial> = { + 'Inactive.Cancelled': [sendEmailAction, stopBillingAction, revokeAccessAction], + 'Inactive.PausedEndOfCycle': [sendEmailAction], + 'Inactive.Expired': [stopBillingAction, revokeAccessAction], + 'Suspended.FraudHold': [notifyAdminAction, stopBillingAction, revokeAccessAction], + 'Suspended.AdminHold': [notifyAdminAction, stopBillingAction, revokeAccessAction], + 'Active.Trial': [startBillingAction], + 'Active.Paid': [sendEmailAction, restoreAccessAction, startBillingAction], + 'Active.PastDue': [sendEmailAction], +}; + +export const exitActions: Partial> = { + 'Active.Trial': [noopAction('notifyTrialEnding')], + 'Active.PastDue': [noopAction('notifyPaymentRecoveryFailed')], + 'Suspended.FraudHold': [noopAction('logFraudResolution')], + 'Suspended.AdminHold': [noopAction('logAdminHoldResolution')], +}; + +export async function executeActions( + actions: StateAction[] | undefined, + ctx: StateActionContext +): Promise { + if (!actions || actions.length === 0) return; + for (const action of actions) { + await action.execute(ctx); + } +} diff --git a/backend/services/subscription/state-machine/guards/index.ts b/backend/services/subscription/state-machine/guards/index.ts new file mode 100644 index 00000000..9dc5592b --- /dev/null +++ b/backend/services/subscription/state-machine/guards/index.ts @@ -0,0 +1,111 @@ +import { + SubscriptionState, + SubscriptionParentState, + SubscriptionChildState, + getParentState, + isChildOf, +} from '../states'; + +export type TransitionEdge = + | 'cancel' + | 'pause' + | 'resume' + | 'upgrade' + | 'downgrade' + | 'suspend_fraud' + | 'suspend_admin' + | 'unsuspend' + | 'expire' + | 'trial_to_paid' + | 'payment_fail' + | 'payment_recover'; + +export interface TransitionContext { + subscriptionId: string; + currentState: SubscriptionState; + metadata?: Record; +} + +export type TransitionGuard = (ctx: TransitionContext) => boolean | { allowed: boolean; reason?: string }; + +export const guards: Record = { + cancel: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.ACTIVE)) return true; + if (ctx.currentState === SubscriptionChildState.PAUSED_END_OF_CYCLE) return true; + return { allowed: false, reason: 'Can only cancel from Active or PausedEndOfCycle states' }; + }, + + pause: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.ACTIVE)) return true; + return { allowed: false, reason: 'Can only pause from Active states' }; + }, + + resume: (ctx) => { + if (ctx.currentState === SubscriptionChildState.PAUSED_END_OF_CYCLE) return true; + return { allowed: false, reason: 'Can only resume from PausedEndOfCycle state' }; + }, + + upgrade: (ctx) => { + if (ctx.currentState === SubscriptionChildState.PAID) return true; + if (ctx.currentState === SubscriptionChildState.TRIAL) return true; + return { allowed: false, reason: 'Can only upgrade from Paid or Trial states' }; + }, + + downgrade: (ctx) => { + if (ctx.currentState === SubscriptionChildState.PAID) return true; + return { allowed: false, reason: 'Can only downgrade from Paid state' }; + }, + + suspend_fraud: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.ACTIVE)) return true; + if (ctx.currentState === SubscriptionChildState.PAUSED_END_OF_CYCLE) return true; + return { allowed: false, reason: 'Can only apply fraud hold from Active or PausedEndOfCycle states' }; + }, + + suspend_admin: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.ACTIVE)) return true; + if (ctx.currentState === SubscriptionChildState.PAUSED_END_OF_CYCLE) return true; + return { allowed: false, reason: 'Can only apply admin hold from Active or PausedEndOfCycle states' }; + }, + + unsuspend: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.SUSPENDED)) return true; + return { allowed: false, reason: 'Can only unsuspend from Suspended states' }; + }, + + expire: (ctx) => { + if (isChildOf(ctx.currentState, SubscriptionParentState.ACTIVE)) return true; + if (ctx.currentState === SubscriptionChildState.PAUSED_END_OF_CYCLE) return true; + return { allowed: false, reason: 'Can only expire from Active or PausedEndOfCycle states' }; + }, + + trial_to_paid: (ctx) => { + if (ctx.currentState === SubscriptionChildState.TRIAL) return true; + return { allowed: false, reason: 'Can only convert trial to paid from Trial state' }; + }, + + payment_fail: (ctx) => { + if (ctx.currentState === SubscriptionChildState.PAID) return true; + return { allowed: false, reason: 'Can only fail payment from Paid state' }; + }, + + payment_recover: (ctx) => { + if (ctx.currentState === SubscriptionChildState.PAST_DUE) return true; + return { allowed: false, reason: 'Can only recover payment from PastDue state' }; + }, +}; + +export const TRANSITION_MATRIX: Record = { + cancel: { from: [SubscriptionParentState.ACTIVE, SubscriptionChildState.PAUSED_END_OF_CYCLE], to: SubscriptionChildState.CANCELLED }, + pause: { from: [SubscriptionParentState.ACTIVE], to: SubscriptionChildState.PAUSED_END_OF_CYCLE }, + resume: { from: [SubscriptionChildState.PAUSED_END_OF_CYCLE], to: SubscriptionChildState.PAID }, + upgrade: { from: [SubscriptionChildState.PAID, SubscriptionChildState.TRIAL], to: SubscriptionChildState.PAID }, + downgrade: { from: [SubscriptionChildState.PAID], to: SubscriptionChildState.PAID }, + suspend_fraud: { from: [SubscriptionParentState.ACTIVE, SubscriptionChildState.PAUSED_END_OF_CYCLE], to: SubscriptionChildState.FRAUD_HOLD }, + suspend_admin: { from: [SubscriptionParentState.ACTIVE, SubscriptionChildState.PAUSED_END_OF_CYCLE], to: SubscriptionChildState.ADMIN_HOLD }, + unsuspend: { from: [SubscriptionParentState.SUSPENDED], to: SubscriptionChildState.PAID }, + expire: { from: [SubscriptionParentState.ACTIVE, SubscriptionChildState.PAUSED_END_OF_CYCLE], to: SubscriptionChildState.EXPIRED }, + trial_to_paid: { from: [SubscriptionChildState.TRIAL], to: SubscriptionChildState.PAID }, + payment_fail: { from: [SubscriptionChildState.PAID], to: SubscriptionChildState.PAST_DUE }, + payment_recover: { from: [SubscriptionChildState.PAST_DUE], to: SubscriptionChildState.PAID }, +}; diff --git a/backend/services/subscription/state-machine/index.ts b/backend/services/subscription/state-machine/index.ts new file mode 100644 index 00000000..d71b21f6 --- /dev/null +++ b/backend/services/subscription/state-machine/index.ts @@ -0,0 +1,202 @@ +import { + SubscriptionState, + SubscriptionChildState, + STATE_HIERARCHY, + getAllStates, + getParentState, + isChildOf, +} from './states'; +import { guards, TRANSITION_MATRIX, TransitionEdge, TransitionContext } from './guards/index'; +import { entryActions, exitActions, executeActions, StateActionContext } from './actions/index'; + +export { + SubscriptionState, + SubscriptionChildState, + SubscriptionParentState, + isChildOf, + getParentState, + getAllStates, +} from './states'; +export { TransitionEdge, guards, TRANSITION_MATRIX } from './guards/index'; +export { entryActions, exitActions, StateActionContext, StateAction } from './actions/index'; + +export interface TransitionRecord { + from: SubscriptionState; + to: SubscriptionState; + edge: TransitionEdge; + actor?: string; + reason?: string; + metadata?: Record; + timestamp: number; +} + +export interface TransitionResult { + allowed: boolean; + newState?: SubscriptionState; + error?: string; + validTransitions?: TransitionEdge[]; +} + +export class SubscriptionStateMachine { + private history: Map = new Map(); + private currentStates: Map = new Map(); + + constructor(initialStates?: Map) { + if (initialStates) { + for (const [id, state] of initialStates) { + this.currentStates.set(id, state); + } + } + } + + getState(subscriptionId: string): SubscriptionState { + return this.currentStates.get(subscriptionId) || SubscriptionChildState.TRIAL; + } + + setState(subscriptionId: string, state: SubscriptionState): void { + this.currentStates.set(subscriptionId, state); + } + + validateTransition( + subscriptionId: string, + edge: TransitionEdge + ): TransitionResult { + const current = this.getState(subscriptionId); + const guard = guards[edge]; + if (!guard) { + return { allowed: false, error: `Unknown transition: ${edge}` }; + } + + const guardResult = guard({ subscriptionId, currentState: current }); + if (typeof guardResult === 'object' && !guardResult.allowed) { + const validFromSameState = this.getValidTransitions(current); + return { + allowed: false, + error: guardResult.reason || `Transition ${edge} not allowed from ${current}`, + validTransitions: validFromSameState, + }; + } + + const matrix = TRANSITION_MATRIX[edge]; + if (!matrix) { + return { allowed: false, error: `No transition matrix entry for: ${edge}` }; + } + + const isAllowed = matrix.from.some((fromState) => { + if (fromState === current) return true; + if (Object.values(getParentState(current) as any).includes(current) && fromState === current) return true; + return fromState === current || isChildOf(current, getParentState(fromState)); + }); + + if (!isAllowed) { + const validFromSameState = this.getValidTransitions(current); + return { + allowed: false, + error: `Cannot apply transition ${edge} from state ${current}. Valid transitions: ${validFromSameState.join(', ')}`, + validTransitions: validFromSameState, + }; + } + + return { allowed: true }; + } + + async transition( + subscriptionId: string, + edge: TransitionEdge, + options?: { + actor?: string; + reason?: string; + metadata?: Record; + } + ): Promise { + const validation = this.validateTransition(subscriptionId, edge); + if (!validation.allowed) return validation; + + const current = this.getState(subscriptionId); + const targetState = TRANSITION_MATRIX[edge].to; + const timestamp = Date.now(); + + const actionCtx: StateActionContext = { + subscriptionId, + previousState: current, + newState: targetState, + actor: options?.actor, + reason: options?.reason, + metadata: options?.metadata, + timestamp, + }; + + await executeActions(exitActions[current], actionCtx); + this.currentStates.set(subscriptionId, targetState); + await executeActions(entryActions[targetState], actionCtx); + + const record: TransitionRecord = { + from: current, + to: targetState, + edge, + actor: options?.actor, + reason: options?.reason, + metadata: options?.metadata, + timestamp, + }; + + if (!this.history.has(subscriptionId)) { + this.history.set(subscriptionId, []); + } + this.history.get(subscriptionId)!.push(record); + + return { allowed: true, newState: targetState }; + } + + getHistory(subscriptionId: string): TransitionRecord[] { + return this.history.get(subscriptionId) || []; + } + + getValidTransitions(state: SubscriptionState): TransitionEdge[] { + const allEdges = Object.keys(TRANSITION_MATRIX) as TransitionEdge[]; + return allEdges.filter((edge) => { + const matrix = TRANSITION_MATRIX[edge]; + return matrix.from.some((fromState) => { + return fromState === state || isChildOf(state, getParentState(fromState)); + }); + }); + } + + getAllStates(): SubscriptionState[] { + return getAllStates(); + } + + generateMermaidDiagram(): string { + let diagram = 'stateDiagram-v2\n'; + + const parentStates = ['Active', 'Inactive', 'Suspended']; + const childrenByParent: Record = { + Active: [SubscriptionChildState.TRIAL, SubscriptionChildState.PAID, SubscriptionChildState.PAST_DUE], + Inactive: [SubscriptionChildState.CANCELLED, SubscriptionChildState.PAUSED_END_OF_CYCLE, SubscriptionChildState.EXPIRED], + Suspended: [SubscriptionChildState.FRAUD_HOLD, SubscriptionChildState.ADMIN_HOLD], + }; + + for (const parent of parentStates) { + diagram += ` state "${parent}" as ${parent} {\n`; + for (const child of childrenByParent[parent]) { + diagram += ` state "${child}" as ${child}\n`; + } + diagram += ' }\n'; + } + + diagram += '\n'; + const drawnEdges = new Set(); + for (const [edge, matrix] of Object.entries(TRANSITION_MATRIX)) { + const fromStates = matrix.from; + for (const from of fromStates) { + const edgeKey = `${from}->${matrix.to}`; + if (!drawnEdges.has(edgeKey)) { + drawnEdges.add(edgeKey); + diagram += ` ${from} --> ${matrix.to} : ${edge}\n`; + } + } + } + + return diagram; + } +} diff --git a/backend/services/subscription/state-machine/states.ts b/backend/services/subscription/state-machine/states.ts new file mode 100644 index 00000000..6f979370 --- /dev/null +++ b/backend/services/subscription/state-machine/states.ts @@ -0,0 +1,86 @@ +export const SubscriptionParentState = { + ACTIVE: 'Active', + INACTIVE: 'Inactive', + SUSPENDED: 'Suspended', +} as const; + +export type SubscriptionParentState = (typeof SubscriptionParentState)[keyof typeof SubscriptionParentState]; + +export const SubscriptionChildState = { + TRIAL: 'Active.Trial', + PAID: 'Active.Paid', + PAST_DUE: 'Active.PastDue', + CANCELLED: 'Inactive.Cancelled', + PAUSED_END_OF_CYCLE: 'Inactive.PausedEndOfCycle', + EXPIRED: 'Inactive.Expired', + FRAUD_HOLD: 'Suspended.FraudHold', + ADMIN_HOLD: 'Suspended.AdminHold', +} as const; + +export type SubscriptionChildState = (typeof SubscriptionChildState)[keyof typeof SubscriptionChildState]; + +export type SubscriptionState = SubscriptionParentState | SubscriptionChildState; + +export interface StateNode { + name: SubscriptionState; + parent?: SubscriptionParentState; + children?: StateNode[]; + initial?: boolean; +} + +export const STATE_HIERARCHY: StateNode[] = [ + { + name: SubscriptionParentState.ACTIVE, + children: [ + { name: SubscriptionChildState.TRIAL, parent: SubscriptionParentState.ACTIVE, initial: true }, + { name: SubscriptionChildState.PAID, parent: SubscriptionParentState.ACTIVE }, + { name: SubscriptionChildState.PAST_DUE, parent: SubscriptionParentState.ACTIVE }, + ], + }, + { + name: SubscriptionParentState.INACTIVE, + children: [ + { name: SubscriptionChildState.CANCELLED, parent: SubscriptionParentState.INACTIVE }, + { name: SubscriptionChildState.PAUSED_END_OF_CYCLE, parent: SubscriptionParentState.INACTIVE }, + { name: SubscriptionChildState.EXPIRED, parent: SubscriptionParentState.INACTIVE }, + ], + }, + { + name: SubscriptionParentState.SUSPENDED, + children: [ + { name: SubscriptionChildState.FRAUD_HOLD, parent: SubscriptionParentState.SUSPENDED }, + { name: SubscriptionChildState.ADMIN_HOLD, parent: SubscriptionParentState.SUSPENDED }, + ], + }, +]; + +export function getParentState(state: SubscriptionState): SubscriptionParentState { + if (Object.values(SubscriptionParentState).includes(state as SubscriptionParentState)) { + return state as SubscriptionParentState; + } + const dotIdx = state.indexOf('.'); + if (dotIdx > 0) { + return state.slice(0, dotIdx) as SubscriptionParentState; + } + return state as SubscriptionParentState; +} + +export function isChildOf(state: SubscriptionState, parent: SubscriptionParentState): boolean { + return getParentState(state) === parent; +} + +export function getAllStates(): SubscriptionState[] { + return [ + SubscriptionParentState.ACTIVE, + SubscriptionParentState.INACTIVE, + SubscriptionParentState.SUSPENDED, + SubscriptionChildState.TRIAL, + SubscriptionChildState.PAID, + SubscriptionChildState.PAST_DUE, + SubscriptionChildState.CANCELLED, + SubscriptionChildState.PAUSED_END_OF_CYCLE, + SubscriptionChildState.EXPIRED, + SubscriptionChildState.FRAUD_HOLD, + SubscriptionChildState.ADMIN_HOLD, + ]; +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..189dd9b1 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,94 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + ports: + - '6379:6379' + volumes: + - redis_data:/data + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + healthcheck: + test: ['CMD', 'redis-cli', 'ping'] + interval: 10s + timeout: 5s + retries: 3 + + webhook-dispatcher: + build: + context: ./services/webhook-dispatcher + dockerfile: Dockerfile + ports: + - '3001:3001' + environment: + - REDIS_URL=redis://redis:6379 + - REDIS_STREAM_KEY=webhook:events + - REDIS_CONSUMER_GROUP=webhook-dispatchers + - DISPATCHER_CONCURRENCY=10 + - DATABASE_URL=postgresql://postgres:postgres@postgres_dispatcher:5432/webhook_dispatcher + - PORT=3001 + - SHUTDOWN_TIMEOUT_MS=30000 + - QUEUE_BUFFER_SIZE=100000 + depends_on: + redis: + condition: service_healthy + postgres_dispatcher: + condition: service_healthy + restart: unless-stopped + deploy: + replicas: 2 + + postgres_dispatcher: + image: postgres:16-alpine + ports: + - '5433:5432' + environment: + - POSTGRES_DB=webhook_dispatcher + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + volumes: + - postgres_dispatcher_data:/var/lib/postgresql/data + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U postgres -d webhook_dispatcher'] + interval: 10s + timeout: 5s + retries: 3 + + postgres_main: + image: postgres:16-alpine + ports: + - '5432:5432' + environment: + - POSTGRES_DB=subtrackr + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + volumes: + - postgres_main_data:/var/lib/postgresql/data + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U postgres -d subtrackr'] + interval: 10s + timeout: 5s + retries: 3 + + api: + build: + context: ./backend + dockerfile: Dockerfile + ports: + - '4000:4000' + environment: + - REDIS_URL=redis://redis:6379 + - REDIS_STREAM_KEY=webhook:events + - DATABASE_URL=postgresql://postgres:postgres@postgres_main:5432/subtrackr + - PORT=4000 + depends_on: + redis: + condition: service_healthy + postgres_main: + condition: service_healthy + restart: unless-stopped + +volumes: + redis_data: + postgres_dispatcher_data: + postgres_main_data: diff --git a/k8s/webhook-dispatcher-hpa.yaml b/k8s/webhook-dispatcher-hpa.yaml new file mode 100644 index 00000000..89592916 --- /dev/null +++ b/k8s/webhook-dispatcher-hpa.yaml @@ -0,0 +1,103 @@ +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: webhook-dispatcher-hpa + namespace: subtrackr + labels: + app: webhook-dispatcher +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: webhook-dispatcher + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 + behavior: + scaleUp: + stabilizationWindowSeconds: 60 + policies: + - type: Pods + value: 2 + periodSeconds: 60 + - type: Percent + value: 50 + periodSeconds: 60 + selectPolicy: Max + scaleDown: + stabilizationWindowSeconds: 300 + policies: + - type: Pods + value: 1 + periodSeconds: 120 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: webhook-dispatcher + namespace: subtrackr + labels: + app: webhook-dispatcher +spec: + replicas: 2 + selector: + matchLabels: + app: webhook-dispatcher + template: + metadata: + labels: + app: webhook-dispatcher + spec: + containers: + - name: dispatcher + image: subtrackr/webhook-dispatcher:latest + ports: + - containerPort: 3001 + name: http + env: + - name: REDIS_URL + valueFrom: + secretKeyRef: + name: redis-credentials + key: url + - name: DISPATCHER_CONCURRENCY + value: '10' + - name: DATABASE_URL + valueFrom: + secretKeyRef: + name: dispatcher-db-credentials + key: url + - name: PORT + value: '3001' + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 512Mi + livenessProbe: + httpGet: + path: /health + port: 3001 + initialDelaySeconds: 10 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /health + port: 3001 + initialDelaySeconds: 5 + periodSeconds: 10 + terminationGracePeriodSeconds: 30 diff --git a/scripts/generate-statechart.sh b/scripts/generate-statechart.sh new file mode 100644 index 00000000..3ca01bb4 --- /dev/null +++ b/scripts/generate-statechart.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# SubTrackr - Subscription State Chart Visualization Generator +# Usage: ./scripts/generate-statechart.sh [output_format] +# output_format: mermaid (default) | svg | png + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" +OUTPUT_DIR="${PROJECT_ROOT}/docs/diagrams" +OUTPUT_FORMAT="${1:-mermaid}" + +mkdir -p "$OUTPUT_DIR" + +# Generate Mermaid state diagram from the TypeScript state machine definition +MERMAID_FILE="${OUTPUT_DIR}/subscription-statechart.mermaid" + +cat > "$MERMAID_FILE" << 'MERMAID_EOF' +stateDiagram-v2 + state "Active" as Active { + state "Active.Trial" as Active_Trial + state "Active.Paid" as Active_Paid + state "Active.PastDue" as Active_PastDue + [*] --> Active_Trial + Active_Trial --> Active_Paid : trial_to_paid + Active_Paid --> Active_PastDue : payment_fail + Active_PastDue --> Active_Paid : payment_recover + } + + state "Inactive" as Inactive { + state "Inactive.Cancelled" as Inactive_Cancelled + state "Inactive.PausedEndOfCycle" as Inactive_PausedEndOfCycle + state "Inactive.Expired" as Inactive_Expired + } + + state "Suspended" as Suspended { + state "Suspended.FraudHold" as Suspended_FraudHold + state "Suspended.AdminHold" as Suspended_AdminHold + } + + Active --> Inactive_Cancelled : cancel + Active --> Inactive_PausedEndOfCycle : pause + Active --> Inactive_Expired : expire + Active --> Suspended_FraudHold : suspend_fraud + Active --> Suspended_AdminHold : suspend_admin + + Active_Paid --> Inactive_Cancelled : cancel + Active_Paid --> Inactive_PausedEndOfCycle : pause + Active_Paid --> Inactive_Expired : expire + Active_Paid --> Suspended_FraudHold : suspend_fraud + Active_Paid --> Suspended_AdminHold : suspend_admin + + Active_Trial --> Active_Paid : upgrade + Active_Trial --> Active_Paid : downgrade + + Inactive_PausedEndOfCycle --> Active_Paid : resume + Inactive_PausedEndOfCycle --> Inactive_Cancelled : cancel + Inactive_PausedEndOfCycle --> Inactive_Expired : expire + + Suspended_FraudHold --> Active_Paid : unsuspend + Suspended_AdminHold --> Active_Paid : unsuspend +MERMAID_EOF + +echo "Generated Mermaid state chart: $MERMAID_FILE" + +if [ "$OUTPUT_FORMAT" = "mermaid" ]; then + echo "Done. Use a Mermaid renderer to view the diagram." + exit 0 +fi + +if ! command -v mmdc &> /dev/null; then + echo "Warning: mermaid-cli (mmdc) not found. Only .mermaid file was generated." + echo "Install with: npm install -g @mermaid-js/mermaid-cli" + exit 0 +fi + +if [ "$OUTPUT_FORMAT" = "svg" ]; then + mmdc -i "$MERMAID_FILE" -o "${OUTPUT_DIR}/subscription-statechart.svg" + echo "Generated: ${OUTPUT_DIR}/subscription-statechart.svg" +elif [ "$OUTPUT_FORMAT" = "png" ]; then + mmdc -i "$MERMAID_FILE" -o "${OUTPUT_DIR}/subscription-statechart.png" -b transparent + echo "Generated: ${OUTPUT_DIR}/subscription-statechart.png" +fi diff --git a/services/webhook-dispatcher/Dockerfile b/services/webhook-dispatcher/Dockerfile new file mode 100644 index 00000000..cf705415 --- /dev/null +++ b/services/webhook-dispatcher/Dockerfile @@ -0,0 +1,18 @@ +FROM node:20-alpine AS builder +WORKDIR /app +COPY package.json tsconfig.json ./ +RUN npm install --production=false +COPY src/ ./src/ +RUN npm run build +RUN npm prune --production + +FROM node:20-alpine +WORKDIR /app +COPY --from=builder /app/node_modules ./node_modules +COPY --from=builder /app/dist ./dist +COPY --from=builder /app/package.json ./ +EXPOSE 3001 +HEALTHCHECK --interval=15s --timeout=5s --start-period=10s --retries=3 \ + CMD wget --no-verbose --tries=1 --spider http://localhost:3001/health || exit 1 +USER node +CMD ["node", "dist/index.js"] diff --git a/services/webhook-dispatcher/package.json b/services/webhook-dispatcher/package.json new file mode 100644 index 00000000..6f394954 --- /dev/null +++ b/services/webhook-dispatcher/package.json @@ -0,0 +1,28 @@ +{ + "name": "@subtrackr/webhook-dispatcher", + "version": "1.0.0", + "description": "SubTrackr Webhook Dispatcher — horizontally-scalable async webhook delivery service", + "private": true, + "main": "dist/index.js", + "scripts": { + "build": "tsc", + "start": "node dist/index.js", + "dev": "ts-node src/index.ts", + "test": "jest", + "lint": "eslint src/**/*.ts", + "typecheck": "tsc --noEmit" + }, + "dependencies": { + "ioredis": "^5.4.1", + "pg": "^8.12.0" + }, + "devDependencies": { + "@types/ioredis": "^5.0.0", + "@types/node": "^20.0.0", + "@types/pg": "^8.0.0", + "jest": "^29.7.0", + "ts-jest": "^29.1.0", + "ts-node": "^10.9.0", + "typescript": "^5.4.0" + } +} diff --git a/services/webhook-dispatcher/src/dispatchers/http.ts b/services/webhook-dispatcher/src/dispatchers/http.ts new file mode 100644 index 00000000..d57a4c37 --- /dev/null +++ b/services/webhook-dispatcher/src/dispatchers/http.ts @@ -0,0 +1,96 @@ +import crypto from 'crypto'; + +export interface DispatchRequest { + url: string; + payload: Record; + headers: Record; + signature: string; + eventType: string; + eventId: string; + idempotencyKey: string; +} + +export interface DispatchResult { + success: boolean; + statusCode?: number; + latencyMs?: number; + error?: string; +} + +export interface DispatcherOptions { + maxRetries: number; + initialDelayMs: number; + maxDelayMs: number; + backoffFactor: number; + timeout: number; +} + +export class HttpWebhookDispatcher { + private options: DispatcherOptions; + + constructor(options: DispatcherOptions) { + this.options = options; + } + + async dispatch(request: DispatchRequest): Promise { + const payloadBody = JSON.stringify(request.payload); + + if (Buffer.byteLength(payloadBody, 'utf8') > 1_048_576) { + return { success: false, error: 'Payload exceeds 1MB limit' }; + } + + let attempt = 0; + let lastError: string | undefined; + + while (attempt <= this.options.maxRetries) { + attempt++; + const startTime = Date.now(); + + try { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), this.options.timeout); + + const response = await fetch(request.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...request.headers, + }, + body: payloadBody, + signal: controller.signal, + }); + + clearTimeout(timeoutId); + const latencyMs = Date.now() - startTime; + + if (response.ok) { + return { success: true, statusCode: response.status, latencyMs }; + } + + lastError = `HTTP ${response.status}`; + + if (response.status < 500) { + return { success: false, statusCode: response.status, error: lastError, latencyMs }; + } + } catch (err: unknown) { + clearTimeout(undefined); + lastError = (err as Error).message || 'Unknown error'; + } + + if (attempt <= this.options.maxRetries) { + const delay = this.computeDelay(attempt); + console.log(`[Dispatch] Retry ${attempt}/${this.options.maxRetries} for ${request.url} in ${delay}ms: ${lastError}`); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + return { success: false, error: lastError || 'Max retries exceeded' }; + } + + private computeDelay(attempt: number): number { + const rawDelay = Math.floor( + this.options.initialDelayMs * Math.pow(this.options.backoffFactor, Math.max(0, attempt - 1)) + ); + return Math.min(rawDelay, this.options.maxDelayMs); + } +} diff --git a/services/webhook-dispatcher/src/index.ts b/services/webhook-dispatcher/src/index.ts new file mode 100644 index 00000000..a9f89452 --- /dev/null +++ b/services/webhook-dispatcher/src/index.ts @@ -0,0 +1,314 @@ +import * as http from 'http'; +import Redis from 'ioredis'; +import { Pool, PoolClient } from 'pg'; +import { HttpWebhookDispatcher } from './dispatchers/http'; + +// ─── Configuration ──────────────────────────────────────────────────────────── + +const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'; +const REDIS_STREAM_KEY = process.env.REDIS_STREAM_KEY || 'webhook:events'; +const REDIS_CONSUMER_GROUP = process.env.REDIS_CONSUMER_GROUP || 'webhook-dispatchers'; +const CONSUMER_NAME = process.env.CONSUMER_NAME || `dispatcher-${process.pid}-${Date.now()}`; +const CONCURRENCY = parseInt(process.env.DISPATCHER_CONCURRENCY || '10', 10); +const MAX_CONCURRENCY = Math.min(CONCURRENCY, 50); +const DATABASE_URL = process.env.DATABASE_URL || 'postgresql://localhost:5432/webhook_dispatcher'; +const PORT = parseInt(process.env.PORT || '3001', 10); +const SHUTDOWN_TIMEOUT_MS = parseInt(process.env.SHUTDOWN_TIMEOUT_MS || '30000', 10); +const QUEUE_BUFFER_SIZE = parseInt(process.env.QUEUE_BUFFER_SIZE || '100000', 10); + +// ─── Metrics ────────────────────────────────────────────────────────────────── + +const metrics = { + uptime: 0, + deliveriesTotal: 0, + deliveriesSuccess: 0, + deliveriesFailed: 0, + deliveriesInFlight: 0, + queueDepth: 0, + activeWorkers: 0, + errorRate: 0, +}; + +function updateErrorRate(): void { + const total = metrics.deliveriesSuccess + metrics.deliveriesFailed; + metrics.errorRate = total > 0 ? metrics.deliveriesFailed / total : 0; +} + +// ─── Database ───────────────────────────────────────────────────────────────── + +const dbPool = new Pool({ connectionString: DATABASE_URL, max: 10 }); + +async function setupDatabase(): Promise { + const client = await dbPool.connect(); + try { + await client.query(` + CREATE TABLE IF NOT EXISTS delivery_logs ( + id TEXT PRIMARY KEY, + event_id TEXT NOT NULL, + webhook_url TEXT NOT NULL, + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 5, + response_code INTEGER, + error_message TEXT, + latency_ms INTEGER, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + delivered_at TIMESTAMPTZ, + next_retry_at TIMESTAMPTZ, + signature TEXT, + idempotency_key TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS retry_state ( + id TEXT PRIMARY KEY, + event_data JSONB NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + next_retry_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_delivery_logs_event_id ON delivery_logs(event_id); + CREATE INDEX IF NOT EXISTS idx_delivery_logs_status ON delivery_logs(status); + CREATE INDEX IF NOT EXISTS idx_retry_state_next_retry ON retry_state(next_retry_at); + `); + console.log('[DB] Database schema initialized'); + } finally { + client.release(); + } +} + +// ─── Redis Consumer ─────────────────────────────────────────────────────────── + +const redis = new Redis(REDIS_URL); + +async function setupRedisStream(): Promise { + try { + await redis.xgroup('CREATE', REDIS_STREAM_KEY, REDIS_CONSUMER_GROUP, '$', 'MKSTREAM'); + console.log(`[Redis] Consumer group "${REDIS_CONSUMER_GROUP}" created`); + } catch (err: unknown) { + if ((err as Error).message.includes('BUSYGROUP')) { + console.log(`[Redis] Consumer group "${REDIS_CONSUMER_GROUP}" already exists`); + } else { + throw err; + } + } +} + +async function updateQueueDepth(): Promise { + try { + metrics.queueDepth = await redis.xlen(REDIS_STREAM_KEY); + } catch { + metrics.queueDepth = -1; + } +} + +// ─── Webhook Dispatcher ─────────────────────────────────────────────────────── + +const dispatcher = new HttpWebhookDispatcher({ + maxRetries: 5, + initialDelayMs: 250, + maxDelayMs: 8000, + backoffFactor: 2, + timeout: 30000, +}); + +async function processEvent(eventData: Record): Promise { + metrics.deliveriesInFlight++; + metrics.activeWorkers++; + metrics.deliveriesTotal++; + + const deliveryId = `del_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; + const idempotencyKey = `${eventData.eventId}:${eventData.webhookId}`; + + try { + await dbPool.query( + `INSERT INTO delivery_logs (id, event_id, webhook_url, event_type, payload, status, max_attempts, idempotency_key) + VALUES ($1, $2, $3, $4, $5, 'pending', $6, $7) + ON CONFLICT (id) DO NOTHING`, + [deliveryId, eventData.eventId, eventData.url, eventData.eventType, JSON.stringify(eventData.payload), 5, idempotencyKey] + ); + + const result = await dispatcher.dispatch({ + url: eventData.url as string, + payload: eventData.payload as Record, + headers: eventData.headers as Record, + signature: eventData.signature as string, + eventType: eventData.eventType as string, + eventId: eventData.eventId as string, + idempotencyKey, + }); + + if (result.success) { + metrics.deliveriesSuccess++; + await dbPool.query( + `UPDATE delivery_logs SET status = 'delivered', response_code = $2, latency_ms = $3, delivered_at = NOW(), updated_at = NOW() + WHERE id = $1`, + [deliveryId, result.statusCode, result.latencyMs] + ); + } else { + metrics.deliveriesFailed++; + if (result.statusCode && result.statusCode >= 500) { + await dbPool.query( + `INSERT INTO retry_state (id, event_data, attempts, last_error, next_retry_at) + VALUES ($1, $2, 1, $3, NOW() + INTERVAL '30 seconds') + ON CONFLICT (id) DO UPDATE SET attempts = retry_state.attempts + 1, last_error = $3, next_retry_at = NOW() + INTERVAL '1 minute'`, + [deliveryId, JSON.stringify(eventData), result.error || 'Delivery failed'] + ); + } + await dbPool.query( + `UPDATE delivery_logs SET status = 'failed', response_code = $2, error_message = $3, updated_at = NOW() + WHERE id = $1`, + [deliveryId, result.statusCode, result.error || 'Delivery failed'] + ); + } + } catch (err: unknown) { + metrics.deliveriesFailed++; + console.error(`[Dispatcher] Error processing event:`, (err as Error).message); + } finally { + metrics.deliveriesInFlight--; + metrics.activeWorkers--; + updateErrorRate(); + } +} + +async function consumeEvents(): Promise { + const activePromises = new Set>(); + + while (true) { + while (activePromises.size >= MAX_CONCURRENCY) { + await Promise.race(activePromises); + } + + try { + const results = await redis.xreadgroup( + 'GROUP', REDIS_CONSUMER_GROUP, CONSUMER_NAME, + 'COUNT', MAX_CONCURRENCY - activePromises.size, + 'BLOCK', 1000, + 'STREAMS', REDIS_STREAM_KEY, '>' + ); + + if (results) { + for (const [, messages] of results) { + for (const [messageId, fields] of messages as [string, string[]][]) { + const eventData: Record = {}; + for (let i = 0; i < fields.length; i += 2) { + try { + eventData[fields[i]] = JSON.parse(fields[i + 1]); + } catch { + eventData[fields[i]] = fields[i + 1]; + } + } + + const promise = processEvent(eventData) + .then(() => redis.xack(REDIS_STREAM_KEY, REDIS_CONSUMER_GROUP, messageId)) + .catch((err) => { + console.error(`[Consumer] Error processing message ${messageId}:`, (err as Error).message); + }) + .finally(() => activePromises.delete(promise)); + + activePromises.add(promise); + } + } + } + + await updateQueueDepth(); + } catch (err: unknown) { + if (!(err as Error).message?.includes('connection')) { + console.error('[Consumer] Stream read error:', (err as Error).message); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } +} + +// ─── HTTP Health Server ─────────────────────────────────────────────────────── + +const server = http.createServer((req, res) => { + if (req.url === '/health' && req.method === 'GET') { + const health = { + status: 'ok', + uptime: metrics.uptime, + queueDepth: metrics.queueDepth, + activeWorkers: metrics.activeWorkers, + deliveriesInFlight: metrics.deliveriesInFlight, + deliveriesTotal: metrics.deliveriesTotal, + deliveriesSuccess: metrics.deliveriesSuccess, + deliveriesFailed: metrics.deliveriesFailed, + errorRate: metrics.errorRate, + concurrency: MAX_CONCURRENCY, + consumerName: CONSUMER_NAME, + timestamp: new Date().toISOString(), + }; + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(health)); + return; + } + + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Not found' })); +}); + +// ─── Graceful Shutdown ──────────────────────────────────────────────────────── + +function gracefulShutdown(signal: string): void { + console.log(`[Shutdown] Received ${signal}, shutting down gracefully...`); + + const forceExit = setTimeout(() => { + console.error('[Shutdown] Forced exit after timeout'); + process.exit(1); + }, SHUTDOWN_TIMEOUT_MS); + + server.close(async () => { + console.log('[Shutdown] HTTP server closed'); + + try { + await redis.quit(); + console.log('[Shutdown] Redis connection closed'); + } catch (err) { + console.error('[Shutdown] Error closing Redis:', (err as Error).message); + } + + try { + await dbPool.end(); + console.log('[Shutdown] Database connection pool closed'); + } catch (err) { + console.error('[Shutdown] Error closing database:', (err as Error).message); + } + + clearTimeout(forceExit); + console.log('[Shutdown] Graceful shutdown complete'); + process.exit(0); + }); +} + +process.on('SIGTERM', () => gracefulShutdown('SIGTERM')); +process.on('SIGINT', () => gracefulShutdown('SIGINT')); + +// ─── Startup ────────────────────────────────────────────────────────────────── + +async function main(): Promise { + console.log('[SubTrackr] Webhook Dispatcher starting...'); + console.log(`[Config] Concurrency: ${MAX_CONCURRENCY}, Stream: ${REDIS_STREAM_KEY}, Group: ${REDIS_CONSUMER_GROUP}`); + + await setupDatabase(); + await setupRedisStream(); + + server.listen(PORT, () => { + console.log(`[HTTP] Health check listening on port ${PORT}`); + metrics.uptime = Date.now(); + }); + + consumeEvents().catch((err) => { + console.error('[Fatal] Consumer loop crashed:', (err as Error).message); + process.exit(1); + }); +} + +main().catch((err) => { + console.error('[Fatal] Startup failed:', (err as Error).message); + process.exit(1); +}); diff --git a/services/webhook-dispatcher/tsconfig.json b/services/webhook-dispatcher/tsconfig.json new file mode 100644 index 00000000..6d4e71e1 --- /dev/null +++ b/services/webhook-dispatcher/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "moduleResolution": "node", + "strict": true, + "skipLibCheck": true, + "esModuleInterop": true, + "outDir": "./dist", + "rootDir": "./src", + "declaration": true, + "sourceMap": true, + "resolveJsonModule": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["src/**/*.ts"], + "exclude": ["node_modules", "dist"] +}