Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@b1dz/adapters-pumpfun": "workspace:*",
"@b1dz/adapters-evm": "workspace:*",
"@b1dz/adapters-solana": "workspace:*",
"@b1dz/ai-analyzer": "workspace:*",
"@b1dz/core": "workspace:*",
"@b1dz/equity-engine": "workspace:*",
"@b1dz/event-channel": "workspace:*",
Expand Down
246 changes: 245 additions & 1 deletion apps/daemon/src/sources/crypto-trade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ import {
setTradingOverride,
setDailyLossLimitPct,
setDexExecutor,
setSpendBudget,
setMaxPositionUsd,
syncSpendLedger,
drainSpendLedger,
isBudgetWindow,
windowStartFor,
setAiSizeMultiplier,
executeAgentMarketBuy,
type BudgetWindow,
} from '@b1dz/source-crypto-trade';
import { analyze, aiSizeMultiplier, RateLimiter, type AiAnalysis } from '@b1dz/ai-analyzer';
import { loadUserConfig } from '../user-config.js';
import { AlertBus, getAnalysisCache, getB1dzVersion, setAnalysisCache } from '@b1dz/core';
import { runnerStorageFor } from '../runner-storage.js';
import { logActivity, logRaw, getActivityLog, getRawLog } from './activity-log.js';
Expand All @@ -34,6 +45,210 @@ let dexExecutorArmedAt: number | null = null;
let dexExecutorLastAttemptLogAt = 0;
const DEX_EXECUTOR_RETRY_LOG_INTERVAL_MS = 5 * 60_000;

/** userId → windowStart the spend ledger was last seeded for (re-seed on rollover). */
const spendLedgerSeededFor = new Map<string, number>();

/**
* Keep the engine's in-memory spend budget in sync with the durable
* `crypto_spend_ledger` table:
* 1. Once per (user, window) — sum this window's spend from the DB and seed
* the engine, so the budget survives daemon restarts.
* 2. Every tick — drain buys the engine just recorded and persist them.
* Best-effort: ledger failures never block trading (the in-memory counter is
* still authoritative for the live process).
*/
async function reconcileSpendLedger(
ctx: UserContext,
_storage: unknown,
window: BudgetWindow,
): Promise<void> {
const windowStart = windowStartFor(window, Date.now());
try {
if (spendLedgerSeededFor.get(ctx.userId) !== windowStart) {
const { data, error } = await ctx.supabase
.from('crypto_spend_ledger')
.select('usd')
.eq('user_id', ctx.userId)
.gte('ts', new Date(windowStart).toISOString());
if (!error) {
const sum = (data ?? []).reduce((acc: number, r: { usd: number | string }) => acc + Number(r.usd ?? 0), 0);
syncSpendLedger(sum);
spendLedgerSeededFor.set(ctx.userId, windowStart);
}
}
} catch (e) {
logRaw(`[trade] spend-ledger seed failed: ${(e as Error).message}`, 'crypto-trade');
}

const entries = drainSpendLedger();
if (entries.length === 0) return;
try {
await ctx.supabase.from('crypto_spend_ledger').insert(
entries.map((e) => ({
user_id: ctx.userId,
ts: new Date(e.ts).toISOString(),
usd: e.usd,
exchange: e.exchange,
pair: e.pair,
source: e.source,
agent_token_id: e.tokenId ?? null,
})),
);
} catch (e) {
logRaw(`[trade] spend-ledger write failed: ${(e as Error).message}`, 'crypto-trade');
}
}

/** Per-user AI-analyzer rate limiters + last-run timestamps. */
const aiRateLimiters = new Map<string, RateLimiter>();
const aiLastRunAt = new Map<string, number>();
const AI_MIN_INTERVAL_MS = 30_000; // never analyze more often than this, regardless of maxPerMin

