Skip to content

Commit 11daaa0

Browse files
committed
WIP-stream
1 parent 73e4e09 commit 11daaa0

11 files changed

Lines changed: 1626 additions & 8 deletions
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
"""
2+
Example: Streaming Conversation Search
3+
4+
This script demonstrates how to use streaming conversation search with Typesense.
5+
It requires:
6+
1. A running Typesense server (default: localhost:8108)
7+
2. An OpenAI API key set in OPEN_AI_KEY (in .env file or environment)
8+
9+
Usage:
10+
python examples/streaming_conversation_search.py
11+
"""
12+
13+
import os
14+
import sys
15+
16+
from dotenv import load_dotenv
17+
18+
load_dotenv()
19+
20+
curr_dir = os.path.dirname(os.path.realpath(__file__))
21+
sys.path.insert(1, os.path.abspath(os.path.join(curr_dir, os.pardir, "src")))
22+
23+
import typesense
24+
from typesense.types.document import MessageChunk, StreamConfig
25+
26+
# Configuration
27+
API_KEY = os.environ.get("TYPESENSE_API_KEY", "xyz")
28+
HOST = os.environ.get("TYPESENSE_HOST", "localhost")
29+
PORT = os.environ.get("TYPESENSE_PORT", "8108")
30+
OPENAI_KEY = os.environ.get("OPEN_AI_KEY", "")
31+
32+
if not OPENAI_KEY:
33+
print("Error: OPEN_AI_KEY environment variable is required")
34+
print("Set it in a .env file or pass it directly")
35+
sys.exit(1)
36+
37+
# Create client (use longer timeout for streaming)
38+
client = typesense.Client({
39+
"api_key": API_KEY,
40+
"nodes": [{"host": HOST, "port": PORT, "protocol": "http"}],
41+
"connection_timeout_seconds": 120,
42+
})
43+
44+
COLLECTION_NAME = "streaming-example-essays"
45+
CONVERSATION_STORE_NAME = "streaming-example-conversation-store"
46+
MODEL_NAME = "streaming-example-model"
47+
48+
# Collection schema with embedding
49+
collection_schema = {
50+
"name": COLLECTION_NAME,
51+
"fields": [
52+
{"name": "title", "type": "string"},
53+
{"name": "content", "type": "string"},
54+
{
55+
"name": "embedding",
56+
"type": "float[]",
57+
"embed": {
58+
"from": ["content"],
59+
"model_config": {
60+
"model_name": "openai/text-embedding-3-small",
61+
"api_key": OPENAI_KEY,
62+
},
63+
},
64+
},
65+
],
66+
}
67+
68+
# Conversation history collection schema (required for conversation models)
69+
conversation_store_schema = {
70+
"name": CONVERSATION_STORE_NAME,
71+
"fields": [
72+
{"name": "conversation_id", "type": "string"},
73+
{"name": "model_id", "type": "string"},
74+
{"name": "timestamp", "type": "int32"},
75+
{"name": "role", "type": "string", "index": False},
76+
{"name": "message", "type": "string", "index": False},
77+
],
78+
}
79+
80+
# Conversation model schema
81+
conversation_model_schema = {
82+
"id": MODEL_NAME,
83+
"model_name": "openai/gpt-4o-mini",
84+
"api_key": OPENAI_KEY,
85+
"history_collection": CONVERSATION_STORE_NAME,
86+
"system_prompt": "You are a helpful assistant. Answer questions based on the provided context.",
87+
"max_bytes": 16384,
88+
}
89+
90+
# Sample documents
91+
documents = [
92+
{
93+
"title": "The Maker's Schedule",
94+
"content": """
95+
One reason programmers dislike meetings so much is that they're on a different
96+
type of schedule from other people. Meetings cost them more. There are two types
97+
of schedule, which I'll call the manager's schedule and the maker's schedule.
98+
The manager's schedule is for bosses. It's embodied in the traditional appointment
99+
book, with each day cut into one hour intervals. You can block off several hours
100+
for a single task if you need to, but by default you change what you're doing
101+
every hour. When you use time that way, it's merely a practical problem to meet
102+
with someone. The maker's schedule is different. Writers, programmers, and artists
103+
typically work in longer time units.
104+
""",
105+
},
106+
{
107+
"title": "How to Do What You Love",
108+
"content": """
109+
To do something well you have to like it. That idea is not exactly novel.
110+
We've got it down to four words: "Do what you love." But it's not enough just
111+
to tell people that. Doing what you love is complicated. The very idea is
112+
foreign to most people. It's not as easy as it sounds. You have to find work
113+
that you enjoy, and that's harder than most people think.
114+
""",
115+
},
116+
]
117+
118+
119+
def setup():
    """Create the demo collections, index the sample docs, and register the model."""
    # Best-effort teardown of anything left over from a previous run; a failed
    # delete (resource absent, server hiccup) is deliberately ignored.
    for remove in (
        lambda: client.conversations_models[MODEL_NAME].delete(),
        lambda: client.collections[COLLECTION_NAME].delete(),
        lambda: client.collections[CONVERSATION_STORE_NAME].delete(),
    ):
        try:
            remove()
        except Exception:
            pass

    # The history collection must exist before the model that references it.
    print(f"Creating conversation store '{CONVERSATION_STORE_NAME}'...")
    client.collections.create(conversation_store_schema)

    print(f"Creating collection '{COLLECTION_NAME}'...")
    client.collections.create(collection_schema)

    # Indexing triggers the OpenAI embedding call per document.
    print("Indexing documents...")
    for doc in documents:
        client.collections[COLLECTION_NAME].documents.create(doc)

    print(f"Creating conversation model '{MODEL_NAME}'...")
    client.conversations_models.create(conversation_model_schema)

    print("Setup complete!\n")
