Skip to content

Commit 9093292

Browse files
dancerbensabic
andauthored
feat(chat): add streaming options to thread.post() (#388)
* feat(chat): add streaming options to thread.post() * test(chat): add comprehensive tests for PostStreamOptions * feat(chat): add StreamMessage PostableObject for streaming with options * refactor(chat): remove PostStreamOptions second param, keep only StreamMessage PostableObject * refactor(chat): rename StreamMessage to StreamingPlan, fix post() return type Rename per Malte's feedback - StreamingPlan better describes what the options control (task grouping, stop blocks for streamed plans). Fix type safety issue where post<T extends PostableObject>() returned SentMessage at runtime instead of T. Now awaits handleStream() for side effects and returns the original StreamingPlan instance. * test(chat): cover updateIntervalMs-only and fallback paths for StreamingPlan Remove a duplicated test block, drop a duplicate JSDoc line on Thread.post, and add tests for posting a StreamingPlan with only updateIntervalMs and for routing StreamingPlan through the fallback post+edit path when the adapter has no native streaming. --------- Co-authored-by: Ben Sabic <bensabic@users.noreply.github.com>
1 parent 051245c commit 9093292

6 files changed

Lines changed: 278 additions & 3 deletions

File tree

.changeset/major-pianos-battle.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chat": minor
3+
---
4+
5+
add streaming options to thread.post() with platform-specific namespacing

packages/chat/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ export type {
4545
export { isPostableObject } from "./postable-object";
4646
export { reviver } from "./reviver";
4747
export { StreamingMarkdownRenderer } from "./streaming-markdown";
48+
export {
49+
StreamingPlan,
50+
type StreamingPlanData,
51+
type StreamingPlanOptions,
52+
} from "./streaming-plan";
4853
export { type SerializedThread, ThreadImpl } from "./thread";
4954

5055
// Card builders - import then re-export to ensure values are properly exported
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import {
2+
POSTABLE_OBJECT,
3+
type PostableObject,
4+
type PostableObjectContext,
5+
} from "./postable-object";
6+
import type { Adapter, StreamChunk, StreamEvent } from "./types";
7+
8+
export interface StreamingPlanOptions {
9+
/**
10+
* Block Kit elements to attach when the stream stops (Slack only).
11+
* Useful for adding feedback buttons after a streamed response.
12+
*/
13+
endWith?: unknown[];
14+
/**
15+
* Controls how task_update chunks are displayed (Slack only).
16+
* - `"plan"` - all tasks grouped into a single plan block
17+
* - `"timeline"` - individual task cards shown inline with text (default)
18+
*/
19+
groupTasks?: "plan" | "timeline";
20+
/**
21+
* Minimum interval between updates in ms (default: 500).
22+
* Used for fallback mode (post+edit on adapters without native streaming).
23+
*/
24+
updateIntervalMs?: number;
25+
}
26+
27+
export interface StreamingPlanData {
28+
options: StreamingPlanOptions;
29+
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
30+
}
31+
32+
/**
33+
* A StreamingPlan wraps an async iterable with platform-specific streaming options.
34+
*
35+
* Use this when you need to pass options like task grouping or stop blocks
36+
* to the streaming API. For simple streaming without options, pass the
37+
* async iterable directly to `thread.post()`.
38+
*
39+
* @example
40+
* ```typescript
41+
* const stream = new StreamingPlan(result.fullStream, {
42+
* groupTasks: "plan",
43+
* endWith: [feedbackBlock],
44+
* });
45+
* await thread.post(stream);
46+
* ```
47+
*/
48+
export class StreamingPlan implements PostableObject<StreamingPlanData> {
49+
readonly $$typeof = POSTABLE_OBJECT;
50+
readonly kind = "stream";
51+
52+
private readonly _stream: AsyncIterable<string | StreamChunk | StreamEvent>;
53+
private readonly _options: StreamingPlanOptions;
54+
55+
constructor(
56+
stream: AsyncIterable<string | StreamChunk | StreamEvent>,
57+
options: StreamingPlanOptions = {}
58+
) {
59+
this._stream = stream;
60+
this._options = options;
61+
}
62+
63+
get stream(): AsyncIterable<string | StreamChunk | StreamEvent> {
64+
return this._stream;
65+
}
66+
67+
get options(): StreamingPlanOptions {
68+
return this._options;
69+
}
70+
71+
getFallbackText(): string {
72+
return "";
73+
}
74+
75+
getPostData(): StreamingPlanData {
76+
return {
77+
stream: this._stream,
78+
options: this._options,
79+
};
80+
}
81+
82+
isSupported(_adapter: Adapter): boolean {
83+
return true;
84+
}
85+
86+
onPosted(_context: PostableObjectContext): void {
87+
// Streams are one-shot, no lifecycle binding needed
88+
}
89+
}

packages/chat/src/thread.test.ts

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
mockLogger,
88
} from "./mock-adapter";
99
import { Plan } from "./plan";
10+
import { StreamingPlan } from "./streaming-plan";
1011
import { ThreadImpl } from "./thread";
1112
import type { Adapter, Message, ScheduledMessage, StreamChunk } from "./types";
1213
import { NotImplementedError } from "./types";
@@ -638,6 +639,151 @@ describe("ThreadImpl", () => {
638639
})
639640
);
640641
});
642+
643+
it("should pass StreamingPlan PostableObject options to adapter.stream", async () => {
644+
const mockStream = vi.fn().mockResolvedValue({
645+
id: "msg-stream",
646+
threadId: "t1",
647+
raw: "Hello",
648+
});
649+
mockAdapter.stream = mockStream;
650+
651+
const textStream = createTextStream(["Hello"]);
652+
const streamMsg = new StreamingPlan(textStream, {
653+
groupTasks: "plan",
654+
endWith: [{ type: "actions" }],
655+
updateIntervalMs: 1000,
656+
});
657+
await thread.post(streamMsg);
658+
659+
expect(mockStream).toHaveBeenCalledWith(
660+
"slack:C123:1234.5678",
661+
expect.any(Object),
662+
expect.objectContaining({
663+
taskDisplayMode: "plan",
664+
stopBlocks: [{ type: "actions" }],
665+
updateIntervalMs: 1000,
666+
})
667+
);
668+
});
669+
670+
it("should pass StreamingPlan with only groupTasks", async () => {
671+
const mockStream = vi.fn().mockResolvedValue({
672+
id: "msg-stream",
673+
threadId: "t1",
674+
raw: "Hello",
675+
});
676+
mockAdapter.stream = mockStream;
677+
678+
const textStream = createTextStream(["Hello"]);
679+
await thread.post(
680+
new StreamingPlan(textStream, { groupTasks: "timeline" })
681+
);
682+
683+
expect(mockStream).toHaveBeenCalledWith(
684+
"slack:C123:1234.5678",
685+
expect.any(Object),
686+
expect.objectContaining({
687+
taskDisplayMode: "timeline",
688+
})
689+
);
690+
const options = mockStream.mock.calls[0][2];
691+
expect(options.stopBlocks).toBeUndefined();
692+
});
693+
694+
it("should pass StreamingPlan with only endWith", async () => {
695+
const mockStream = vi.fn().mockResolvedValue({
696+
id: "msg-stream",
697+
threadId: "t1",
698+
raw: "Hello",
699+
});
700+
mockAdapter.stream = mockStream;
701+
702+
const textStream = createTextStream(["Hello"]);
703+
await thread.post(
704+
new StreamingPlan(textStream, { endWith: [{ type: "actions" }] })
705+
);
706+
707+
expect(mockStream).toHaveBeenCalledWith(
708+
"slack:C123:1234.5678",
709+
expect.any(Object),
710+
expect.objectContaining({
711+
stopBlocks: [{ type: "actions" }],
712+
})
713+
);
714+
const options = mockStream.mock.calls[0][2];
715+
expect(options.taskDisplayMode).toBeUndefined();
716+
});
717+
718+
it("should pass StreamingPlan with only updateIntervalMs", async () => {
719+
const mockStream = vi.fn().mockResolvedValue({
720+
id: "msg-stream",
721+
threadId: "t1",
722+
raw: "Hello",
723+
});
724+
mockAdapter.stream = mockStream;
725+
726+
const textStream = createTextStream(["Hello"]);
727+
await thread.post(
728+
new StreamingPlan(textStream, { updateIntervalMs: 2000 })
729+
);
730+
731+
expect(mockStream).toHaveBeenCalledWith(
732+
"slack:C123:1234.5678",
733+
expect.any(Object),
734+
expect.objectContaining({
735+
updateIntervalMs: 2000,
736+
})
737+
);
738+
const options = mockStream.mock.calls[0][2];
739+
expect(options.taskDisplayMode).toBeUndefined();
740+
expect(options.stopBlocks).toBeUndefined();
741+
});
742+
743+
it("should route StreamingPlan through fallback when adapter has no native streaming", async () => {
744+
mockAdapter.stream = undefined;
745+
746+
const textStream = createTextStream(["Hello", " ", "World"]);
747+
await thread.post(
748+
new StreamingPlan(textStream, {
749+
groupTasks: "plan",
750+
endWith: [{ type: "actions" }],
751+
updateIntervalMs: 2000,
752+
})
753+
);
754+
755+
// Should post initial placeholder and edit with final content
756+
expect(mockAdapter.postMessage).toHaveBeenCalledWith(
757+
"slack:C123:1234.5678",
758+
"..."
759+
);
760+
expect(mockAdapter.editMessage).toHaveBeenLastCalledWith(
761+
"slack:C123:1234.5678",
762+
"msg-1",
763+
{ markdown: "Hello World" }
764+
);
765+
});
766+
767+
it("should still work without options (backward compat)", async () => {
768+
const mockStream = vi.fn().mockResolvedValue({
769+
id: "msg-stream",
770+
threadId: "t1",
771+
raw: "Hello",
772+
});
773+
mockAdapter.stream = mockStream;
774+
775+
const textStream = createTextStream(["Hello"]);
776+
await thread.post(textStream);
777+
778+
expect(mockStream).toHaveBeenCalledWith(
779+
"slack:C123:1234.5678",
780+
expect.any(Object),
781+
expect.any(Object)
782+
);
783+
const options = mockStream.mock.calls[0][2];
784+
expect(options.taskDisplayMode).toBeUndefined();
785+
expect(options.stopBlocks).toBeUndefined();
786+
});
641787
});
642788

