Skip to content

Commit 9fc38b5

Browse files
committed
feat(streaming): add sse stream parsing and chunk combiner
- parse conversation stream sse lines into message chunks or search responses - combine streamed chunks into a final search response for async calls
1 parent 3d01160 commit 9fc38b5

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed

src/typesense/stream_handlers.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
"""
2+
SSE stream parsing and chunk combining for conversation search streaming.
3+
4+
This module contains pure logic for parsing server-sent event lines from
5+
conversation_stream responses and combining message chunks into a final
6+
search response. Used by async API calls.
7+
"""
8+
9+
import json
10+
import sys
11+
12+
if sys.version_info >= (3, 11):
13+
import typing
14+
else:
15+
import typing_extensions as typing
16+
17+
from typesense.types.document import MessageChunk
18+
19+
# Scalar leaves of a decoded JSON document (JSON null maps to None).
JSONPrimitive: typing.TypeAlias = typing.Optional[
    typing.Union[str, int, float, bool]
]
# Any decoded JSON value; the quoted names are forward references that make
# the alias recursive.
JSONValue: typing.TypeAlias = typing.Union[
    JSONPrimitive,
    typing.Dict[str, "JSONValue"],
    typing.List["JSONValue"],
]
# A decoded JSON object.
JSONDict: typing.TypeAlias = typing.Dict[str, JSONValue]

# A dict containing at least one of these keys is treated as a search response.
_SEARCH_RESPONSE_KEYS = frozenset(
    ("results", "found", "hits", "page", "search_time_ms")
)

# One parsed stream item: a conversation message chunk or a plain JSON object.
StreamChunk: typing.TypeAlias = typing.Union[MessageChunk, JSONDict]
30+
31+
32+
def parse_sse_line(line: str) -> typing.Optional[StreamChunk]:
    """
    Parse a single SSE line into a MessageChunk, search response dict, or None.

    Handles:
    - Empty lines -> None
    - SSE comment lines (leading ":"), e.g. keep-alive pings -> None
    - "data: [DONE]" with any amount of whitespace after the colon -> None
    - "data: {...}" -> parse JSON, return MessageChunk or search response
    - Raw JSON line starting with "{" -> same
    - Plain text -> return chunk with conversation_id="unknown", message=line

    Args:
        line: One line of the stream; surrounding whitespace is ignored.

    Returns:
        MessageChunk for conversation chunks, dict for search responses, or None to skip.
    """
    line = line.strip()
    # In the SSE format a line starting with ":" is a comment and carries no data.
    if not line or line.startswith(":"):
        return None

    # SSE format: "data: {...}"
    if line.startswith("data:"):
        content = line[len("data:"):].strip()
        # The space after "data:" is optional, so the end-of-stream sentinel is
        # detected on the stripped payload rather than on the raw line.
        if content == "[DONE]":
            return None
        return _parse_data_content(content)

    # Raw JSON
    if line.startswith("{"):
        return _parse_json_content(line)

    return _chunk_from_text(line)
59+
60+
61+
def _parse_data_content(content: str) -> typing.Optional[StreamChunk]:
    """
    Turn the payload that followed "data:" into a chunk.

    A payload beginning with "{" is handed to the JSON parser, any other
    non-empty payload becomes a plain-text chunk, and an empty payload
    yields None.
    """
    if content.startswith("{"):
        return _parse_json_content(content)
    # An empty string never starts with "{", so it reaches this line.
    return _chunk_from_text(content) if content else None
68+
69+
70+
def _parse_json_content(raw: str) -> StreamChunk:
    """
    Decode a JSON payload and normalise its conversation fields.

    Text that is not valid JSON, or that decodes to something other than an
    object, is wrapped as a plain-text chunk. For objects, the
    "conversation_id" and "message" keys are always set on the result: a
    missing or null value is looked up in a nested "conversation" object,
    then falls back to "unknown" / "", and a non-string value is converted
    with str().

    Returns:
        The decoded dict (updated in place), or a plain-text chunk as fallback.
    """
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return _chunk_from_text(raw)
    if not isinstance(decoded, dict):
        return _chunk_from_text(json.dumps(decoded))

    payload: JSONDict = decoded
    conversation = payload.get("conversation")
    # Only a dict-valued "conversation" entry can supply fallback values.
    fallback = conversation if isinstance(conversation, dict) else {}

    for field, default in (("conversation_id", "unknown"), ("message", "")):
        value = payload.get(field)
        if value is None:
            value = fallback.get(field)
        if value is None:
            payload[field] = default
        else:
            payload[field] = value if isinstance(value, str) else str(value)

    return payload
108+
109+
110+
def _is_search_response_dict(data: typing.Mapping[str, JSONValue]) -> bool:
    """Return True when the mapping has at least one key from _SEARCH_RESPONSE_KEYS."""
    return any(key in _SEARCH_RESPONSE_KEYS for key in data.keys())
113+
114+
115+
def is_message_chunk(chunk: JSONValue) -> bool:
    """
    Tell whether a chunk is a conversation message chunk.

    A message chunk is a dict that has both "message" and "conversation_id"
    keys and is not recognised as a search response.
    """
    return (
        isinstance(chunk, dict)
        and "message" in chunk
        and "conversation_id" in chunk
        and not _is_search_response_dict(chunk)
    )
122+
123+
124+
def is_complete_search_response(chunk: JSONValue) -> bool:
    """
    Tell whether a chunk looks like a full search response.

    Only a non-empty dict that shares at least one key with
    _SEARCH_RESPONSE_KEYS qualifies; every other value yields False.
    """
    if isinstance(chunk, dict) and chunk:
        return not _SEARCH_RESPONSE_KEYS.isdisjoint(chunk)
    return False
130+
131+
132+
def combine_stream_chunks(
    chunks: typing.Sequence[StreamChunk],
) -> JSONDict:
    """
    Pick the final search response out of a sequence of streamed chunks.

    - If there are no chunks, return an empty dict.
    - If there is exactly one chunk, return it.
    - If any chunk is a message chunk (conversation_id + message), return the
      first chunk that is a complete search response (the metadata chunk).
    - In every other case return the last chunk as a best-effort result,
      whether or not it is a complete search response.

    Args:
        chunks: Parsed stream chunks in arrival order.

    Returns:
        The selected chunk itself (not a copy), or {} for empty input.
    """
    if not chunks:
        return {}

    last = chunks[-1]
    # A single chunk, or a stream without message chunks, resolves to the last chunk.
    if len(chunks) == 1 or not any(is_message_chunk(chunk) for chunk in chunks):
        return typing.cast("JSONDict", last)

    # Conversation stream: prefer the metadata chunk, else fall back to the last chunk.
    metadata = next(
        (chunk for chunk in chunks if is_complete_search_response(chunk)),
        last,
    )
    return typing.cast("JSONDict", metadata)
161+
162+
163+
def _chunk_from_text(text: str) -> MessageChunk:
    """Wrap plain text as a message chunk whose conversation_id is "unknown"."""
    chunk: MessageChunk = {"conversation_id": "unknown", "message": text}
    return chunk

0 commit comments

Comments
 (0)