/**
* AI analyzer step (Phase 2 — "Coinbase Advisor" analog). When the user has
* enabled it AND set their own provider key, call OUT to their model to score
* the active market and set the engine's AI size multiplier. The multiplier
* only scales a buy the deterministic strategy already wants and is itself
* clamped + bounded by the spend budget. Best-effort: any failure resets the
* multiplier to neutral (1×) so a flaky key never blocks or distorts trading.
*/
async function reconcileAiAnalyzer(ctx: UserContext): Promise<void> {
let analysis: AiAnalysis | null = null;
try {
const cfg = await loadUserConfig(ctx.userId);
const enabled = (cfg.getUserPlain('AI_ANALYZER_ENABLED') ?? '').toLowerCase() === 'true';
if (!enabled) {
setAiSizeMultiplier(1);
return;
}
const provider = cfg.getUserPlain('AI_PROVIDER') === 'openai' ? 'openai' : 'anthropic';
// STRICT per-user key — never operator env (env-fallback incident).
const apiKey = provider === 'openai' ? cfg.getUserSecret('OPENAI_API_KEY') : cfg.getUserSecret('ANTHROPIC_API_KEY');
if (!apiKey) {
setAiSizeMultiplier(1);
return;
}

const now = Date.now();
if (now - (aiLastRunAt.get(ctx.userId) ?? 0) < AI_MIN_INTERVAL_MS) return; // keep last multiplier
const maxPerMin = Number(cfg.getUserPlain('AI_MAX_CALLS_PER_MIN') ?? '6');
let rl = aiRateLimiters.get(ctx.userId);
if (!rl) { rl = new RateLimiter(Number.isFinite(maxPerMin) && maxPerMin > 0 ? maxPerMin : 6); aiRateLimiters.set(ctx.userId, rl); }
if (!rl.allow(now)) return;
aiLastRunAt.set(ctx.userId, now);

// Use the active position with the most price history as the market read.
const status = getTradeStatus();
const positions = status.positions ?? [];
const target = positions
.filter((p) => (p.priceSamples?.length ?? 0) >= 3)
.sort((a, b) => (b.priceSamples?.length ?? 0) - (a.priceSamples?.length ?? 0))[0];
if (!target) {
setAiSizeMultiplier(1); // flat / not enough data → neutral
return;
}

analysis = await analyze(
{
pair: target.pair,
exchange: target.exchange,
lastPrice: target.currentPrice,
closes: target.priceSamples.slice(-20),
deterministicSignal: null,
},
{ provider, apiKey, model: cfg.getUserPlain('AI_MODEL') || undefined },
);
setAiSizeMultiplier(aiSizeMultiplier(analysis, Date.now()));
} catch (e) {
setAiSizeMultiplier(1); // fail safe to neutral
logRaw(`[trade] AI analyzer skipped: ${(e as Error).message}`, 'crypto-trade');
return;
}

// Persist the latest analysis for the dashboard/TUI (best-effort).
if (analysis) {
try {
await ctx.supabase.from('source_state').upsert(
{ user_id: ctx.userId, source_id: 'ai-analysis', payload: analysis as unknown as Record<string, unknown>, updated_at: new Date().toISOString() },
{ onConflict: 'user_id,source_id' },
);
} catch { /* non-fatal */ }
}
}

interface AgentQueueItem {
idempotencyKey: string;
tokenId: string;
pair: string;
exchange: string | null;
usd: number;
side: 'buy';
enqueuedAt: string;
}

/**
* Drain the agent-orders queue (Phase 3 — "Coinbase for Agents"). Orders are
* authorized + per-token-budgeted by the web API, then enqueued in source_state
* 'agent-orders'; here we execute each BUY through the engine, which re-applies
* the global spend budget + records the spend tagged source='agent' with the
* token id (so per-token budgets reconcile). Processed items are removed and
* the outcome written to agent_actions.
*/
async function reconcileAgentOrders(ctx: UserContext): Promise<void> {
let queue: AgentQueueItem[];
try {
const { data } = await ctx.supabase
.from('source_state')
.select('payload')
.eq('user_id', ctx.userId)
.eq('source_id', 'agent-orders')
.maybeSingle();
queue = ((data?.payload as { queue?: AgentQueueItem[] } | undefined)?.queue ?? []) as AgentQueueItem[];
} catch {
return;
}
if (queue.length === 0) return;

for (const item of queue) {
let ok = false;
let message = 'skipped';
try {
const res = await executeAgentMarketBuy({ pair: item.pair, exchange: item.exchange ?? undefined, usd: item.usd, tokenId: item.tokenId });
ok = res.ok;
message = res.message;
} catch (e) {
message = (e as Error).message;
}
try {
await ctx.supabase.from('agent_actions').insert({
user_id: ctx.userId,
agent_token_id: item.tokenId,
action: 'execute_order',
detail: { pair: item.pair, usd: item.usd, idempotencyKey: item.idempotencyKey, message },
ok,
});
} catch { /* non-fatal */ }
}

// Remove only the keys we processed — re-read so orders enqueued mid-tick
// survive instead of being clobbered.
const processed = new Set(queue.map((q) => q.idempotencyKey));
try {
const { data } = await ctx.supabase
.from('source_state')
.select('payload')
.eq('user_id', ctx.userId)
.eq('source_id', 'agent-orders')
.maybeSingle();
const current = ((data?.payload as { queue?: AgentQueueItem[] } | undefined)?.queue ?? []) as AgentQueueItem[];
const remaining = current.filter((q) => !processed.has(q.idempotencyKey));
await ctx.supabase.from('source_state').upsert(
{ user_id: ctx.userId, source_id: 'agent-orders', payload: { queue: remaining }, updated_at: new Date().toISOString() },
{ onConflict: 'user_id,source_id' },
);
} catch { /* non-fatal */ }
}

