Commit c6c658a
Fix chat tools
1 parent 690be53 commit c6c658a

16 files changed

Lines changed: 448 additions & 76 deletions


apps/sim/executor/handlers/agent/agent-handler.ts

Lines changed: 79 additions & 18 deletions
@@ -987,18 +987,19 @@ export class AgentBlockHandler implements BlockHandler {
     try {
       const executionData = JSON.parse(executionDataHeader)

-      // If execution data contains full content, persist to memory
-      if (ctx && inputs && executionData.output?.content) {
-        const assistantMessage: Message = {
-          role: 'assistant',
-          content: executionData.output.content,
-        }
-        // Fire and forget - don't await
-        memoryService
-          .persistMemoryMessage(ctx, inputs, assistantMessage, block.id)
-          .catch((error) =>
-            logger.error('Failed to persist streaming response to memory:', error)
+      // If execution data contains content or tool calls, persist to memory
+      if (ctx && inputs && (executionData.output?.content || executionData.output?.toolCalls?.list?.length)) {
+        const toolCalls = executionData.output?.toolCalls?.list
+        const messages = this.buildMessagesForMemory(executionData.output.content, toolCalls)
+
+        // Fire and forget - don't await, persist all messages
+        Promise.all(
+          messages.map((message) =>
+            memoryService.persistMemoryMessage(ctx, inputs, message, block.id)
           )
+        ).catch((error) =>
+          logger.error('Failed to persist streaming response to memory:', error)
+        )
       }

       return {
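A side note on the fire-and-forget pattern introduced here: Promise.all settles as soon as any write rejects, so the single .catch logs only the first persistence failure, while the remaining writes continue in the background with their results discarded. A minimal self-contained sketch of that semantics (timings and messages invented):

// Promise.all rejects with the FIRST rejection; the other promises are
// not cancelled and keep running, their results simply discarded.
const slowWrite = new Promise<string>((resolve) => setTimeout(() => resolve('persisted'), 50))
const failedWrite = Promise.reject(new Error('db write failed'))

Promise.all([slowWrite, failedWrite]).catch((error: Error) =>
  console.error('Failed to persist streaming response to memory:', error.message)
)
// Logs "db write failed" immediately; slowWrite still resolves ~50 ms later.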
@@ -1117,32 +1118,92 @@
       return
     }

-    // Extract content from regular response
+    // Extract content and tool calls from regular response
     const blockOutput = result as any
     const content = blockOutput?.content
+    const toolCalls = blockOutput?.toolCalls?.list

-    if (!content || typeof content !== 'string') {
+    // Build messages to persist
+    const messages = this.buildMessagesForMemory(content, toolCalls)
+
+    if (messages.length === 0) {
       return
     }

-    const assistantMessage: Message = {
-      role: 'assistant',
-      content,
+    // Persist all messages
+    for (const message of messages) {
+      await memoryService.persistMemoryMessage(ctx, inputs, message, blockId)
     }

-    await memoryService.persistMemoryMessage(ctx, inputs, assistantMessage, blockId)
-
     logger.debug('Persisted assistant response to memory', {
       workflowId: ctx.workflowId,
       memoryType: inputs.memoryType,
       conversationId: inputs.conversationId,
+      messageCount: messages.length,
     })
   } catch (error) {
     logger.error('Failed to persist response to memory:', error)
     // Don't throw - memory persistence failure shouldn't break workflow execution
   }
 }

+  /**
+   * Builds messages for memory storage including tool calls and results
+   * Returns proper OpenAI-compatible message format:
+   * - Assistant message with tool_calls array (if tools were used)
+   * - Tool role messages with results (one per tool call)
+   * - Final assistant message with content (if present)
+   */
+  private buildMessagesForMemory(content: string | undefined, toolCalls: any[] | undefined): Message[] {
+    const messages: Message[] = []
+
+    if (toolCalls?.length) {
+      // Generate stable IDs for each tool call (only if not provided by provider)
+      // Use index to ensure uniqueness even for same tool name in same millisecond
+      const toolCallsWithIds = toolCalls.map((tc: any, index: number) => ({
+        ...tc,
+        _stableId: tc.id || `call_${tc.name}_${Date.now()}_${index}_${Math.random().toString(36).slice(2, 7)}`,
+      }))
+
+      // Add assistant message with tool_calls
+      const formattedToolCalls = toolCallsWithIds.map((tc: any) => ({
+        id: tc._stableId,
+        type: 'function' as const,
+        function: {
+          name: tc.name,
+          arguments: tc.rawArguments || JSON.stringify(tc.arguments || {}),
+        },
+      }))
+
+      messages.push({
+        role: 'assistant',
+        content: null,
+        tool_calls: formattedToolCalls,
+      })
+
+      // Add tool result messages using the same stable IDs
+      for (const tc of toolCallsWithIds) {
+        const resultContent = typeof tc.result === 'string' ? tc.result : JSON.stringify(tc.result || {})
+        messages.push({
+          role: 'tool',
+          content: resultContent,
+          tool_call_id: tc._stableId,
+          name: tc.name, // Store tool name for providers that need it (e.g., Google/Gemini)
+        })
+      }
+    }
+
+    // Add final assistant response if present
+    if (content && typeof content === 'string') {
+      messages.push({
+        role: 'assistant',
+        content,
+      })
+    }
+
+    return messages
+  }
+
   private processProviderResponse(
     response: any,
     block: SerializedBlock,
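For orientation, a hedged sketch of what buildMessagesForMemory returns for a single tool round trip. The tool name, arguments, and result are invented for illustration; the id would normally come from the provider, or from the call_ fallback above:

// buildMessagesForMemory('It is 21°C in Berlin.', [
//   { id: 'call_abc', name: 'get_weather', arguments: { city: 'Berlin' }, result: { tempC: 21 } },
// ]) would produce, in order:
// [
//   { role: 'assistant', content: null, tool_calls: [{ id: 'call_abc', type: 'function',
//       function: { name: 'get_weather', arguments: '{"city":"Berlin"}' } }] },
//   { role: 'tool', content: '{"tempC":21}', tool_call_id: 'call_abc', name: 'get_weather' },
//   { role: 'assistant', content: 'It is 21°C in Berlin.' },
// ]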

apps/sim/executor/handlers/agent/memory.ts

Lines changed: 125 additions & 30 deletions
@@ -202,20 +202,99 @@ export class Memory {
     const systemMessages = messages.filter((msg) => msg.role === 'system')
     const conversationMessages = messages.filter((msg) => msg.role !== 'system')

-    const recentMessages = conversationMessages.slice(-limit)
+    // Group messages into conversation turns
+    // A turn = user message + any tool calls/results + assistant response
+    const turns = this.groupMessagesIntoTurns(conversationMessages)
+
+    // Take the last N turns
+    const recentTurns = turns.slice(-limit)
+
+    // Flatten back to messages
+    const recentMessages = recentTurns.flat()

     const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []

     return [...firstSystemMessage, ...recentMessages]
   }

+  /**
+   * Groups messages into conversation turns.
+   * A turn starts with a user message and includes all subsequent messages
+   * until the next user message (tool calls, tool results, assistant response).
+   */
+  private groupMessagesIntoTurns(messages: Message[]): Message[][] {
+    const turns: Message[][] = []
+    let currentTurn: Message[] = []
+
+    for (const msg of messages) {
+      if (msg.role === 'user') {
+        // Start a new turn
+        if (currentTurn.length > 0) {
+          turns.push(currentTurn)
+        }
+        currentTurn = [msg]
+      } else {
+        // Add to current turn (assistant, tool, etc.)
+        currentTurn.push(msg)
+      }
+    }
+
+    // Don't forget the last turn
+    if (currentTurn.length > 0) {
+      turns.push(currentTurn)
+    }
+
+    return turns
+  }
+
+  /**
+   * Remove orphaned tool messages that don't have a corresponding tool_calls message.
+   * This prevents errors like "tool_result without corresponding tool_use".
+   */
+  private removeOrphanedToolMessages(messages: Message[]): Message[] {
+    const result: Message[] = []
+    const seenToolCallIds = new Set<string>()
+
+    // First pass: collect all tool_call IDs from assistant messages with tool_calls
+    for (const msg of messages) {
+      if (msg.role === 'assistant' && msg.tool_calls && Array.isArray(msg.tool_calls)) {
+        for (const tc of msg.tool_calls) {
+          if (tc.id) {
+            seenToolCallIds.add(tc.id)
+          }
+        }
+      }
+    }
+
+    // Second pass: only include tool messages that have a matching tool_calls message
+    for (const msg of messages) {
+      if (msg.role === 'tool') {
+        const toolCallId = (msg as any).tool_call_id
+        if (toolCallId && seenToolCallIds.has(toolCallId)) {
+          result.push(msg)
+        } else {
+          logger.debug('Removing orphaned tool message', { toolCallId })
+        }
+      } else {
+        result.push(msg)
+      }
+    }
+
+    return result
+  }
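To make the turn grouping concrete, a hedged sketch of how groupMessagesIntoTurns would split a short transcript (message contents and IDs invented for illustration):

// Hypothetical transcript; contents and IDs are invented.
const history: Message[] = [
  { role: 'user', content: 'Weather in Berlin?' },
  { role: 'assistant', content: null, tool_calls: [/* one call with id 'call_1' */] },
  { role: 'tool', content: '{"tempC":21}', tool_call_id: 'call_1' },
  { role: 'assistant', content: 'It is 21°C in Berlin.' },
  { role: 'user', content: 'And Paris?' },
  { role: 'assistant', content: 'About the same.' },
]

// groupMessagesIntoTurns(history) yields two turns:
//   turn 1: [user, assistant(tool_calls), tool, assistant] - 4 messages
//   turn 2: [user, assistant]                              - 2 messages
// A message-based sliding window with limit 1 keeps turn 2 whole, so a
// tool_calls message is never separated from its tool result.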
   /**
    * Apply token-based sliding window to limit conversation by token count
    *
    * System message handling:
    * - For consistency with message-based sliding window, the first system message is preserved
    * - System messages are excluded from the token count
    * - This ensures system prompts are always available while limiting conversation history
+   *
+   * Turn handling:
+   * - Messages are grouped into turns (user + tool calls/results + assistant response)
+   * - Complete turns are added to stay within the token limit
+   * - This prevents breaking tool call/result pairs
    */
   private applySlidingWindowByTokens(
     messages: Message[],
@@ -233,43 +312,52 @@
     const systemMessages = messages.filter((msg) => msg.role === 'system')
     const conversationMessages = messages.filter((msg) => msg.role !== 'system')

+    // Group into turns to keep tool call/result pairs together
+    const turns = this.groupMessagesIntoTurns(conversationMessages)
+
     const result: Message[] = []
     let currentTokenCount = 0

-    // Add conversation messages from most recent backwards
-    for (let i = conversationMessages.length - 1; i >= 0; i--) {
-      const message = conversationMessages[i]
-      const messageTokens = getAccurateTokenCount(message.content, model)
+    // Add turns from most recent backwards
+    for (let i = turns.length - 1; i >= 0; i--) {
+      const turn = turns[i]
+      const turnTokens = turn.reduce(
+        (sum, msg) => sum + getAccurateTokenCount(msg.content || '', model),
+        0
+      )

-      if (currentTokenCount + messageTokens <= tokenLimit) {
-        result.unshift(message)
-        currentTokenCount += messageTokens
+      if (currentTokenCount + turnTokens <= tokenLimit) {
+        result.unshift(...turn)
+        currentTokenCount += turnTokens
       } else if (result.length === 0) {
-        logger.warn('Single message exceeds token limit, including anyway', {
-          messageTokens,
+        logger.warn('Single turn exceeds token limit, including anyway', {
+          turnTokens,
           tokenLimit,
-          messageRole: message.role,
+          turnMessages: turn.length,
         })
-        result.unshift(message)
-        currentTokenCount += messageTokens
+        result.unshift(...turn)
+        currentTokenCount += turnTokens
         break
       } else {
         // Token limit reached, stop processing
         break
       }
     }

+    // No need to remove orphaned messages - turns are already complete
+    const cleanedResult = result
+
     logger.debug('Applied token-based sliding window', {
       totalMessages: messages.length,
       conversationMessages: conversationMessages.length,
-      includedMessages: result.length,
+      includedMessages: cleanedResult.length,
       totalTokens: currentTokenCount,
       tokenLimit,
     })

     // Preserve first system message and prepend to results (consistent with message-based window)
     const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []
-    return [...firstSystemMessage, ...result]
+    return [...firstSystemMessage, ...cleanedResult]
   }
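As a rough illustration of the turn-based accounting above (token counts invented; real values come from getAccurateTokenCount and the model's tokenizer):

// Suppose three turns cost 400, 300, and 500 tokens, with tokenLimit = 900.
// Walking backwards from the most recent turn:
//   turn 3 (500 tokens) -> running total 500, fits
//   turn 2 (300 tokens) -> running total 800, fits
//   turn 1 (400 tokens) -> would be 1200 > 900, stop
// Turns 2 and 3 survive intact; turn 1 is dropped whole, so the window
// never separates a tool_calls message from its tool results.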
@@ -324,7 +412,7 @@
     // Count tokens used by system messages first
     let systemTokenCount = 0
     for (const msg of systemMessages) {
-      systemTokenCount += getAccurateTokenCount(msg.content, model)
+      systemTokenCount += getAccurateTokenCount(msg.content || '', model)
     }

     // Calculate remaining tokens available for conversation messages
@@ -339,30 +427,36 @@
       return systemMessages
     }

+    // Group into turns to keep tool call/result pairs together
+    const turns = this.groupMessagesIntoTurns(conversationMessages)
+
     const result: Message[] = []
     let currentTokenCount = 0

-    for (let i = conversationMessages.length - 1; i >= 0; i--) {
-      const message = conversationMessages[i]
-      const messageTokens = getAccurateTokenCount(message.content, model)
+    for (let i = turns.length - 1; i >= 0; i--) {
+      const turn = turns[i]
+      const turnTokens = turn.reduce(
+        (sum, msg) => sum + getAccurateTokenCount(msg.content || '', model),
+        0
+      )

-      if (currentTokenCount + messageTokens <= remainingTokens) {
-        result.unshift(message)
-        currentTokenCount += messageTokens
+      if (currentTokenCount + turnTokens <= remainingTokens) {
+        result.unshift(...turn)
+        currentTokenCount += turnTokens
       } else if (result.length === 0) {
-        logger.warn('Single message exceeds remaining context window, including anyway', {
-          messageTokens,
+        logger.warn('Single turn exceeds remaining context window, including anyway', {
+          turnTokens,
           remainingTokens,
           systemTokenCount,
-          messageRole: message.role,
+          turnMessages: turn.length,
         })
-        result.unshift(message)
-        currentTokenCount += messageTokens
+        result.unshift(...turn)
+        currentTokenCount += turnTokens
         break
       } else {
         logger.info('Auto-trimmed conversation history to fit context window', {
-          originalMessages: conversationMessages.length,
-          trimmedMessages: result.length,
+          originalTurns: turns.length,
+          trimmedTurns: turns.length - i - 1,
           conversationTokens: currentTokenCount,
           systemTokens: systemTokenCount,
           totalTokens: currentTokenCount + systemTokenCount,

@@ -372,6 +466,7 @@
       }
     }

+    // No need to remove orphaned messages - turns are already complete
     return [...systemMessages, ...result]
   }

@@ -638,7 +733,7 @@
   /**
    * Validate inputs to prevent malicious data or performance issues
    */
-  private validateInputs(conversationId?: string, content?: string): void {
+  private validateInputs(conversationId?: string, content?: string | null): void {
     if (conversationId) {
       if (conversationId.length > 255) {
         throw new Error('Conversation ID too long (max 255 characters)')
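One note on the signature change: assistant messages that carry tool_calls are stored with content: null (see buildMessagesForMemory above), so content must now admit null. A hedged sketch of the Message shape this implies, inferred from the diff rather than copied from the codebase:

// Inferred shape; the real Message type in the repo may differ.
type Message = {
  role: 'system' | 'user' | 'assistant' | 'tool'
  content: string | null
  tool_calls?: Array<{
    id: string
    type: 'function'
    function: { name: string; arguments: string }
  }>
  tool_call_id?: string // set on role: 'tool' messages
  name?: string // tool name, for providers like Google/Gemini
}

// Because content can be null, validators and token counters guard it,
// e.g. getAccurateTokenCount(msg.content || '', model).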
