Skip to content

Commit 3d01160

Browse files
committed
feat(types): add conversation streaming config and builder
- introduce typed stream callbacks and message chunks for conversation search - add decorator-based StreamConfigBuilder and wire streaming params into search
1 parent e863b44 commit 3d01160

File tree

1 file changed

+163
-5
lines changed

1 file changed

+163
-5
lines changed

src/typesense/types/document.py

Lines changed: 163 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,162 @@ class NLLanguageParameters(typing.TypedDict):
586586
nl_query_debug: typing.NotRequired[bool]
587587

588588

589+
class MessageChunk(typing.TypedDict):
590+
"""
591+
A single chunk from a conversation stream response.
592+
593+
Attributes:
594+
conversation_id (str): ID of the conversation.
595+
message (str): Message content for this chunk.
596+
"""
597+
598+
conversation_id: str
599+
message: str
600+
601+
602+
class StreamConfig(typing.Generic[TDoc], typing.TypedDict, total=False):
603+
"""
604+
Configuration for streaming conversation search responses.
605+
606+
Attributes:
607+
on_chunk: Callback invoked for each streamed chunk (conversation_id, message).
608+
on_complete: Callback invoked when the stream completes with the full search response.
609+
on_error: Callback invoked if an error occurs during streaming.
610+
"""
611+
612+
on_chunk: typing.Callable[[MessageChunk], None]
613+
on_complete: "OnCompleteCallback[TDoc]"
614+
on_error: typing.Callable[[BaseException], None]
615+
616+
617+
OnChunkCallback = typing.Callable[[MessageChunk], None]
618+
619+
620+
class OnCompleteCallback(typing.Protocol[TDoc]):
621+
def __call__(self, response: "SearchResponse[TDoc]") -> None: ...
622+
623+
624+
OnErrorCallback = typing.Callable[[BaseException], None]
625+
626+
627+
class StreamConfigBuilder(typing.Generic[TDoc]):
628+
"""
629+
Builder for StreamConfig using decorators.
630+
631+
Example:
632+
>>> stream = StreamConfigBuilder()
633+
>>>
634+
>>> @stream.on_chunk
635+
... def handle_chunk(chunk: MessageChunk) -> None:
636+
... print(chunk["message"], end="", flush=True)
637+
>>>
638+
>>> @stream.on_complete
639+
... def handle_complete(response: dict) -> None:
640+
... print(f"Done! Found {response.get('found', 0)}")
641+
>>>
642+
>>> response = await client.collections["docs"].documents.search({
643+
... "q": "query",
644+
... "query_by": "content",
645+
... "conversation_stream": True,
646+
... "stream_config": stream,
647+
... })
648+
"""
649+
650+
def __init__(self) -> None:
651+
"""Initialize an empty StreamConfigBuilder."""
652+
self._on_chunk: OnChunkCallback | None = None
653+
self._on_complete: OnCompleteCallback[TDoc] | None = None
654+
self._on_error: OnErrorCallback | None = None
655+
656+
def on_chunk(self, func: OnChunkCallback) -> OnChunkCallback:
657+
"""
658+
Decorator to register an on_chunk callback.
659+
660+
Args:
661+
func: Callback invoked for each streamed message chunk.
662+
663+
Returns:
664+
The original function (unmodified).
665+
"""
666+
self._on_chunk = func
667+
return func
668+
669+
def on_complete(self, func: OnCompleteCallback[TDoc]) -> OnCompleteCallback[TDoc]:
670+
"""
671+
Decorator to register an on_complete callback.
672+
673+
Args:
674+
func: Callback invoked when streaming completes with the full response.
675+
676+
Returns:
677+
The original function (unmodified).
678+
"""
679+
self._on_complete = func
680+
return func
681+
682+
def on_error(self, func: OnErrorCallback) -> OnErrorCallback:
683+
"""
684+
Decorator to register an on_error callback.
685+
686+
Args:
687+
func: Callback invoked if an error occurs during streaming.
688+
689+
Returns:
690+
The original function (unmodified).
691+
"""
692+
self._on_error = func
693+
return func
694+
695+
def build(self) -> StreamConfig[TDoc]:
696+
"""
697+
Build the StreamConfig dictionary.
698+
699+
Returns:
700+
A StreamConfig with the registered callbacks.
701+
"""
702+
config: StreamConfig[TDoc] = {}
703+
if self._on_chunk is not None:
704+
config["on_chunk"] = self._on_chunk
705+
if self._on_complete is not None:
706+
config["on_complete"] = self._on_complete
707+
if self._on_error is not None:
708+
config["on_error"] = self._on_error
709+
return config
710+
711+
def get(
712+
self,
713+
key: typing.Literal["on_chunk", "on_complete", "on_error"],
714+
default: typing.Callable[..., None] | None = None,
715+
) -> typing.Callable[..., None] | None:
716+
"""
717+
Get a callback by key (for compatibility with dict-like access).
718+
719+
Args:
720+
key: The callback name ('on_chunk', 'on_complete', or 'on_error').
721+
default: Default value if the callback is not set.
722+
723+
Returns:
724+
The callback function or the default value.
725+
"""
726+
return self.build().get(key, default)
727+
728+
729+
class ConversationStreamParameters(typing.Generic[TDoc], typing.TypedDict):
730+
"""
731+
Parameters for conversational search streaming.
732+
733+
Attributes:
734+
conversation_stream (bool): When true, the search response is streamed (SSE).
735+
stream_config: Callbacks for stream events. Not sent to the API.
736+
Can be a StreamConfig dict or a StreamConfigBuilder instance.
737+
"""
738+
739+
conversation_stream: typing.NotRequired[bool]
740+
stream_config: typing.NotRequired[
741+
typing.Union[StreamConfig[TDoc], StreamConfigBuilder[TDoc]]
742+
]
743+
744+
589745
class SearchParameters(
590746
RequiredSearchParameters,
591747
QueryParameters,
@@ -598,11 +754,13 @@ class SearchParameters(
598754
TypoToleranceParameters,
599755
CachingParameters,
600756
NLLanguageParameters,
757+
ConversationStreamParameters[TDoc],
758+
typing.Generic[TDoc],
601759
):
602760
"""Parameters for searching documents."""
603761

604762

605-
class MultiSearchParameters(SearchParameters):
763+
class MultiSearchParameters(SearchParameters[TDoc], typing.Generic[TDoc]):
606764
"""
607765
Parameters for performing a [Federated/Multi-Search](https://typesense.org/docs/26.0/api/federated-multi-search.html#federated-multi-search).
608766
@@ -856,7 +1014,7 @@ class LLMResponse(typing.TypedDict):
8561014
model: str
8571015

8581016

859-
class ParsedNLQuery(typing.TypedDict):
1017+
class ParsedNLQuery(typing.Generic[TDoc], typing.TypedDict):
8601018
"""
8611019
Schema for a parsed natural language query.
8621020
@@ -868,8 +1026,8 @@ class ParsedNLQuery(typing.TypedDict):
8681026
"""
8691027

8701028
parse_time_ms: int
871-
generated_params: SearchParameters
872-
augmented_params: SearchParameters
1029+
generated_params: SearchParameters[TDoc]
1030+
augmented_params: SearchParameters[TDoc]
8731031
llm_response: typing.NotRequired[LLMResponse]
8741032

8751033

@@ -901,7 +1059,7 @@ class SearchResponse(typing.Generic[TDoc], typing.TypedDict):
9011059
hits: typing.List[Hit[TDoc]]
9021060
grouped_hits: typing.NotRequired[typing.List[GroupedHit[TDoc]]]
9031061
conversation: typing.NotRequired[Conversation]
904-
parsed_nl_query: typing.NotRequired[ParsedNLQuery]
1062+
parsed_nl_query: typing.NotRequired[ParsedNLQuery[TDoc]]
9051063

9061064

9071065
class DeleteSingleDocumentParameters(typing.TypedDict):

0 commit comments

Comments
 (0)