// Railway-level panic kill switch, captured at module-load time. The
// per-tick applyEnvOverlay() copies user_settings.TRADING_ENABLED on top
// of process.env, which would otherwise let a stale "true" in a user's
Expand Down Expand Up @@ -116,7 +331,13 @@ export const cryptoTradeWorker: SourceWorker = {
// (1) bypasses the per-tick user-config overlay, so a stale "true"
// in user_settings.TRADING_ENABLED cannot mask the Railway panic
// switch. (2)–(4) fall back to normal toggle behavior.
const uiSettings = await storage.get<{ tradingEnabled?: boolean | null; dailyLossLimitPct?: number | null }>('source-state', 'crypto-ui-settings');
const uiSettings = await storage.get<{
tradingEnabled?: boolean | null;
dailyLossLimitPct?: number | null;
spendBudgetUsd?: number | null;
budgetWindow?: string | null;
maxPositionUsd?: number | null;
}>('source-state', 'crypto-ui-settings');
const uiOverride = uiSettings?.tradingEnabled;
const envRaw = (process.env.TRADING_ENABLED ?? '').trim().toLowerCase();
const envOverride = envRaw === 'true' ? true : envRaw === 'false' ? false : null;
Expand All @@ -138,6 +359,29 @@ export const cryptoTradeWorker: SourceWorker = {
: null,
);

// Crypto spend budget — the rolling USD cap on buys (engine/AI/agent).
// Per-user, flows via crypto-ui-settings (NOT operator env), same channel
// as the daily loss limit above.
const budgetWindow: BudgetWindow = isBudgetWindow(uiSettings?.budgetWindow) ? uiSettings.budgetWindow : 'daily';
setSpendBudget(
typeof uiSettings?.spendBudgetUsd === 'number' && Number.isFinite(uiSettings.spendBudgetUsd) && uiSettings.spendBudgetUsd > 0
? uiSettings.spendBudgetUsd
: null,
budgetWindow,
);
setMaxPositionUsd(
typeof uiSettings?.maxPositionUsd === 'number' && Number.isFinite(uiSettings.maxPositionUsd) && uiSettings.maxPositionUsd > 0
? uiSettings.maxPositionUsd
: null,
);

// Seed the in-memory spent counter from the durable ledger once per user
// (so the budget survives daemon restarts). Then drain freshly-recorded
// buys into the ledger each tick.
await reconcileSpendLedger(ctx, storage, budgetWindow);
await reconcileAiAnalyzer(ctx);
await reconcileAgentOrders(ctx);

// Arm the DEX executor lazily, inside applyEnvOverlay, so the
// user's keys + RPC URLs are visible on process.env. We retry
// every tick until success — operators frequently add an RPC URL
Expand Down
12 changes: 12 additions & 0 deletions apps/web/src/app/api/agent/budget/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { NextRequest } from 'next/server';
import { authenticateAgent, unauthorized } from '@/lib/api-auth';
import { getAgentBudget } from '@/lib/agent-trade';

export const dynamic = 'force-dynamic';

export async function GET(req: NextRequest) {
const agent = await authenticateAgent(req);
if (!agent) return unauthorized();
const { status, body } = await getAgentBudget(agent);
return Response.json(body, { status });
}
103 changes: 103 additions & 0 deletions apps/web/src/app/api/agent/mcp/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';

const authenticateAgentMock = vi.fn();
const placeAgentOrderMock = vi.fn();
const getAgentBudgetMock = vi.fn();
const getAgentPortfolioMock = vi.fn();
const getAgentQuoteMock = vi.fn();

vi.mock('@/lib/api-auth', () => ({
authenticateAgent: (...a: unknown[]) => authenticateAgentMock(...a),
// route imports the type only; provide a stub to satisfy the module
unauthorized: () => new Response(null, { status: 401 }),
}));
vi.mock('@/lib/agent-trade', () => ({
placeAgentOrder: (...a: unknown[]) => placeAgentOrderMock(...a),
getAgentBudget: (...a: unknown[]) => getAgentBudgetMock(...a),
getAgentPortfolio: (...a: unknown[]) => getAgentPortfolioMock(...a),
getAgentQuote: (...a: unknown[]) => getAgentQuoteMock(...a),
}));

async function importRoute() {
return (await import('./route.js')) as typeof import('./route.js');
}

function rpc(method: string, params?: unknown, id: unknown = 1, withAuth = true) {
return new Request('http://test.local/api/agent/mcp', {
method: 'POST',
headers: {
'content-type': 'application/json',
...(withAuth ? { authorization: 'Bearer b1dz_agent_test' } : {}),
},
body: JSON.stringify({ jsonrpc: '2.0', id, method, params }),
});
}

describe('MCP route (e2e)', () => {
beforeEach(() => {
vi.clearAllMocks();
authenticateAgentMock.mockResolvedValue({ token: { scopes: ['read', 'trade:crypto'] }, scopes: ['read', 'trade:crypto'], tokenId: 't', userId: 'u', admin: {} });
});

it('handles initialize without auth and advertises tool capability', async () => {
const { POST } = await importRoute();
const res = await POST(rpc('initialize', {}, 1, false) as never);
const body = await res.json();
expect(body.result.serverInfo.name).toBe('b1dz-agent');
expect(body.result.capabilities.tools).toBeDefined();
});

it('rejects a bad JSON-RPC envelope with 400', async () => {
const { POST } = await importRoute();
const bad = new Request('http://test.local/api/agent/mcp', {
method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ foo: 'bar' }),
});
const res = await POST(bad as never);
expect(res.status).toBe(400);
});

it('lists the four tools (requires auth)', async () => {
const { POST } = await importRoute();
const res = await POST(rpc('tools/list') as never);
const body = await res.json();
const names = (body.result.tools as Array<{ name: string }>).map((t) => t.name).sort();
expect(names).toEqual(['get_budget', 'get_portfolio', 'get_quote', 'place_order']);
});

it('401s tool calls without a valid agent token', async () => {
authenticateAgentMock.mockResolvedValueOnce(null);
const { POST } = await importRoute();
const res = await POST(rpc('tools/list', undefined, 1) as never);
expect(res.status).toBe(401);
const body = await res.json();
expect(body.error.code).toBe(-32001);
});

it('routes tools/call place_order through placeAgentOrder', async () => {
placeAgentOrderMock.mockResolvedValue({ status: 202, body: { status: 'accepted', idempotencyKey: 'k' } });
const { POST } = await importRoute();
const res = await POST(rpc('tools/call', { name: 'place_order', arguments: { pair: 'BTC-USD', usd: 25 } }) as never);
const body = await res.json();
expect(placeAgentOrderMock).toHaveBeenCalledOnce();
expect(placeAgentOrderMock.mock.calls[0]![1]).toMatchObject({ pair: 'BTC-USD', usd: 25 });
// tool result is wrapped as MCP text content
const text = JSON.parse(body.result.content[0].text);
expect(text.status).toBe('accepted');
expect(body.result.isError).toBe(false);
});

it('returns an isError tool result for an unknown tool', async () => {
const { POST } = await importRoute();
const res = await POST(rpc('tools/call', { name: 'launch_missiles', arguments: {} }) as never);
const body = await res.json();
expect(body.result.isError).toBe(true);
expect(JSON.parse(body.result.content[0].text).error).toMatch(/unknown tool/);
});

it('returns -32601 for an unknown method', async () => {
const { POST } = await importRoute();
const res = await POST(rpc('resources/list') as never);
const body = await res.json();
expect(body.error.code).toBe(-32601);
});
});
Loading
Loading