643789
describe("fallback streaming error logging", () => {

packages/chat/src/thread.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,28 @@ export class ThreadImpl<TState = Record<string, unknown>>
415415
message: string | PostableMessage | ChatElement
416416
): Promise<SentMessage | PostableObject> {
417417
if (isPostableObject(message)) {
418+
// StreamingPlan PostableObject - route to streaming with options
419+
if (message.kind === "stream") {
420+
const data = message.getPostData() as {
421+
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
422+
options: {
423+
groupTasks?: "plan" | "timeline";
424+
endWith?: unknown[];
425+
updateIntervalMs?: number;
426+
};
427+
};
428+
const streamOptions: StreamOptions = {
429+
...(data.options.updateIntervalMs
430+
? { updateIntervalMs: data.options.updateIntervalMs }
431+
: {}),
432+
...(data.options.groupTasks
433+
? { taskDisplayMode: data.options.groupTasks }
434+
: {}),
435+
...(data.options.endWith ? { stopBlocks: data.options.endWith } : {}),
436+
};
437+
await this.handleStream(data.stream, streamOptions);
438+
return message;
439+
}
418440
await this.handlePostableObject(message);
419441
return message;
420442
}
@@ -543,12 +565,13 @@ export class ThreadImpl<TState = Record<string, unknown>>
543565
* then uses adapter's native streaming if available, otherwise falls back to post+edit.
544566
*/
545567
private async handleStream(
546-
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>
568+
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>,
569+
callerOptions?: StreamOptions
547570
): Promise<SentMessage> {
548571
// Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects
549572
const textStream = fromFullStream(rawStream);
550-
// Build streaming options from current message context
551-
const options: StreamOptions = {};
573+
// Build streaming options from current message context + caller options
574+
const options: StreamOptions = { ...callerOptions };
552575
if (this._currentMessage) {
553576
options.recipientUserId = this._currentMessage.author.userId;
554577
// Extract teamId from raw Slack payload

packages/chat/src/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,13 @@ export interface Thread<TState = Record<string, unknown>, TRawMessage = unknown>
11111111
* const result = await agent.stream({ prompt: message.text });
11121112
* await thread.post(result.textStream);
11131113
*
1114+
* // Stream with options via StreamingPlan PostableObject
1115+
* const stream = new StreamingPlan(result.fullStream, {
1116+
* groupTasks: "plan",
1117+
* endWith: [feedbackBlocks],
1118+
* });
1119+
* await thread.post(stream);
1120+
*
11141121
* // Plan with live updates
11151122
* const plan = new Plan({ initialMessage: "Working..." });
11161123
* await thread.post(plan);

0 commit comments

Comments
 (0)