diff --git a/apps/browser-scraper/package.json b/apps/browser-scraper/package.json index 96e02a3..65ca09b 100644 --- a/apps/browser-scraper/package.json +++ b/apps/browser-scraper/package.json @@ -3,7 +3,7 @@ "private": true, "type": "module", "scripts": { - "dev": "wrangler dev", + "dev": "wrangler dev --port 8788 --inspector-port 9230", "deploy": "wrangler deploy", "typecheck": "tsc", "lint": "eslint .", diff --git a/apps/operator/migrations/0006_unknown_the_santerians.sql b/apps/operator/migrations/0006_unknown_the_santerians.sql new file mode 100644 index 0000000..23f6fd1 --- /dev/null +++ b/apps/operator/migrations/0006_unknown_the_santerians.sql @@ -0,0 +1,8 @@ +CREATE TABLE `pending_conversations` ( + `chat_id` integer PRIMARY KEY NOT NULL, + `messages_json` text NOT NULL, + `pending_tool_call_id` text NOT NULL, + `options_json` text NOT NULL, + `token` text NOT NULL, + `expires_at` text NOT NULL +); diff --git a/apps/operator/migrations/meta/0006_snapshot.json b/apps/operator/migrations/meta/0006_snapshot.json new file mode 100644 index 0000000..d48e3bb --- /dev/null +++ b/apps/operator/migrations/meta/0006_snapshot.json @@ -0,0 +1,283 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "8e57a58d-9d91-45b3-8f6c-5defe7f366c0", + "prevId": "b69943e8-9429-48a9-b896-82adc44bd6d7", + "tables": { + "pending_actions": { + "name": "pending_actions", + "columns": { + "chat_id": { + "name": "chat_id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "action_type": { + "name": "action_type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "payload": { + "name": "payload", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expires_at": { + "name": "expires_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "pending_conversations": { + "name": "pending_conversations", + "columns": { + "chat_id": { + "name": "chat_id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "messages_json": { + "name": "messages_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "pending_tool_call_id": { + "name": "pending_tool_call_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "options_json": { + "name": "options_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expires_at": { + "name": "expires_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "schedules": { + "name": "schedules", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "chat_id": { + "name": "chat_id", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "schedule_type": { + "name": "schedule_type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "hour": { + "name": "hour", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "minute": { + "name": "minute", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "day_of_week": { + "name": "day_of_week", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "day_of_month": { + "name": "day_of_month", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "timezone": { + "name": "timezone", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'UTC'" + }, + "fixed_message": { + "name": "fixed_message", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "message_prompt": { + "name": "message_prompt", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "source_url": { + "name": "source_url", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "keywords": { + "name": "keywords", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "state_json": { + "name": "state_json", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "active": { + "name": "active", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": true + }, + "use_browser": { + "name": "use_browser", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "next_run_at": { + "name": "next_run_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "claimed_at": { + "name": "claimed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "retry_count": { + "name": "retry_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "idx_schedules_next_run": { + "name": "idx_schedules_next_run", + "columns": ["active", "next_run_at"], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/operator/migrations/meta/_journal.json b/apps/operator/migrations/meta/_journal.json index d65369f..5943086 100644 --- a/apps/operator/migrations/meta/_journal.json +++ b/apps/operator/migrations/meta/_journal.json @@ -43,6 +43,13 @@ "when": 1777491867371, "tag": "0005_nervous_scorpion", "breakpoints": true + }, + { + "idx": 6, + "version": "6", + "when": 1777705134403, + "tag": "0006_unknown_the_santerians", + "breakpoints": true } ] } diff --git a/apps/operator/src/db/schema.ts b/apps/operator/src/db/schema.ts index 8ba75ec..5f2196f 100644 --- a/apps/operator/src/db/schema.ts +++ b/apps/operator/src/db/schema.ts @@ -46,4 +46,13 @@ const pendingActions = sqliteTable("pending_actions", { token: text("token"), }); -export { pendingActions, schedules }; +const pendingConversations = sqliteTable("pending_conversations", { + chatId: int("chat_id").primaryKey(), + messagesJson: text("messages_json").notNull(), + pendingToolCallId: text("pending_tool_call_id").notNull(), + optionsJson: text("options_json").notNull(), + token: text("token").notNull(), + expiresAt: text("expires_at").notNull(), +}); + +export { pendingActions, pendingConversations, schedules }; diff --git a/apps/operator/src/modules/telegram/callback-data.ts b/apps/operator/src/modules/telegram/callback-data.ts new file mode 100644 index 0000000..400f51c --- /dev/null +++ b/apps/operator/src/modules/telegram/callback-data.ts @@ -0,0 +1,47 @@ +type ParsedCallback = + | { action: "confirm" | "cancel"; token: string } + | { action: "answer"; token: string; optionIndex: number }; + +const parseCallbackData = ( + data: string | undefined +): ParsedCallback | undefined => { + if (!data) { + return undefined; + } + if (data.startsWith("q:")) { + const rest = data.slice(2); + const colon = rest.indexOf(":"); + if (colon === -1) { + return undefined; + } + const token = rest.slice(0, colon); + const indexStr = rest.slice(colon + 1); + if (!token || !/^\d+$/.test(indexStr)) { + return undefined; + } + return { + action: "answer", + token, + optionIndex: Number.parseInt(indexStr, 10), + }; + } + const colon = data.indexOf(":"); + if (colon === -1) { + return undefined; + } + const prefix = data.slice(0, colon); + const token = data.slice(colon + 1); + if (!token) { + return undefined; + } + if (prefix === "c") { + return { action: "confirm", token }; + } + if (prefix === "x") { + return { action: "cancel", token }; + } + return undefined; +}; + +export { parseCallbackData }; +export type { ParsedCallback }; diff --git a/apps/operator/src/modules/telegram/controller.ts b/apps/operator/src/modules/telegram/controller.ts index 8110af7..9fb8d55 100644 --- a/apps/operator/src/modules/telegram/controller.ts +++ b/apps/operator/src/modules/telegram/controller.ts @@ -1,91 +1,27 @@ +import type { Logger } from "@repo/logger"; import type { Context } from "hono"; -import { OpenAiService } from "../../services/openai"; -import type { ToolExecutor, ToolResult } from "../../services/openai"; import { PendingActionService } from "../../services/pending-action"; import type { PendingAction } from "../../services/pending-action"; -import { - createScheduleSchema, - MAX_ACTIVE_SCHEDULES, - ScheduleService, -} from "../../services/schedule"; +import { PendingConversationService } from "../../services/pending-conversation"; +import { ScheduleService } from "../../services/schedule"; import type { CreateScheduleInput } from "../../services/schedule"; import { TelegramService } from "../../services/telegram"; import type { AppEnv } from "../../types/env"; import type { - InlineKeyboardMarkup, TelegramCallbackQuery, TelegramUpdate, } from "../../types/telegram"; -import { markdownToTelegramHtml } from "../../utils/markdown-to-html"; -import { - splitMessage, - TELEGRAM_HTML_SAFE_LENGTH, - TELEGRAM_MAX_MESSAGE_LENGTH, -} from "../../utils/message"; -import { validateSourceUrl } from "@repo/url-validator"; - -const DAYS = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]; - -const formatScheduleDescription = ( - type: string, - args: Record -): string => { - const schedType = - typeof args.schedule_type === "string" ? args.schedule_type : type; - const parts: string[] = [`${schedType} schedule`]; - if (args.hour != null) { - const h = typeof args.hour === "number" ? String(args.hour) : "0"; - const m = - typeof args.minute === "number" - ? String(args.minute).padStart(2, "0") - : "00"; - parts.push(`at ${h}:${m}`); - } - if (typeof args.day_of_week === "number") { - parts.push(`on ${DAYS[args.day_of_week] ?? "?"}`); - } - if (typeof args.day_of_month === "number") { - parts.push(`on day ${String(args.day_of_month)}`); - } - const tz = typeof args.timezone === "string" ? args.timezone : "UTC"; - parts.push(`(${tz})`); - if (typeof args.description === "string") { - parts.push(`— "${args.description}"`); - } - return parts.join(" "); -}; - -const mapToolArgsToInput = ( - args: Record -): CreateScheduleInput => ({ - scheduleType: args.schedule_type as CreateScheduleInput["scheduleType"], - hour: args.hour as number | undefined, - minute: args.minute as number | undefined, - dayOfWeek: args.day_of_week as number | undefined, - dayOfMonth: args.day_of_month as number | undefined, - timezone: (args.timezone as string | undefined) ?? "Europe/Helsinki", - fixedMessage: args.fixed_message as string | undefined, - messagePrompt: args.message_prompt as string | undefined, - sourceUrl: args.source_url as string | undefined, - keywords: args.keywords as string[] | undefined, - description: (args.description as string | undefined) ?? "", -}); - -const buildConfirmationKeyboard = (token: string): InlineKeyboardMarkup => ({ - inline_keyboard: [ - [ - { text: "✅ Yes", callback_data: `c:${token}` }, - { text: "❌ No", callback_data: `x:${token}` }, - ], - ], -}); +import { splitMessage } from "../../utils/message"; +import { parseCallbackData } from "./callback-data"; +import type { ParsedCallback } from "./callback-data"; +import { ConversationRunner } from "./conversation-runner"; const executePendingAction = async ( action: PendingAction, chatId: number, db: D1Database, - logger: { info: (msg: string, meta?: Record) => void } + logger: Logger ): Promise => { const scheduleService = new ScheduleService(db); @@ -186,154 +122,11 @@ const handleMessage = async ( } } - const scheduleService = new ScheduleService(c.env.DB); - - let pendingButtonToken: string | undefined; - - const toolExecutor: ToolExecutor = async ( - name: string, - args: Record - ): Promise => { - logger.debug("tool call received", { tool: name }); - - if (name === "list_schedules") { - const list = await scheduleService.list(chatId); - return { - result: JSON.stringify( - list.map((s) => ({ - id: s.id, - type: s.scheduleType, - description: s.description, - hour: s.hour, - minute: s.minute, - dayOfWeek: s.dayOfWeek, - dayOfMonth: s.dayOfMonth, - timezone: s.timezone, - sourceUrl: s.sourceUrl, - nextRunAt: s.nextRunAt, - })) - ), - }; - } - - if (name === "create_schedule") { - const count = await scheduleService.countActive(chatId); - if (count >= MAX_ACTIVE_SCHEDULES) { - return { - error: `Quota exceeded: maximum ${String(MAX_ACTIVE_SCHEDULES)} active schedules`, - }; - } - - const input = mapToolArgsToInput(args); - const validation = createScheduleSchema.safeParse(input); - if (!validation.success) { - return { error: validation.error.message }; - } - - if (input.sourceUrl) { - const urlCheck = validateSourceUrl(input.sourceUrl); - if (!urlCheck.valid) { - return { error: urlCheck.reason }; - } - } - - pendingButtonToken = await pendingService.set(chatId, { - type: "create_schedule", - payload: input as unknown as Record, - description: formatScheduleDescription("create", args), - }); - - return { - result: `Confirmation buttons attached. Reply with a short summary like 'Confirm creating: ${formatScheduleDescription("create", args)}' — do NOT ask the user to type YES.`, - }; - } - - if (name === "delete_schedule") { - const id = args.id as string | undefined; - if (!id) { - return { error: "Missing schedule ID" }; - } - - pendingButtonToken = await pendingService.set(chatId, { - type: "delete_schedule", - payload: { id }, - description: `Delete schedule ${id}`, - }); - - return { - result: `Confirmation buttons attached. Reply with a short summary like 'Confirm deleting schedule ${id}' — do NOT ask the user to type YES.`, - }; - } - - return { error: `Unknown tool: ${name}` }; - }; - - let reply: string; - let replyIsHtml = true; - try { - const openai = new OpenAiService(c.env.OPENAI_API_KEY, logger); - reply = await openai.replyWithTools(message.text, toolExecutor); - } catch (error) { - logger.error("openai request failed", { - error: error instanceof Error ? error.message : String(error), - }); - reply = - "Something went wrong while generating a response. Please try again."; - replyIsHtml = false; - pendingButtonToken = undefined; - await pendingService.clear(chatId); - } - - const replyMarkup = pendingButtonToken - ? buildConfirmationKeyboard(pendingButtonToken) - : undefined; - - try { - const chunkMax = replyIsHtml - ? TELEGRAM_HTML_SAFE_LENGTH - : TELEGRAM_MAX_MESSAGE_LENGTH; - const chunks = [...splitMessage(reply, chunkMax)]; - for (let i = 0; i < chunks.length; i++) { - const isLast = i === chunks.length - 1; - await telegram.sendMessage({ - chat_id: chatId, - text: replyIsHtml ? markdownToTelegramHtml(chunks[i]) : chunks[i], - ...(replyIsHtml ? { parse_mode: "HTML" as const } : {}), - ...(isLast && replyMarkup ? { reply_markup: replyMarkup } : {}), - }); - } - } catch (error) { - await pendingService.clear(chatId); - throw error; - } - + const runner = new ConversationRunner(chatId, c.env, logger, telegram); + await runner.startFromMessage(message.text); return c.json({ ok: true }); }; -const parseCallbackData = ( - data: string | undefined -): { action: "confirm" | "cancel"; token: string } | undefined => { - if (!data) { - return undefined; - } - const colon = data.indexOf(":"); - if (colon === -1) { - return undefined; - } - const prefix = data.slice(0, colon); - const token = data.slice(colon + 1); - if (!token) { - return undefined; - } - if (prefix === "c") { - return { action: "confirm", token }; - } - if (prefix === "x") { - return { action: "cancel", token }; - } - return undefined; -}; - const handleCallbackQuery = async ( c: Context, cq: TelegramCallbackQuery @@ -373,13 +166,72 @@ const handleCallbackQuery = async ( if (!parsed) { logger.warn("callback_query has malformed data", { callbackId: cq.id }); await telegram - .answerCallbackQuery({ callback_query_id: cq.id }) + .answerCallbackQuery({ callback_query_id: cq.id, text: "Malformed" }) .catch(() => undefined); return c.json({ ok: true }); } - const pendingService = new PendingActionService(c.env.DB); + if (parsed.action === "answer") { + await handleAnswerCallback( + c, + cq, + parsed, + chatId, + messageId, + telegram, + logger + ); + return c.json({ ok: true }); + } + + await handleConfirmOrCancel( + c, + cq, + parsed, + chatId, + messageId, + telegram, + logger + ); + return c.json({ ok: true }); +}; +const ackAndClearKeyboard = async ( + cq: TelegramCallbackQuery, + chatId: number, + messageId: number | undefined, + telegram: TelegramService, + logger: Logger, + toast: string | undefined +): Promise => { + await telegram + .answerCallbackQuery({ callback_query_id: cq.id, text: toast }) + .catch((err: unknown) => { + logger.warn("answerCallbackQuery failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); + if (messageId !== undefined) { + await telegram + .editMessageReplyMarkup({ chat_id: chatId, message_id: messageId }) + .catch((err: unknown) => { + logger.warn("editMessageReplyMarkup failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); + } +}; + +const handleConfirmOrCancel = async ( + c: Context, + cq: TelegramCallbackQuery, + parsed: Extract, + chatId: number, + messageId: number | undefined, + telegram: TelegramService, + logger: Logger +): Promise => { + const pendingService = new PendingActionService(c.env.DB); let toast: string | undefined; let resultMessage: string | undefined; try { @@ -407,22 +259,7 @@ const handleCallbackQuery = async ( }); toast = "Something went wrong"; } finally { - await telegram - .answerCallbackQuery({ callback_query_id: cq.id, text: toast }) - .catch((err: unknown) => { - logger.warn("answerCallbackQuery failed", { - error: err instanceof Error ? err.message : String(err), - }); - }); - if (messageId !== undefined) { - await telegram - .editMessageReplyMarkup({ chat_id: chatId, message_id: messageId }) - .catch((err: unknown) => { - logger.warn("editMessageReplyMarkup failed", { - error: err instanceof Error ? err.message : String(err), - }); - }); - } + await ackAndClearKeyboard(cq, chatId, messageId, telegram, logger, toast); } if (resultMessage) { @@ -430,8 +267,110 @@ const handleCallbackQuery = async ( await telegram.sendMessage({ chat_id: chatId, text: chunk }); } } +}; - return c.json({ ok: true }); +const handleAnswerCallback = async ( + c: Context, + cq: TelegramCallbackQuery, + parsed: Extract, + chatId: number, + messageId: number | undefined, + telegram: TelegramService, + logger: Logger +): Promise => { + const conversationService = new PendingConversationService(c.env.DB); + + let consumed: Awaited>; + try { + // Peek first so an out-of-range/tampered index doesn't destroy the + // pending state — only consume after the index is validated. + const peek = await conversationService.getByToken(chatId, parsed.token); + if (!peek) { + await ackAndClearKeyboard( + cq, + chatId, + messageId, + telegram, + logger, + "Expired or already used" + ); + return; + } + + if (parsed.optionIndex < 0 || parsed.optionIndex >= peek.options.length) { + logger.warn("answer callback option index out of range", { + callbackId: cq.id, + optionIndex: parsed.optionIndex, + optionsLength: peek.options.length, + }); + await ackAndClearKeyboard( + cq, + chatId, + messageId, + telegram, + logger, + "Invalid option" + ); + return; + } + + // Atomic delete-by-token guards against double-tap; only one click wins. + consumed = await conversationService.consumeByToken(chatId, parsed.token); + if (!consumed) { + await ackAndClearKeyboard( + cq, + chatId, + messageId, + telegram, + logger, + "Expired or already used" + ); + return; + } + } catch (error) { + logger.error("answer callback consume failed", { + callbackId: cq.id, + error: error instanceof Error ? error.message : String(error), + }); + await ackAndClearKeyboard( + cq, + chatId, + messageId, + telegram, + logger, + "Something went wrong" + ); + return; + } + + // Ack the click immediately so the user's button stops spinning while we + // call OpenAI; clear the inline keyboard from the question message. + await ackAndClearKeyboard( + cq, + chatId, + messageId, + telegram, + logger, + "Recorded" + ); + + const chosen = consumed.options[parsed.optionIndex]; + const runner = new ConversationRunner(chatId, c.env, logger, telegram); + try { + await runner.resumeFromAnswer( + consumed.messages as Parameters[0], + consumed.pendingToolCallId, + chosen + ); + } catch (error) { + // The user's click was already acked above, so failing the webhook here + // would cause Telegram to retry — which can't redeliver the click. + // Log and continue. + logger.error("dispatch resumed outcome failed", { + callbackId: cq.id, + error: error instanceof Error ? error.message : String(error), + }); + } }; -export { handleWebhook, parseCallbackData }; +export { handleWebhook }; diff --git a/apps/operator/src/modules/telegram/conversation-runner.ts b/apps/operator/src/modules/telegram/conversation-runner.ts new file mode 100644 index 0000000..02d53a1 --- /dev/null +++ b/apps/operator/src/modules/telegram/conversation-runner.ts @@ -0,0 +1,261 @@ +import type { Logger } from "@repo/logger"; +import { validateSourceUrl } from "@repo/url-validator"; + +import { buildInitialMessages, OpenAiService } from "../../services/openai"; +import type { + ToolExecutor, + ToolLoopMessages, + ToolLoopOutcome, + ToolResult, +} from "../../services/openai"; +import { PendingActionService } from "../../services/pending-action"; +import { PendingConversationService } from "../../services/pending-conversation"; +import type { QuestionOption } from "../../services/pending-conversation"; +import { + createScheduleSchema, + MAX_ACTIVE_SCHEDULES, + ScheduleService, +} from "../../services/schedule"; +import type { TelegramService } from "../../services/telegram"; +import type { AppEnv } from "../../types/env"; +import { markdownToTelegramHtml } from "../../utils/markdown-to-html"; +import { + splitMessage, + TELEGRAM_HTML_SAFE_LENGTH, + TELEGRAM_MAX_MESSAGE_LENGTH, +} from "../../utils/message"; +import { + buildConfirmationKeyboard, + buildQuestionKeyboard, + formatScheduleDescription, + mapToolArgsToInput, +} from "./ui"; + +class ConversationRunner { + private readonly env: AppEnv["Bindings"]; + private readonly logger: Logger; + private readonly chatId: number; + private readonly telegram: TelegramService; + private readonly pendingActions: PendingActionService; + private readonly pendingConversations: PendingConversationService; + private pendingButtonToken: string | undefined; + + constructor( + chatId: number, + env: AppEnv["Bindings"], + logger: Logger, + telegram: TelegramService + ) { + this.chatId = chatId; + this.env = env; + this.logger = logger; + this.telegram = telegram; + this.pendingActions = new PendingActionService(env.DB); + this.pendingConversations = new PendingConversationService(env.DB); + } + + async startFromMessage(userMessage: string): Promise { + // Any new typed message starts a fresh conversation — discard a stale + // pending question so the user can change direction by typing. + await this.pendingConversations.clear(this.chatId); + await this.runLoop(buildInitialMessages(userMessage)); + } + + async resumeFromAnswer( + priorMessages: ToolLoopMessages, + pendingToolCallId: string, + chosen: QuestionOption + ): Promise { + const messages: ToolLoopMessages = [ + ...priorMessages, + { + role: "tool", + tool_call_id: pendingToolCallId, + content: JSON.stringify({ value: chosen.value, label: chosen.label }), + }, + ]; + await this.runLoop(messages); + } + + private async runLoop(messages: ToolLoopMessages): Promise { + const scheduleService = new ScheduleService(this.env.DB); + const toolExecutor = this.buildToolExecutor(scheduleService); + + let outcome: ToolLoopOutcome | undefined; + try { + const openai = new OpenAiService(this.env.OPENAI_API_KEY, this.logger); + outcome = await openai.runToolLoop(messages, toolExecutor); + } catch (error) { + await this.handleLoopError(error); + return; + } + + try { + await this.dispatchOutcome(outcome); + } catch (error) { + await this.pendingActions.clear(this.chatId); + throw error; + } + } + + private buildToolExecutor(scheduleService: ScheduleService): ToolExecutor { + return async (name, args): Promise => { + this.logger.debug("tool call received", { tool: name }); + + if (name === "list_schedules") { + const list = await scheduleService.list(this.chatId); + return { + result: JSON.stringify( + list.map((s, idx) => ({ + position: idx + 1, + id: s.id, + type: s.scheduleType, + description: s.description, + hour: s.hour, + minute: s.minute, + dayOfWeek: s.dayOfWeek, + dayOfMonth: s.dayOfMonth, + timezone: s.timezone, + sourceUrl: s.sourceUrl, + nextRunAt: s.nextRunAt, + })) + ), + }; + } + + if (name === "create_schedule") { + if ( + typeof args.source_url === "string" && + args.source_url.length > 0 && + args.use_browser === undefined + ) { + return { + error: + "Monitors require an explicit use_browser choice. Call ask_user_question first to ask the user whether this URL needs the browser scraper (JS rendering), then retry create_schedule with the chosen boolean.", + }; + } + + const count = await scheduleService.countActive(this.chatId); + if (count >= MAX_ACTIVE_SCHEDULES) { + return { + error: `Quota exceeded: maximum ${String(MAX_ACTIVE_SCHEDULES)} active schedules`, + }; + } + + const input = mapToolArgsToInput(args); + const validation = createScheduleSchema.safeParse(input); + if (!validation.success) { + return { error: validation.error.message }; + } + + if (input.sourceUrl) { + const urlCheck = validateSourceUrl(input.sourceUrl); + if (!urlCheck.valid) { + return { error: urlCheck.reason }; + } + } + + this.pendingButtonToken = await this.pendingActions.set(this.chatId, { + type: "create_schedule", + payload: input as unknown as Record, + description: formatScheduleDescription("create", args), + }); + + return { + result: `Confirmation buttons attached. Reply with a short summary like 'Confirm creating: ${formatScheduleDescription("create", args)}' — do NOT ask the user to type YES.`, + }; + } + + if (name === "delete_schedule") { + const id = args.id as string | undefined; + if (!id) { + return { error: "Missing schedule ID" }; + } + const summary = + typeof args.summary === "string" && args.summary.trim().length > 0 + ? args.summary.trim() + : undefined; + if (!summary) { + return { + error: + "Missing summary. Pass a human-readable summary built from the matching list_schedules entry (type, time, description) so the user can recognize what's being deleted.", + }; + } + + this.pendingButtonToken = await this.pendingActions.set(this.chatId, { + type: "delete_schedule", + payload: { id }, + description: `Delete: ${summary}`, + }); + + return { + result: `Confirmation buttons attached. Reply with a short summary like 'Confirm deleting: ${summary}' — do NOT ask the user to type YES.`, + }; + } + + return { error: `Unknown tool: ${name}` }; + }; + } + + private async dispatchOutcome(outcome: ToolLoopOutcome): Promise { + if (outcome.kind === "ask_user_question") { + const questionToken = await this.pendingConversations.set(this.chatId, { + messages: outcome.messages, + pendingToolCallId: outcome.toolCallId, + options: outcome.options, + }); + + try { + const replyMarkup = buildQuestionKeyboard( + questionToken, + outcome.options + ); + const chunks = [ + ...splitMessage(outcome.question, TELEGRAM_MAX_MESSAGE_LENGTH), + ]; + for (let i = 0; i < chunks.length; i++) { + const isLast = i === chunks.length - 1; + await this.telegram.sendMessage({ + chat_id: this.chatId, + text: chunks[i], + ...(isLast ? { reply_markup: replyMarkup } : {}), + }); + } + } catch (error) { + await this.pendingConversations.clear(this.chatId); + throw error; + } + return; + } + + const replyMarkup = this.pendingButtonToken + ? buildConfirmationKeyboard(this.pendingButtonToken) + : undefined; + const chunks = [ + ...splitMessage(outcome.content, TELEGRAM_HTML_SAFE_LENGTH), + ]; + for (let i = 0; i < chunks.length; i++) { + const isLast = i === chunks.length - 1; + await this.telegram.sendMessage({ + chat_id: this.chatId, + text: markdownToTelegramHtml(chunks[i]), + parse_mode: "HTML" as const, + ...(isLast && replyMarkup ? { reply_markup: replyMarkup } : {}), + }); + } + } + + private async handleLoopError(error: unknown): Promise { + this.logger.error("openai request failed", { + error: error instanceof Error ? error.message : String(error), + }); + await this.pendingActions.clear(this.chatId); + await this.pendingConversations.clear(this.chatId); + await this.telegram.sendMessage({ + chat_id: this.chatId, + text: "Something went wrong while generating a response. Please try again.", + }); + } +} + +export { ConversationRunner }; diff --git a/apps/operator/src/modules/telegram/routes.test.ts b/apps/operator/src/modules/telegram/routes.test.ts index 611c0f1..9f50e2d 100644 --- a/apps/operator/src/modules/telegram/routes.test.ts +++ b/apps/operator/src/modules/telegram/routes.test.ts @@ -23,6 +23,21 @@ vi.mock("../../services/pending-action", () => ({ clear = pendingClearMock; set = pendingSetMock; }, + generateToken: () => "tok-fake", +})); + +const conversationGetByTokenMock = vi.fn(); +const conversationConsumeByTokenMock = vi.fn(); +const conversationSetMock = vi.fn(); +const conversationClearMock = vi.fn(); + +vi.mock("../../services/pending-conversation", () => ({ + PendingConversationService: class { + getByToken = conversationGetByTokenMock; + consumeByToken = conversationConsumeByTokenMock; + set = conversationSetMock; + clear = conversationClearMock; + }, })); import { onErrorHandler } from "../../middleware/error-handlers"; @@ -279,6 +294,120 @@ describe("POST /webhook/telegram", () => { expect(sentBody.text).not.toContain("**bold**"); }); + it("nudges the model to ask first when create_schedule omits use_browser for a monitor", async () => { + openaiCreateMock.mockClear(); + // First turn: model emits create_schedule with source_url but no use_browser. + openaiCreateMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_create_1", + type: "function", + function: { + name: "create_schedule", + arguments: JSON.stringify({ + schedule_type: "daily", + hour: 9, + minute: 0, + timezone: "Europe/Helsinki", + source_url: "https://twitter.com/elonmusk", + message_prompt: "Check for new posts", + description: "Monitor Twitter", + }), + }, + }, + ], + }, + }, + ], + }); + // Second turn: model corrects and asks the question. + openaiCreateMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_q_1", + type: "function", + function: { + name: "ask_user_question", + arguments: JSON.stringify({ + question: + "Should I use the browser scraper for this page (renders JavaScript)?", + options: [ + { label: "Yes — needs JS rendering", value: true }, + { label: "No — static HTML", value: false }, + ], + }), + }, + }, + ], + }, + }, + ], + }); + conversationSetMock.mockResolvedValueOnce("conv-tok-1"); + + const update = { + ...validUpdate, + message: { ...validUpdate.message, text: "monitor twitter.com" }, + }; + const res = await sendRequest(update, { + "x-telegram-bot-api-secret-token": ENV.TELEGRAM_WEBHOOK_SECRET, + }); + + expect(res.status).toBe(200); + + // The second OpenAI call must include the guard error as the tool result + // for the rejected create_schedule call. + const secondCall = openaiCreateMock.mock.calls[1] as + | [ + { + messages: { + role: string; + tool_call_id?: string; + content?: string; + }[]; + }, + ] + | undefined; + expect(secondCall).toBeDefined(); + if (!secondCall) { + return; + } + const toolMessage = secondCall[0].messages.find( + (m) => m.role === "tool" && m.tool_call_id === "call_create_1" + ); + expect(toolMessage?.content).toContain("use_browser"); + expect(toolMessage?.content).toContain("ask_user_question"); + + // No schedule was committed — pendingActions.set was not invoked. + expect(pendingSetMock).not.toHaveBeenCalled(); + + // Question was dispatched to the user with inline buttons. + expect(conversationSetMock).toHaveBeenCalledTimes(1); + const sendCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/sendMessage") + ); + expect(sendCall).toBeDefined(); + if (!sendCall) { + return; + } + const sendBody = JSON.parse( + (sendCall[1] as { body: string }).body + ) as Record; + expect(sendBody.reply_markup).toBeDefined(); + }); + it("sends error message as plain text when OpenAI fails", async () => { openaiCreateMock.mockRejectedValueOnce( new Error("401 insufficient permissions") @@ -434,6 +563,14 @@ describe("POST /webhook/telegram — callback_query", () => { pendingClearMock.mockResolvedValue(undefined); pendingSetMock.mockReset(); pendingSetMock.mockResolvedValue("tok-default"); + conversationGetByTokenMock.mockReset(); + conversationGetByTokenMock.mockResolvedValue(undefined); + conversationConsumeByTokenMock.mockReset(); + conversationConsumeByTokenMock.mockResolvedValue(undefined); + conversationSetMock.mockReset(); + conversationSetMock.mockResolvedValue("conv-tok"); + conversationClearMock.mockReset(); + conversationClearMock.mockResolvedValue(undefined); mockFetch.mockImplementation(() => Promise.resolve( new Response(JSON.stringify({ ok: true }), { @@ -611,4 +748,220 @@ describe("POST /webhook/telegram — callback_query", () => { expect(removeMock).toHaveBeenCalledTimes(1); }); + + describe("question-answer callback (q::)", () => { + it("acks 'Malformed' for non-numeric option index", async () => { + const res = await sendRequest( + callbackUpdate({ data: "q:tok-1:abc" }), + headers + ); + + expect(res.status).toBe(200); + expect(conversationConsumeByTokenMock).not.toHaveBeenCalled(); + const ackCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/answerCallbackQuery") + ); + expect(ackCall).toBeDefined(); + if (!ackCall) { + return; + } + const ackBody = JSON.parse( + (ackCall[1] as { body: string }).body + ) as Record; + expect(ackBody.text).toBe("Malformed"); + }); + + it("acks 'Expired or already used' when conversation token is unknown", async () => { + conversationGetByTokenMock.mockResolvedValueOnce(undefined); + + const res = await sendRequest( + callbackUpdate({ data: "q:tok-x:0" }), + headers + ); + + expect(res.status).toBe(200); + expect(conversationGetByTokenMock).toHaveBeenCalledWith(12345, "tok-x"); + // Out-of-range/unknown tokens must not consume the row. + expect(conversationConsumeByTokenMock).not.toHaveBeenCalled(); + const ackCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/answerCallbackQuery") + ); + expect(ackCall).toBeDefined(); + if (!ackCall) { + return; + } + const ackBody = JSON.parse( + (ackCall[1] as { body: string }).body + ) as Record; + expect(ackBody.text).toBe("Expired or already used"); + }); + + it("acks 'Invalid option' when index is out of range and preserves pending state", async () => { + conversationGetByTokenMock.mockResolvedValueOnce({ + messages: [], + pendingToolCallId: "call_q", + options: [ + { label: "Yes", value: true }, + { label: "No", value: false }, + ], + }); + + const res = await sendRequest( + callbackUpdate({ data: "q:tok-r:9" }), + headers + ); + + expect(res.status).toBe(200); + // The pending row must remain intact so a tampered/malformed callback + // can't strand the user with no way to retry from the real buttons. + expect(conversationConsumeByTokenMock).not.toHaveBeenCalled(); + const ackCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/answerCallbackQuery") + ); + expect(ackCall).toBeDefined(); + if (!ackCall) { + return; + } + const ackBody = JSON.parse( + (ackCall[1] as { body: string }).body + ) as Record; + expect(ackBody.text).toBe("Invalid option"); + }); + + it("acks 'Something went wrong' when peek throws", async () => { + conversationGetByTokenMock.mockRejectedValueOnce(new Error("db down")); + + const res = await sendRequest( + callbackUpdate({ data: "q:tok-r:0" }), + headers + ); + + expect(res.status).toBe(200); + const ackCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/answerCallbackQuery") + ); + expect(ackCall).toBeDefined(); + if (!ackCall) { + return; + } + const ackBody = JSON.parse( + (ackCall[1] as { body: string }).body + ) as Record; + expect(ackBody.text).toBe("Something went wrong"); + }); + + it("happy path: appends tool result with raw boolean value and resumes", async () => { + const stored = { + messages: [ + { role: "system", content: "sys" }, + { role: "user", content: "monitor twitter.com" }, + { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_q", + type: "function", + function: { + name: "ask_user_question", + arguments: JSON.stringify({ + question: "Use browser?", + options: [ + { label: "Yes", value: true }, + { label: "No", value: false }, + ], + }), + }, + }, + ], + }, + ], + pendingToolCallId: "call_q", + options: [ + { label: "Yes", value: true }, + { label: "No", value: false }, + ], + }; + conversationGetByTokenMock.mockResolvedValueOnce(stored); + conversationConsumeByTokenMock.mockResolvedValueOnce(stored); + + // OpenAI returns a final answer on resume. + openaiCreateMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "stop", + message: { role: "assistant", content: "Got it." }, + }, + ], + }); + + const res = await sendRequest( + callbackUpdate({ data: "q:tok-r:0" }), + headers + ); + + expect(res.status).toBe(200); + expect(conversationConsumeByTokenMock).toHaveBeenCalledWith( + 12345, + "tok-r" + ); + + // The OpenAI call should have received messages with a tool result that + // carries the raw boolean (NOT a string). + const openaiCall = openaiCreateMock.mock.calls.at(-1) as + | [{ messages: { role: string; content?: string }[] }] + | undefined; + expect(openaiCall).toBeDefined(); + if (!openaiCall) { + return; + } + const sentMessages = openaiCall[0].messages; + const toolResult = sentMessages.find((m) => m.role === "tool"); + expect(toolResult).toBeDefined(); + expect(toolResult?.content).toBe( + JSON.stringify({ value: true, label: "Yes" }) + ); + + // The bot then sends the LLM's final reply. + const sendCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/sendMessage") + ); + expect(sendCall).toBeDefined(); + }); + + it("ack toast says 'Recorded' on happy path", async () => { + const stored = { + messages: [{ role: "system", content: "sys" }], + pendingToolCallId: "call_q", + options: [ + { label: "Yes", value: true }, + { label: "No", value: false }, + ], + }; + conversationGetByTokenMock.mockResolvedValueOnce(stored); + conversationConsumeByTokenMock.mockResolvedValueOnce(stored); + openaiCreateMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "stop", + message: { role: "assistant", content: "ok" }, + }, + ], + }); + + await sendRequest(callbackUpdate({ data: "q:tok-r:1" }), headers); + + const ackCall = mockFetch.mock.calls.find( + (c) => typeof c[0] === "string" && c[0].endsWith("/answerCallbackQuery") + ); + expect(ackCall).toBeDefined(); + if (!ackCall) { + return; + } + const ackBody = JSON.parse( + (ackCall[1] as { body: string }).body + ) as Record; + expect(ackBody.text).toBe("Recorded"); + }); + }); }); diff --git a/apps/operator/src/modules/telegram/ui.ts b/apps/operator/src/modules/telegram/ui.ts new file mode 100644 index 0000000..2530b6d --- /dev/null +++ b/apps/operator/src/modules/telegram/ui.ts @@ -0,0 +1,86 @@ +import type { QuestionOption } from "../../services/pending-conversation"; +import type { CreateScheduleInput } from "../../services/schedule"; +import type { InlineKeyboardMarkup } from "../../types/telegram"; + +const DAYS = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]; +const QUESTION_BUTTON_LABEL_MAX = 32; + +const formatScheduleDescription = ( + type: string, + args: Record +): string => { + const schedType = + typeof args.schedule_type === "string" ? args.schedule_type : type; + const parts: string[] = [`${schedType} schedule`]; + if (args.hour != null) { + const h = typeof args.hour === "number" ? String(args.hour) : "0"; + const m = + typeof args.minute === "number" + ? String(args.minute).padStart(2, "0") + : "00"; + parts.push(`at ${h}:${m}`); + } + if (typeof args.day_of_week === "number") { + parts.push(`on ${DAYS[args.day_of_week] ?? "?"}`); + } + if (typeof args.day_of_month === "number") { + parts.push(`on day ${String(args.day_of_month)}`); + } + const tz = typeof args.timezone === "string" ? args.timezone : "UTC"; + parts.push(`(${tz})`); + if (args.use_browser === true) { + parts.push("(browser rendering)"); + } + if (typeof args.description === "string") { + parts.push(`— "${args.description}"`); + } + return parts.join(" "); +}; + +const mapToolArgsToInput = ( + args: Record +): CreateScheduleInput => ({ + scheduleType: args.schedule_type as CreateScheduleInput["scheduleType"], + hour: args.hour as number | undefined, + minute: args.minute as number | undefined, + dayOfWeek: args.day_of_week as number | undefined, + dayOfMonth: args.day_of_month as number | undefined, + timezone: (args.timezone as string | undefined) ?? "Europe/Helsinki", + fixedMessage: args.fixed_message as string | undefined, + messagePrompt: args.message_prompt as string | undefined, + sourceUrl: args.source_url as string | undefined, + keywords: args.keywords as string[] | undefined, + useBrowser: args.use_browser as boolean | undefined, + description: (args.description as string | undefined) ?? "", +}); + +const buildConfirmationKeyboard = (token: string): InlineKeyboardMarkup => ({ + inline_keyboard: [ + [ + { text: "✅ Yes", callback_data: `c:${token}` }, + { text: "❌ No", callback_data: `x:${token}` }, + ], + ], +}); + +const truncateLabel = (label: string, max: number): string => + label.length <= max ? label : `${label.slice(0, max - 1)}…`; + +const buildQuestionKeyboard = ( + token: string, + options: QuestionOption[] +): InlineKeyboardMarkup => ({ + inline_keyboard: [ + options.map((opt, idx) => ({ + text: truncateLabel(opt.label, QUESTION_BUTTON_LABEL_MAX), + callback_data: `q:${token}:${String(idx)}`, + })), + ], +}); + +export { + buildConfirmationKeyboard, + buildQuestionKeyboard, + formatScheduleDescription, + mapToolArgsToInput, +}; diff --git a/apps/operator/src/services/openai.test.ts b/apps/operator/src/services/openai.test.ts index 85d6556..718132e 100644 --- a/apps/operator/src/services/openai.test.ts +++ b/apps/operator/src/services/openai.test.ts @@ -1,8 +1,8 @@ import type { Logger } from "@repo/logger"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { OpenAiService } from "./openai"; -import type { ToolExecutor } from "./openai"; +import { buildInitialMessages, OpenAiService } from "./openai"; +import type { ToolExecutor, ToolLoopMessages } from "./openai"; const createMock = vi.fn(); const constructorMock = vi.fn(); @@ -281,6 +281,325 @@ describe("OpenAiService", () => { }); }); + describe("runToolLoop", () => { + const validQuestionArgs = JSON.stringify({ + question: "Use browser rendering?", + options: [ + { label: "Yes — needs JS", value: true }, + { label: "No — static HTML", value: false }, + ], + }); + + it("returns ask_user_question outcome when the model emits the tool", async () => { + createMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_q1", + type: "function", + function: { + name: "ask_user_question", + arguments: validQuestionArgs, + }, + }, + ], + }, + }, + ], + }); + + const executor: ToolExecutor = vi.fn(); + const messages = buildInitialMessages("monitor twitter.com"); + const outcome = await service.runToolLoop(messages, executor); + + expect(outcome.kind).toBe("ask_user_question"); + if (outcome.kind !== "ask_user_question") { + return; + } + expect(outcome.toolCallId).toBe("call_q1"); + expect(outcome.question).toBe("Use browser rendering?"); + expect(outcome.options).toHaveLength(2); + expect(outcome.options[0].value).toBe(true); + expect(outcome.options[1].value).toBe(false); + // The paused outcome contains the assistant turn but no tool result yet + // for the question's tool_call_id. + expect(executor).not.toHaveBeenCalled(); + const lastMsg = outcome.messages[outcome.messages.length - 1]; + expect(lastMsg.role).toBe("assistant"); + const toolResultsForQuestion = outcome.messages.filter( + (m) => + m.role === "tool" && + (m as { tool_call_id?: string }).tool_call_id === "call_q1" + ); + expect(toolResultsForQuestion).toHaveLength(0); + }); + + it("processes sibling tool calls before suspending on ask_user_question", async () => { + createMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_list", + type: "function", + function: { name: "list_schedules", arguments: "{}" }, + }, + { + id: "call_q", + type: "function", + function: { + name: "ask_user_question", + arguments: validQuestionArgs, + }, + }, + ], + }, + }, + ], + }); + + const executor: ToolExecutor = vi + .fn() + .mockResolvedValueOnce({ result: "[]" }); + + const messages = buildInitialMessages("test"); + const outcome = await service.runToolLoop(messages, executor); + + expect(executor).toHaveBeenCalledTimes(1); + expect(executor).toHaveBeenCalledWith("list_schedules", {}); + expect(outcome.kind).toBe("ask_user_question"); + // Sibling's tool result is in messages so the assistant turn is satisfiable + // on resume. + const siblingResult = outcome.messages.find( + (m) => + m.role === "tool" && + (m as { tool_call_id?: string }).tool_call_id === "call_list" + ); + expect(siblingResult).toBeDefined(); + }); + + it("rejects mutating sibling tool calls when ask_user_question is in the same batch", async () => { + createMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_create", + type: "function", + function: { + name: "create_schedule", + arguments: "{}", + }, + }, + { + id: "call_q", + type: "function", + function: { + name: "ask_user_question", + arguments: validQuestionArgs, + }, + }, + ], + }, + }, + ], + }); + + const executor: ToolExecutor = vi.fn(); + const messages = buildInitialMessages("monitor twitter.com"); + const outcome = await service.runToolLoop(messages, executor); + + // Mutating sibling must NOT execute; the model gets a tool error so + // it can retry on the next turn (after the user answers). + expect(executor).not.toHaveBeenCalled(); + expect(outcome.kind).toBe("ask_user_question"); + const rejection = outcome.messages.find( + (m) => + m.role === "tool" && + (m as { tool_call_id?: string }).tool_call_id === "call_create" + ) as { content?: string } | undefined; + expect(rejection?.content).toContain("create_schedule"); + expect(rejection?.content).toContain("ask_user_question"); + }); + + it("denies the 4th ask_user_question and continues to final", async () => { + const priorAskTurn = (id: string): unknown => ({ + role: "assistant", + content: null, + tool_calls: [ + { + id, + type: "function", + function: { + name: "ask_user_question", + arguments: validQuestionArgs, + }, + }, + ], + }); + const priorAnswer = (id: string): unknown => ({ + role: "tool", + tool_call_id: id, + content: JSON.stringify({ value: true, label: "Yes" }), + }); + + const messages = [ + ...buildInitialMessages("test"), + priorAskTurn("p1"), + priorAnswer("p1"), + priorAskTurn("p2"), + priorAnswer("p2"), + priorAskTurn("p3"), + priorAnswer("p3"), + ] as ToolLoopMessages; + + // Model tries to ask a 4th question; loop denies, then on next round + // model returns a final answer. + createMock + .mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_q4", + type: "function", + function: { + name: "ask_user_question", + arguments: validQuestionArgs, + }, + }, + ], + }, + }, + ], + }) + .mockResolvedValueOnce({ + choices: [ + { + finish_reason: "stop", + message: { role: "assistant", content: "Done." }, + }, + ], + }); + + const executor: ToolExecutor = vi.fn(); + const outcome = await service.runToolLoop(messages, executor); + + expect(outcome.kind).toBe("final"); + if (outcome.kind !== "final") { + return; + } + expect(outcome.content).toBe("Done."); + // Quota error fed back as tool result + const quotaToolResult = messages.find( + (m) => + m.role === "tool" && + (m as { tool_call_id?: string }).tool_call_id === "call_q4" + ) as { content?: string } | undefined; + expect(quotaToolResult).toBeDefined(); + expect(quotaToolResult?.content).toContain("quota"); + }); + + it("returns a tool error for malformed ask_user_question args", async () => { + const badArgs = JSON.stringify({ question: "" }); + + createMock + .mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_bad", + type: "function", + function: { + name: "ask_user_question", + arguments: badArgs, + }, + }, + ], + }, + }, + ], + }) + .mockResolvedValueOnce({ + choices: [ + { + finish_reason: "stop", + message: { role: "assistant", content: "Recovered." }, + }, + ], + }); + + const messages = buildInitialMessages("test"); + const outcome = await service.runToolLoop(messages, vi.fn()); + + expect(outcome.kind).toBe("final"); + const errResult = messages.find( + (m) => + m.role === "tool" && + (m as { tool_call_id?: string }).tool_call_id === "call_bad" + ) as { content?: string } | undefined; + expect(errResult?.content).toContain("Invalid ask_user_question"); + }); + }); + + describe("replyWithTools (back-compat)", () => { + it("throws if the model tries to ask a question", async () => { + createMock.mockResolvedValueOnce({ + choices: [ + { + finish_reason: "tool_calls", + message: { + role: "assistant", + content: null, + tool_calls: [ + { + id: "call_q", + type: "function", + function: { + name: "ask_user_question", + arguments: JSON.stringify({ + question: "?", + options: [ + { label: "A", value: "a" }, + { label: "B", value: "b" }, + ], + }), + }, + }, + ], + }, + }, + ], + }); + + await expect(service.replyWithTools("test", vi.fn())).rejects.toThrow( + /not supported in replyWithTools/ + ); + }); + }); + describe("analyzeMonitor", () => { it("returns parsed monitor analysis", async () => { const analysis = { diff --git a/apps/operator/src/services/openai.ts b/apps/operator/src/services/openai.ts index a3f39ee..31b2b46 100644 --- a/apps/operator/src/services/openai.ts +++ b/apps/operator/src/services/openai.ts @@ -3,6 +3,8 @@ import OpenAI from "openai"; import type { ChatCompletionTool } from "openai/resources/chat/completions"; import { z } from "zod"; +import type { QuestionOption } from "./pending-conversation"; + const SYSTEM_PROMPT = `You are a helpful personal assistant called Switch Operator. Be concise and helpful. You can create, list, and delete scheduled messages for the user. @@ -24,12 +26,22 @@ The message_prompt should describe what to look for or how to analyze the page c For monitors with large pages, use the keywords parameter to pre-filter content before AI analysis. When keywords are set, the system only calls AI if at least one keyword appears on the page — saving time and cost. +Use the use_browser parameter (boolean) on create_schedule to opt a monitor into JavaScript rendering via the browser scraper. +Whenever source_url is set on a create_schedule call, you MUST first call ask_user_question to confirm whether the monitor needs the browser scraper. Do not infer use_browser from the URL. Do not omit it. Do not call create_schedule and ask_user_question in the same turn — emit ask_user_question alone, then call create_schedule after the user's answer arrives, passing the chosen boolean verbatim into create_schedule.use_browser. +Use this exact form for the question: + question: "Should I use the browser scraper for this page (renders JavaScript)?" + options: + - { label: "Yes — needs JS rendering", value: true } + - { label: "No — static HTML", value: false } + +Do not ask about timezones, schedule types, or anything you can derive from the user's wording. + Monitor examples: - "Notify me when Beck is on TV" → source_url with the TV listings page, message_prompt: "Check if Beck appears in today's listings. Notify with channel and time if found.", keywords: ["Beck"] - "Weekly report changes" → source_url with the report page, message_prompt: "Compare this week's content to last week. Summarize key changes." -When listing schedules, format them as a numbered list (1, 2, 3...) with key details like description, type, time, and next run. -When the user asks to delete a schedule by number, first call list_schedules to get the current list, then use the ID from the matching position to call delete_schedule.`; +When listing schedules, render each entry on its own line prefixed with its position number followed by a period and a space (e.g. "1. ", "2. "). Use the position field from the tool result verbatim — do not renumber. Include description, type, time, and next run on each line. +When the user asks to delete a schedule by number, first call list_schedules to get the current list, then call delete_schedule with the ID from the matching position AND a short human-readable summary (type, time, description) so the user can recognize the row in the confirmation prompt.`; const SCHEDULE_TOOLS: ChatCompletionTool[] = [ { @@ -87,6 +99,11 @@ const SCHEDULE_TOOLS: ChatCompletionTool[] = [ description: "Optional keywords to pre-filter scraped page content before AI analysis. When set, only runs AI if at least one keyword appears on the page. Use for efficiency on large pages. Only valid for monitors (requires source_url).", }, + use_browser: { + type: "boolean", + description: + "When true, the monitor fetches via the browser scraper which executes JavaScript. Only useful for SPA / JS-rendered pages. Required whenever source_url is set — confirm the value with the user via ask_user_question before calling create_schedule.", + }, description: { type: "string", description: "Short description of this schedule (max 200 chars).", @@ -113,21 +130,135 @@ const SCHEDULE_TOOLS: ChatCompletionTool[] = [ type: "object", properties: { id: { type: "string", description: "The schedule ID to delete." }, + summary: { + type: "string", + description: + "Human-readable summary of the schedule being deleted (e.g. 'daily monitor at 9:00 — Twitter Elon'). Shown in the user's confirmation prompt so they can recognize what's about to be deleted. Build this from the matching list_schedules entry: include schedule type, time, and description.", + }, + }, + required: ["id", "summary"], + }, + }, + }, + { + type: "function", + function: { + name: "ask_user_question", + description: + "Ask the user a clarifying question with 2–4 button-labeled options. REQUIRED before every create_schedule call that has source_url set, to confirm use_browser. Otherwise use whenever a tool parameter affects observable behavior and you are not certain. The selected option's value flows back as the tool result; pass it directly into the appropriate downstream tool parameter (boolean for yes/no, string for choices).", + parameters: { + type: "object", + properties: { + question: { + type: "string", + description: "The question to display to the user (max 500 chars).", + }, + options: { + type: "array", + minItems: 2, + maxItems: 4, + items: { + type: "object", + properties: { + label: { + type: "string", + description: "Short button label (max 100 chars).", + }, + value: { + description: + "Opaque value passed back as the tool result when this option is chosen. Type matches the downstream parameter (boolean for yes/no, string for choices, number for counts).", + anyOf: [ + { type: "boolean" }, + { type: "string" }, + { type: "number" }, + ], + }, + }, + required: ["label", "value"], + }, + }, }, - required: ["id"], + required: ["question", "options"], }, }, }, ]; +const questionOptionSchema = z.object({ + label: z.string().min(1).max(100), + value: z.union([z.boolean(), z.string(), z.number()]), +}); + +const askUserQuestionSchema = z.object({ + question: z.string().min(1).max(500), + options: z.array(questionOptionSchema).min(2).max(4), +}); + +const MAX_QUESTIONS_PER_CONVERSATION = 3; + +// Tools that mutate persistent state. When the model emits one of these in +// the same assistant turn as ask_user_question, we refuse rather than +// execute it — otherwise we'd silently write a pending action that the +// user only sees after answering the question, leaving stale state if they +// don't. +const MUTATING_TOOLS = new Set(["create_schedule", "delete_schedule"]); + type ToolResult = { result: string } | { error: string }; type ToolExecutor = ( name: string, args: Record ) => Promise; +type ToolLoopMessages = OpenAI.Chat.Completions.ChatCompletionMessageParam[]; + +type ToolLoopOutcome = + | { kind: "final"; content: string; messages: ToolLoopMessages } + | { + kind: "ask_user_question"; + question: string; + options: QuestionOption[]; + toolCallId: string; + messages: ToolLoopMessages; + }; + const MAX_TOOL_ITERATIONS = 5; +const buildInitialMessages = (userMessage: string): ToolLoopMessages => [ + { role: "system", content: SYSTEM_PROMPT }, + { role: "user", content: userMessage }, +]; + +const countAskUserQuestionCalls = (messages: ToolLoopMessages): number => { + let count = 0; + for (const m of messages) { + if (m.role !== "assistant") { + continue; + } + const toolCalls = m.tool_calls; + if (!toolCalls) { + continue; + } + for (const tc of toolCalls) { + if (tc.type === "function" && tc.function.name === "ask_user_question") { + count++; + } + } + } + return count; +}; + +const pushToolError = ( + messages: ToolLoopMessages, + toolCallId: string, + error: string +): void => { + messages.push({ + role: "tool", + tool_call_id: toolCallId, + content: JSON.stringify({ error }), + }); +}; + class OpenAiService { private readonly client: OpenAI; private readonly logger: Logger; @@ -163,19 +294,14 @@ class OpenAiService { return content; } - async replyWithTools( - userMessage: string, + async runToolLoop( + messages: ToolLoopMessages, toolExecutor: ToolExecutor - ): Promise { - this.logger.debug("sending chat completion with tools", { - messageLength: userMessage.length, + ): Promise { + this.logger.debug("running tool loop", { + initialMessageCount: messages.length, }); - const messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] = [ - { role: "system", content: SYSTEM_PROMPT }, - { role: "user", content: userMessage }, - ]; - for (let i = 0; i < MAX_TOOL_ITERATIONS; i++) { const response = await this.client.chat.completions.create({ model: "gpt-5.4-mini", @@ -196,14 +322,87 @@ class OpenAiService { if (!content) { throw new Error("OpenAI returned empty response"); } - return content; + return { kind: "final", content, messages }; } + let pausedQuestion: + | { question: string; options: QuestionOption[]; toolCallId: string } + | undefined; + + // Pre-scan: if this batch contains an ask_user_question, mutating + // sibling tool calls must be rejected (not executed) so we don't + // commit hidden writes while the user only sees a question UI. + const hasAskInBatch = message.tool_calls.some( + (tc) => + tc.type === "function" && tc.function.name === "ask_user_question" + ); + for (const toolCall of message.tool_calls) { if (toolCall.type !== "function") { continue; } + if (toolCall.function.name === "ask_user_question") { + if (pausedQuestion) { + // Only one ask_user_question per turn is honored; extras get + // a tool error so the assistant turn is still well-formed. + pushToolError( + messages, + toolCall.id, + "Only one ask_user_question per assistant turn is supported." + ); + continue; + } + + let parsedArgs: unknown; + try { + parsedArgs = JSON.parse(toolCall.function.arguments); + } catch { + this.logger.error("failed to parse ask_user_question arguments", { + arguments: toolCall.function.arguments, + }); + pushToolError(messages, toolCall.id, "Invalid tool arguments"); + continue; + } + + const validation = askUserQuestionSchema.safeParse(parsedArgs); + if (!validation.success) { + pushToolError( + messages, + toolCall.id, + `Invalid ask_user_question args: ${validation.error.message}` + ); + continue; + } + + if ( + countAskUserQuestionCalls(messages) > MAX_QUESTIONS_PER_CONVERSATION + ) { + pushToolError( + messages, + toolCall.id, + "Question quota exceeded — proceed without further questions." + ); + continue; + } + + pausedQuestion = { + question: validation.data.question, + options: validation.data.options, + toolCallId: toolCall.id, + }; + continue; + } + + if (hasAskInBatch && MUTATING_TOOLS.has(toolCall.function.name)) { + pushToolError( + messages, + toolCall.id, + `${toolCall.function.name} cannot run in the same turn as ask_user_question. Wait for the user's answer, then call it on the next turn.` + ); + continue; + } + this.logger.debug("executing tool call", { tool: toolCall.function.name, iteration: i, @@ -220,26 +419,49 @@ class OpenAiService { tool: toolCall.function.name, arguments: toolCall.function.arguments, }); - messages.push({ - role: "tool", - tool_call_id: toolCall.id, - content: JSON.stringify({ error: "Invalid tool arguments" }), - }); + pushToolError(messages, toolCall.id, "Invalid tool arguments"); continue; } const result = await toolExecutor(toolCall.function.name, args); - messages.push({ role: "tool", tool_call_id: toolCall.id, content: JSON.stringify(result), }); } + + if (pausedQuestion) { + return { + kind: "ask_user_question", + question: pausedQuestion.question, + options: pausedQuestion.options, + toolCallId: pausedQuestion.toolCallId, + messages, + }; + } } throw new Error("Tool calling exceeded maximum iterations"); } + async replyWithTools( + userMessage: string, + toolExecutor: ToolExecutor + ): Promise { + this.logger.debug("sending chat completion with tools", { + messageLength: userMessage.length, + }); + + const messages = buildInitialMessages(userMessage); + const outcome = await this.runToolLoop(messages, toolExecutor); + if (outcome.kind !== "final") { + throw new Error( + "ask_user_question is not supported in replyWithTools — use runToolLoop directly to handle pause/resume." + ); + } + return outcome.content; + } + async analyzeMonitor(params: { task: string; scrapedContent: string; @@ -307,5 +529,19 @@ const monitorAnalysisSchema = z.object({ type MonitorAnalysis = z.infer; -export { MAX_TOOL_ITERATIONS, OpenAiService, SCHEDULE_TOOLS }; -export type { MonitorAnalysis, ToolExecutor, ToolResult }; +export { + buildInitialMessages, + countAskUserQuestionCalls, + MAX_QUESTIONS_PER_CONVERSATION, + MAX_TOOL_ITERATIONS, + OpenAiService, + SCHEDULE_TOOLS, + SYSTEM_PROMPT, +}; +export type { + MonitorAnalysis, + ToolExecutor, + ToolLoopMessages, + ToolLoopOutcome, + ToolResult, +}; diff --git a/apps/operator/src/services/pending-conversation.ts b/apps/operator/src/services/pending-conversation.ts new file mode 100644 index 0000000..3a4a32f --- /dev/null +++ b/apps/operator/src/services/pending-conversation.ts @@ -0,0 +1,138 @@ +import { and, eq, gt, lte } from "drizzle-orm"; +import { drizzle } from "drizzle-orm/d1"; +import type { DrizzleD1Database } from "drizzle-orm/d1"; + +import { pendingConversations } from "../db/schema"; +import { generateToken } from "./pending-action"; + +type QuestionOptionValue = boolean | string | number; + +type QuestionOption = { + label: string; + value: QuestionOptionValue; +}; + +type PendingConversation = { + messages: unknown[]; + pendingToolCallId: string; + options: QuestionOption[]; +}; + +const TTL_MS = 5 * 60 * 1000; + +type PendingConversationRow = { + messagesJson: string; + pendingToolCallId: string; + optionsJson: string; +}; + +const parseRow = ( + row: PendingConversationRow +): PendingConversation | undefined => { + try { + return { + messages: JSON.parse(row.messagesJson) as unknown[], + pendingToolCallId: row.pendingToolCallId, + options: JSON.parse(row.optionsJson) as QuestionOption[], + }; + } catch { + return undefined; + } +}; + +class PendingConversationService { + private readonly db: DrizzleD1Database; + + constructor(d1: D1Database) { + this.db = drizzle(d1); + } + + async set( + chatId: number, + conversation: PendingConversation + ): Promise { + const expiresAt = new Date(Date.now() + TTL_MS).toISOString(); + const token = generateToken(); + const messagesJson = JSON.stringify(conversation.messages); + const optionsJson = JSON.stringify(conversation.options); + await this.db + .insert(pendingConversations) + .values({ + chatId, + messagesJson, + pendingToolCallId: conversation.pendingToolCallId, + optionsJson, + token, + expiresAt, + }) + .onConflictDoUpdate({ + target: pendingConversations.chatId, + set: { + messagesJson, + pendingToolCallId: conversation.pendingToolCallId, + optionsJson, + token, + expiresAt, + }, + }); + return token; + } + + async getByToken( + chatId: number, + token: string + ): Promise { + const nowIso = new Date().toISOString(); + const rows = await this.db + .select() + .from(pendingConversations) + .where( + and( + eq(pendingConversations.chatId, chatId), + eq(pendingConversations.token, token), + gt(pendingConversations.expiresAt, nowIso) + ) + ); + if (rows.length === 0) { + return undefined; + } + return parseRow(rows[0]); + } + + async consumeByToken( + chatId: number, + token: string + ): Promise { + const nowIso = new Date().toISOString(); + const deleted = await this.db + .delete(pendingConversations) + .where( + and( + eq(pendingConversations.chatId, chatId), + eq(pendingConversations.token, token), + gt(pendingConversations.expiresAt, nowIso) + ) + ) + .returning(); + if (deleted.length === 0) { + return undefined; + } + return parseRow(deleted[0]); + } + + async clear(chatId: number): Promise { + await this.db + .delete(pendingConversations) + .where(eq(pendingConversations.chatId, chatId)); + } + + async clearExpired(): Promise { + const now = new Date().toISOString(); + await this.db + .delete(pendingConversations) + .where(lte(pendingConversations.expiresAt, now)); + } +} + +export { PendingConversationService, TTL_MS }; +export type { PendingConversation, QuestionOption, QuestionOptionValue }; diff --git a/apps/operator/src/services/schedule.test.ts b/apps/operator/src/services/schedule.test.ts index 15e1af6..85583f7 100644 --- a/apps/operator/src/services/schedule.test.ts +++ b/apps/operator/src/services/schedule.test.ts @@ -351,4 +351,52 @@ describe("createScheduleSchema", () => { }); expect(result.success).toBe(false); }); + + it("accepts useBrowser=true on a monitor", () => { + const result = createScheduleSchema.safeParse({ + ...base, + sourceUrl: "https://example.com", + messagePrompt: "Summarize changes", + useBrowser: true, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.useBrowser).toBe(true); + } + }); + + it("accepts useBrowser=false explicitly", () => { + const result = createScheduleSchema.safeParse({ + ...base, + sourceUrl: "https://example.com", + messagePrompt: "Summarize changes", + useBrowser: false, + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.useBrowser).toBe(false); + } + }); + + it("leaves useBrowser undefined when omitted", () => { + const result = createScheduleSchema.safeParse({ + ...base, + sourceUrl: "https://example.com", + messagePrompt: "Summarize changes", + }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.useBrowser).toBeUndefined(); + } + }); + + it("rejects useBrowser of non-boolean type", () => { + const result = createScheduleSchema.safeParse({ + ...base, + sourceUrl: "https://example.com", + messagePrompt: "Summarize changes", + useBrowser: "yes", + }); + expect(result.success).toBe(false); + }); }); diff --git a/apps/operator/src/services/schedule.ts b/apps/operator/src/services/schedule.ts index 397edf4..ba5dddc 100644 --- a/apps/operator/src/services/schedule.ts +++ b/apps/operator/src/services/schedule.ts @@ -39,6 +39,7 @@ const createScheduleSchema = z messagePrompt: z.string().max(500).optional(), sourceUrl: z.string().max(2048).optional(), keywords: z.array(z.string().trim().min(1).max(100)).max(10).optional(), + useBrowser: z.boolean().optional(), description: z.string().max(200), }) .refine( @@ -324,6 +325,7 @@ class ScheduleService { keywords: validated.keywords?.length ? JSON.stringify(validated.keywords) : undefined, + useBrowser: validated.useBrowser ?? false, description: validated.description, nextRunAt: nextRunAt.toISOString(), }) @@ -333,10 +335,14 @@ class ScheduleService { } async list(chatId: number) { + // Sort by createdAt then id so the position numbers we hand to the + // model are stable across calls — the user refers to schedules by their + // displayed position when asking to delete. return this.db .select() .from(schedules) - .where(and(eq(schedules.chatId, chatId), eq(schedules.active, true))); + .where(and(eq(schedules.chatId, chatId), eq(schedules.active, true))) + .orderBy(schedules.createdAt, schedules.id); } async countActive(chatId: number): Promise {