Skip to content

Commit b7e6d66

Browse files
committed
test(streaming): add fixtures for async and sync stream handling
- provide fake sse stream responses and contexts for unit tests - add integration fixtures for conversational streaming collections and docs
1 parent 0d6635d commit b7e6d66

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
"""Fixtures for streaming tests."""
2+
3+
import json
4+
import os
5+
import sys
6+
from types import TracebackType
7+
8+
import pytest
9+
import requests
10+
11+
if sys.version_info >= (3, 11):
12+
import typing
13+
else:
14+
import typing_extensions as typing
15+
16+
17+
JSONPrimitive: typing.TypeAlias = typing.Union[str, int, float, bool, None]
18+
JSONValue: typing.TypeAlias = typing.Union[
19+
JSONPrimitive, typing.Dict[str, "JSONValue"], typing.List["JSONValue"]
20+
]
21+
JSONDict: typing.TypeAlias = typing.Dict[str, JSONValue]
22+
23+
24+
class FakeAsyncStreamResponse:
25+
"""Minimal async streaming response for httpx.AsyncClient.stream()."""
26+
27+
def __init__(
28+
self,
29+
*,
30+
lines: typing.Sequence[str],
31+
status_code: int = 200,
32+
headers: typing.Mapping[str, str] | None = None,
33+
text: str = "",
34+
) -> None:
35+
self.status_code = status_code
36+
self._lines = list(lines)
37+
self.headers = dict(headers or {})
38+
self.text = text
39+
40+
async def aiter_lines(self) -> typing.AsyncIterator[str]:
41+
for line in self._lines:
42+
yield line
43+
44+
async def aread(self) -> bytes:
45+
return self.text.encode()
46+
47+
def json(self) -> JSONDict:
48+
return typing.cast(JSONDict, json.loads(self.text))
49+
50+
51+
class FakeAsyncStreamContext:
52+
"""Async context manager that yields a fake streaming response."""
53+
54+
def __init__(self, response: FakeAsyncStreamResponse) -> None:
55+
self._response = response
56+
57+
async def __aenter__(self) -> FakeAsyncStreamResponse:
58+
return self._response
59+
60+
async def __aexit__(
61+
self,
62+
exc_type: type[BaseException] | None,
63+
exc_val: BaseException | None,
64+
exc_tb: TracebackType | None,
65+
) -> None:
66+
return None
67+
68+
69+
class FakeStreamResponse:
70+
"""Minimal streaming response for httpx.Client.stream()."""
71+
72+
def __init__(
73+
self,
74+
*,
75+
lines: typing.Sequence[str],
76+
status_code: int = 200,
77+
headers: typing.Mapping[str, str] | None = None,
78+
text: str = "",
79+
) -> None:
80+
self.status_code = status_code
81+
self._lines = list(lines)
82+
self.headers = dict(headers or {})
83+
self.text = text
84+
85+
def iter_lines(self) -> typing.Iterator[str]:
86+
for line in self._lines:
87+
yield line
88+
89+
def read(self) -> bytes:
90+
return self.text.encode()
91+
92+
def json(self) -> JSONDict:
93+
return typing.cast(JSONDict, json.loads(self.text))
94+
95+
96+
class FakeStreamContext:
97+
"""Sync context manager that yields a fake streaming response."""
98+
99+
def __init__(self, response: FakeStreamResponse) -> None:
100+
self._response = response
101+
102+
def __enter__(self) -> FakeStreamResponse:
103+
return self._response
104+
105+
def __exit__(
106+
self,
107+
exc_type: type[BaseException] | None,
108+
exc_val: BaseException | None,
109+
exc_tb: TracebackType | None,
110+
) -> None:
111+
return None
112+
113+
114+
@pytest.fixture(name="stream_response_async")
115+
def stream_response_async_fixture() -> type[FakeAsyncStreamResponse]:
116+
return FakeAsyncStreamResponse
117+
118+
119+
@pytest.fixture(name="stream_context_async")
120+
def stream_context_async_fixture() -> type[FakeAsyncStreamContext]:
121+
return FakeAsyncStreamContext
122+
123+
124+
@pytest.fixture(name="stream_response")
125+
def stream_response_fixture() -> type[FakeStreamResponse]:
126+
return FakeStreamResponse
127+
128+
129+
@pytest.fixture(name="stream_context")
130+
def stream_context_fixture() -> type[FakeStreamContext]:
131+
return FakeStreamContext
132+
133+
134+
@pytest.fixture(name="create_streaming_collection")
135+
def create_streaming_collection_fixture(delete_all: None) -> str:
136+
"""Create a collection for streaming tests with an auto-embedding field."""
137+
open_ai_key = os.environ.get("OPEN_AI_KEY")
138+
if not open_ai_key:
139+
pytest.skip("OPEN_AI_KEY is required for streaming integration tests.")
140+
url = "http://localhost:8108/collections"
141+
headers = {"X-TYPESENSE-API-KEY": "xyz"}
142+
collection_data = {
143+
"name": "streaming_docs",
144+
"fields": [
145+
{
146+
"name": "title",
147+
"type": "string",
148+
},
149+
{
150+
"name": "embedding",
151+
"type": "float[]",
152+
"embed": {
153+
"from": ["title"],
154+
"model_config": {
155+
"model_name": "openai/text-embedding-3-small",
156+
"api_key": open_ai_key,
157+
},
158+
},
159+
},
160+
],
161+
}
162+
163+
response = requests.post(url, headers=headers, json=collection_data, timeout=3)
164+
response.raise_for_status()
165+
return "streaming_docs"
166+
167+
168+
@pytest.fixture(name="create_streaming_document")
169+
def create_streaming_document_fixture(create_streaming_collection: str) -> str:
170+
"""Create a document for streaming tests."""
171+
url = "http://localhost:8108/collections/streaming_docs/documents"
172+
headers = {"X-TYPESENSE-API-KEY": "xyz"}
173+
document_data = {
174+
"id": "stream-1",
175+
"title": "Company profile",
176+
}
177+
178+
response = requests.post(url, headers=headers, json=document_data, timeout=3)
179+
response.raise_for_status()
180+
return "stream-1"

0 commit comments

Comments
 (0)