155+
156+
157+
def cleanup():
    """Delete the model and both collections, ignoring resources that are absent."""
    for remove in (
        lambda: client.conversations_models[MODEL_NAME].delete(),
        lambda: client.collections[COLLECTION_NAME].delete(),
        lambda: client.collections[CONVERSATION_STORE_NAME].delete(),
    ):
        try:
            remove()
        except Exception:
            # Best-effort: nothing to do if the resource never got created.
            pass
173+
174+
175+
def streaming_search_example():
    """Demonstrate streaming conversation search."""
    banner = "=" * 60
    print(banner)
    print("Streaming Conversation Search Example")
    print(banner)

    # Fragments of the answer, in arrival order.
    collected: list[str] = []

    def handle_chunk(chunk: MessageChunk) -> None:
        """Accumulate and echo one streamed message fragment."""
        collected.append(chunk["message"])
        # end=""/flush=True produces the live "typing" effect on stdout.
        print(chunk["message"], end="", flush=True)

    def handle_complete(response: dict) -> None:
        """Report the document count once the stream has finished."""
        print("\n")
        print("-" * 40)
        print(f"Streaming complete! Found {response.get('found', 0)} documents")

    def handle_error(error: BaseException) -> None:
        """Surface any mid-stream failure."""
        print(f"\nError: {error}")

    callbacks: StreamConfig = {
        "on_chunk": handle_chunk,
        "on_complete": handle_complete,
        "on_error": handle_error,
    }

    query = "What is the maker's schedule and why do programmers prefer it?"
    print(f"\nQuery: {query}\n")
    print("Streaming response:")
    print("-" * 40)

    # The callbacks fire while this call is in flight; it returns the final
    # search response once the stream ends.
    response = client.collections[COLLECTION_NAME].documents.search({
        "q": query,
        "query_by": "embedding",
        "conversation": True,
        "conversation_stream": True,
        "conversation_model_id": MODEL_NAME,
        "stream_config": callbacks,
    })

    full_message = "".join(collected)
    print(f"\nFull streamed message length: {len(full_message)} characters")

    return response
226+
227+
228+
def main():
    """Run the demo end-to-end, always tearing down server-side resources."""
    try:
        setup()
        streaming_search_example()
    finally:
        # Runs even if setup/search raises, so no demo collections linger.
        print("\nCleaning up...")
        cleanup()
        print("Done!")


if __name__ == "__main__":
    main()

examples/streaming_test_simple.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""
2+
Simple test script for async streaming conversation search against the demo endpoint.
3+
"""
4+
5+
import asyncio
6+
import os
7+
import sys
8+
9+
curr_dir = os.path.dirname(os.path.realpath(__file__))
10+
sys.path.insert(1, os.path.abspath(os.path.join(curr_dir, os.pardir, "src")))
11+
12+
import typesense
13+
from typesense.types.document import MessageChunk, StreamConfigBuilder
14+
15+
# Use the public demo endpoint
16+
client = typesense.AsyncClient({
17+
"api_key": "8hLCPSQTYcBuK29zY5q6Xhin7ONxHy99",
18+
"nodes": [{"host": "qtg5aekc2iosjh93p.a1.typesense.net", "port": "443", "protocol": "https"}],
19+
"connection_timeout_seconds": 120,
20+
})
21+
22+
COLLECTION_NAME = "pg-essays"
23+
MODEL_ID = "gpt-4-turbo-model-222"
24+
25+
26+
async def main():
    """Run one streaming conversation query against the hosted demo collection."""
    print("Testing async streaming conversation search...")
    print("=" * 60)

    # Answer fragments, in arrival order.
    received: list[str] = []
    builder = StreamConfigBuilder()

    @builder.on_chunk
    def _collect(chunk: MessageChunk) -> None:
        # Accumulate and echo each fragment as it arrives.
        received.append(chunk["message"])
        print(chunk["message"], end="", flush=True)

    @builder.on_complete
    def _finish(response: dict) -> None:
        print("\n")
        print("-" * 40)
        print(f"Streaming complete! Found {response.get('found', 0)} documents")

    @builder.on_error
    def _fail(error: BaseException) -> None:
        print(f"\nError: {error}")

    query = "What are the characteristics of a good startup idea?"
    print(f"Query: {query}\n")
    print("Streaming response:")
    print("-" * 40)

    # The registered handlers fire while this awaited call is in flight.
    await client.collections[COLLECTION_NAME].documents.search({
        "q": query,
        "query_by": "embedding",
        "conversation": True,
        "conversation_stream": True,
        "conversation_model_id": MODEL_ID,
        "exclude_fields": "embedding",
        "stream_config": builder,
    })

    print(f"\nFull message length: {len(''.join(received))} characters")


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)