Skip to content

Commit e03dcff

Browse files
committed
fix(documents): split import batch sizing into server and client params
1 parent e863b44 commit e03dcff

4 files changed

Lines changed: 180 additions & 16 deletions

File tree

src/typesense/async_/documents.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ async def import_(
228228
documents: typing.List[TDoc],
229229
import_parameters: DocumentImportParametersReturnDocAndId,
230230
batch_size: typing.Union[int, None] = None,
231+
client_batch_size: typing.Union[int, None] = None,
231232
) -> typing.List[
232233
typing.Union[ImportResponseWithDocAndId[TDoc], ImportResponseFail[TDoc]]
233234
]: ...
@@ -238,6 +239,7 @@ async def import_(
238239
documents: typing.List[TDoc],
239240
import_parameters: DocumentImportParametersReturnId,
240241
batch_size: typing.Union[int, None] = None,
242+
client_batch_size: typing.Union[int, None] = None,
241243
) -> typing.List[typing.Union[ImportResponseWithId, ImportResponseFail[TDoc]]]: ...
242244

243245
@typing.overload
@@ -246,6 +248,7 @@ async def import_(
246248
documents: typing.List[TDoc],
247249
import_parameters: typing.Union[DocumentWriteParameters, None] = None,
248250
batch_size: typing.Union[int, None] = None,
251+
client_batch_size: typing.Union[int, None] = None,
249252
) -> typing.List[typing.Union[ImportResponseSuccess, ImportResponseFail[TDoc]]]: ...
250253

251254
@typing.overload
@@ -254,6 +257,7 @@ async def import_(
254257
documents: typing.List[TDoc],
255258
import_parameters: DocumentImportParametersReturnDoc,
256259
batch_size: typing.Union[int, None] = None,
260+
client_batch_size: typing.Union[int, None] = None,
257261
) -> typing.List[
258262
typing.Union[ImportResponseWithDoc[TDoc], ImportResponseFail[TDoc]]
259263
]: ...
@@ -264,6 +268,7 @@ async def import_(
264268
documents: typing.List[TDoc],
265269
import_parameters: _ImportParameters,
266270
batch_size: typing.Union[int, None] = None,
271+
client_batch_size: typing.Union[int, None] = None,
267272
) -> typing.List[ImportResponse[TDoc]]: ...
268273

269274
@typing.overload
@@ -272,13 +277,15 @@ async def import_(
272277
documents: typing.Union[bytes, str],
273278
import_parameters: _ImportParameters = None,
274279
batch_size: typing.Union[int, None] = None,
280+
client_batch_size: typing.Union[int, None] = None,
275281
) -> str: ...
276282

277283
async def import_(
278284
self,
279285
documents: typing.Union[bytes, str, typing.List[TDoc]],
280286
import_parameters: _ImportParameters = None,
281287
batch_size: typing.Union[int, None] = None,
288+
client_batch_size: typing.Union[int, None] = None,
282289
) -> typing.Union[ImportResponse[TDoc], str]:
283290
"""
284291
Import documents into the collection.
@@ -289,21 +296,33 @@ async def import_(
289296
Args:
290297
documents: The documents to import.
291298
import_parameters: Parameters for the import operation.
292-
batch_size: The size of each batch for batch imports.
299+
batch_size: Typesense import `batch_size` sent as request query parameter.
300+
client_batch_size: Client-side chunk size. When set, this method
301+
splits the input list and performs multiple import requests.
293302
294303
Returns:
295304
The import response, which can be a list of responses or a string.
296305
297306
Raises:
298307
TypesenseClientError: If an empty list of documents is provided.
299308
"""
309+
merged_import_parameters: DocumentImportParameters = {}
310+
if import_parameters:
311+
merged_import_parameters.update(import_parameters)
312+
if batch_size is not None:
313+
merged_import_parameters["batch_size"] = batch_size
314+
300315
if isinstance(documents, (str, bytes)):
301-
return await self._import_raw(documents, import_parameters)
316+
return await self._import_raw(documents, merged_import_parameters)
302317

303-
if batch_size:
304-
return await self._batch_import(documents, import_parameters, batch_size)
318+
if client_batch_size:
319+
return await self._batch_import(
320+
documents,
321+
merged_import_parameters,
322+
client_batch_size,
323+
)
305324

306-
return await self._bulk_import(documents, import_parameters)
325+
return await self._bulk_import(documents, merged_import_parameters)
307326

308327
async def export(
309328
self,

src/typesense/sync/documents.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def import_(
228228
documents: typing.List[TDoc],
229229
import_parameters: DocumentImportParametersReturnDocAndId,
230230
batch_size: typing.Union[int, None] = None,
231+
client_batch_size: typing.Union[int, None] = None,
231232
) -> typing.List[
232233
typing.Union[ImportResponseWithDocAndId[TDoc], ImportResponseFail[TDoc]]
233234
]: ...
@@ -238,6 +239,7 @@ def import_(
238239
documents: typing.List[TDoc],
239240
import_parameters: DocumentImportParametersReturnId,
240241
batch_size: typing.Union[int, None] = None,
242+
client_batch_size: typing.Union[int, None] = None,
241243
) -> typing.List[typing.Union[ImportResponseWithId, ImportResponseFail[TDoc]]]: ...
242244

243245
@typing.overload
@@ -246,6 +248,7 @@ def import_(
246248
documents: typing.List[TDoc],
247249
import_parameters: typing.Union[DocumentWriteParameters, None] = None,
248250
batch_size: typing.Union[int, None] = None,
251+
client_batch_size: typing.Union[int, None] = None,
249252
) -> typing.List[typing.Union[ImportResponseSuccess, ImportResponseFail[TDoc]]]: ...
250253

251254
@typing.overload
@@ -254,6 +257,7 @@ def import_(
254257
documents: typing.List[TDoc],
255258
import_parameters: DocumentImportParametersReturnDoc,
256259
batch_size: typing.Union[int, None] = None,
260+
client_batch_size: typing.Union[int, None] = None,
257261
) -> typing.List[
258262
typing.Union[ImportResponseWithDoc[TDoc], ImportResponseFail[TDoc]]
259263
]: ...
@@ -264,6 +268,7 @@ def import_(
264268
documents: typing.List[TDoc],
265269
import_parameters: _ImportParameters,
266270
batch_size: typing.Union[int, None] = None,
271+
client_batch_size: typing.Union[int, None] = None,
267272
) -> typing.List[ImportResponse[TDoc]]: ...
268273

269274
@typing.overload
@@ -272,13 +277,15 @@ def import_(
272277
documents: typing.Union[bytes, str],
273278
import_parameters: _ImportParameters = None,
274279
batch_size: typing.Union[int, None] = None,
280+
client_batch_size: typing.Union[int, None] = None,
275281
) -> str: ...
276282

277283
def import_(
278284
self,
279285
documents: typing.Union[bytes, str, typing.List[TDoc]],
280286
import_parameters: _ImportParameters = None,
281287
batch_size: typing.Union[int, None] = None,
288+
client_batch_size: typing.Union[int, None] = None,
282289
) -> typing.Union[ImportResponse[TDoc], str]:
283290
"""
284291
Import documents into the collection.
@@ -289,21 +296,33 @@ def import_(
289296
Args:
290297
documents: The documents to import.
291298
import_parameters: Parameters for the import operation.
292-
batch_size: The size of each batch for batch imports.
299+
batch_size: Typesense import `batch_size` sent as request query parameter.
300+
client_batch_size: Client-side chunk size. When set, this method
301+
splits the input list and performs multiple import requests.
293302
294303
Returns:
295304
The import response, which can be a list of responses or a string.
296305
297306
Raises:
298307
TypesenseClientError: If an empty list of documents is provided.
299308
"""
309+
merged_import_parameters: DocumentImportParameters = {}
310+
if import_parameters:
311+
merged_import_parameters.update(import_parameters)
312+
if batch_size is not None:
313+
merged_import_parameters["batch_size"] = batch_size
314+
300315
if isinstance(documents, (str, bytes)):
301-
return self._import_raw(documents, import_parameters)
316+
return self._import_raw(documents, merged_import_parameters)
302317

303-
if batch_size:
304-
return self._batch_import(documents, import_parameters, batch_size)
318+
if client_batch_size:
319+
return self._batch_import(
320+
documents,
321+
merged_import_parameters,
322+
client_batch_size,
323+
)
305324

306-
return self._bulk_import(documents, import_parameters)
325+
return self._bulk_import(documents, merged_import_parameters)
307326

308327
def export(
309328
self,

src/typesense/types/document.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ class DocumentWriteParameters(DirtyValuesParameters):
9494
If coercion fails, drop the particular field and index the rest of the document.
9595
- `drop`: Drop the particular field and index the rest of the document.
9696
- `reject`: Reject the write outright with an error message.
97+
98+
batch_size (int): Batch size to be sent as query param for the import endpoint.
9799
"""
98100

99101
action: typing.NotRequired[typing.Literal["create", "update", "upsert", "emplace"]]
102+
batch_size: typing.NotRequired[int]
100103

101104

102105
class UpdateByFilterParameters(typing.TypedDict):

tests/documents_test.py

Lines changed: 129 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,26 +296,75 @@ def test_import_json_fail(
296296
actual_documents.import_(generate_companies)
297297

298298

299-
def test_import_batch_size(
299+
def test_import_client_batch_size(
300300
generate_companies: typing.List[Companies],
301301
actual_documents: Documents[Companies],
302302
actual_api_call: ApiCall,
303303
delete_all: None,
304304
create_collection: None,
305305
mocker: MockFixture,
306306
) -> None:
307-
"""Test that the Documents object can import documents in batches."""
308-
batch_size = 5
307+
"""Test that the Documents object can import documents in client-side batches."""
308+
client_batch_size = 5
309309
import_spy = mocker.spy(actual_documents, "import_")
310310
batch_import_spy = mocker.spy(actual_documents, "_bulk_import")
311311
request_spy = mocker.spy(actual_api_call, "post")
312-
response = actual_documents.import_(generate_companies, batch_size=batch_size)
312+
response = actual_documents.import_(
313+
generate_companies,
314+
client_batch_size=client_batch_size,
315+
)
313316

314317
expected = [{"success": True} for _ in generate_companies]
315318
assert import_spy.call_count == 1
316-
assert batch_import_spy.call_count == len(generate_companies) // batch_size
317-
assert request_spy.call_count == len(generate_companies) // batch_size
319+
assert batch_import_spy.call_count == len(generate_companies) // client_batch_size
320+
assert request_spy.call_count == len(generate_companies) // client_batch_size
321+
assert response == expected
322+
323+
324+
def test_import_batch_size_query_parameter(
325+
generate_companies: typing.List[Companies],
326+
actual_documents: Documents[Companies],
327+
actual_api_call: ApiCall,
328+
delete_all: None,
329+
create_collection: None,
330+
mocker: MockFixture,
331+
) -> None:
332+
"""Test that batch_size arg is sent as an import query parameter."""
333+
request_spy = mocker.spy(actual_api_call, "post")
334+
response = actual_documents.import_(generate_companies, batch_size=42)
335+
336+
expected = [{"success": True} for _ in generate_companies]
337+
assert response == expected
338+
request_spy.assert_called_once_with(
339+
"/collections/companies/documents/import",
340+
body="\n".join([json.dumps(doc) for doc in generate_companies]),
341+
params={"batch_size": 42},
342+
entity_type=str,
343+
as_json=False,
344+
)
345+
346+
347+
def test_import_batch_size_query_parameter_from_import_parameters(
348+
generate_companies: typing.List[Companies],
349+
actual_documents: Documents[Companies],
350+
actual_api_call: ApiCall,
351+
delete_all: None,
352+
create_collection: None,
353+
mocker: MockFixture,
354+
) -> None:
355+
"""Test that import_parameters.batch_size is sent as an import query parameter."""
356+
request_spy = mocker.spy(actual_api_call, "post")
357+
response = actual_documents.import_(generate_companies, {"batch_size": 42})
358+
359+
expected = [{"success": True} for _ in generate_companies]
318360
assert response == expected
361+
request_spy.assert_called_once_with(
362+
"/collections/companies/documents/import",
363+
body="\n".join([json.dumps(doc) for doc in generate_companies]),
364+
params={"batch_size": 42},
365+
entity_type=str,
366+
as_json=False,
367+
)
319368

320369

321370
def test_import_return_docs(
@@ -524,6 +573,80 @@ async def test_upsert_async(
524573
assert response == company
525574

526575

576+
async def test_import_batch_size_query_parameter_async(
577+
generate_companies: typing.List[Companies],
578+
actual_async_documents: AsyncDocuments[Companies],
579+
actual_async_api_call: AsyncApiCall,
580+
delete_all: None,
581+
create_collection: None,
582+
mocker: MockFixture,
583+
) -> None:
584+
"""Test that batch_size arg is sent as an import query parameter."""
585+
request_spy = mocker.spy(actual_async_api_call, "post")
586+
response = await actual_async_documents.import_(generate_companies, batch_size=42)
587+
588+
expected = [{"success": True} for _ in generate_companies]
589+
assert response == expected
590+
request_spy.assert_called_once_with(
591+
"/collections/companies/documents/import",
592+
body="\n".join([json.dumps(doc) for doc in generate_companies]),
593+
params={"batch_size": 42},
594+
entity_type=str,
595+
as_json=False,
596+
)
597+
598+
599+
async def test_import_batch_size_query_parameter_from_import_parameters_async(
600+
generate_companies: typing.List[Companies],
601+
actual_async_documents: AsyncDocuments[Companies],
602+
actual_async_api_call: AsyncApiCall,
603+
delete_all: None,
604+
create_collection: None,
605+
mocker: MockFixture,
606+
) -> None:
607+
"""Test that import_parameters.batch_size is sent as an import query parameter."""
608+
request_spy = mocker.spy(actual_async_api_call, "post")
609+
response = await actual_async_documents.import_(
610+
generate_companies,
611+
{"batch_size": 42},
612+
)
613+
614+
expected = [{"success": True} for _ in generate_companies]
615+
assert response == expected
616+
request_spy.assert_called_once_with(
617+
"/collections/companies/documents/import",
618+
body="\n".join([json.dumps(doc) for doc in generate_companies]),
619+
params={"batch_size": 42},
620+
entity_type=str,
621+
as_json=False,
622+
)
623+
624+
625+
async def test_import_client_batch_size_async(
626+
generate_companies: typing.List[Companies],
627+
actual_async_documents: AsyncDocuments[Companies],
628+
actual_async_api_call: AsyncApiCall,
629+
delete_all: None,
630+
create_collection: None,
631+
mocker: MockFixture,
632+
) -> None:
633+
"""Test that AsyncDocuments can import documents in client-side batches."""
634+
client_batch_size = 5
635+
import_spy = mocker.spy(actual_async_documents, "import_")
636+
batch_import_spy = mocker.spy(actual_async_documents, "_bulk_import")
637+
request_spy = mocker.spy(actual_async_api_call, "post")
638+
response = await actual_async_documents.import_(
639+
generate_companies,
640+
client_batch_size=client_batch_size,
641+
)
642+
643+
expected = [{"success": True} for _ in generate_companies]
644+
assert import_spy.call_count == 1
645+
assert batch_import_spy.call_count == len(generate_companies) // client_batch_size
646+
assert request_spy.call_count == len(generate_companies) // client_batch_size
647+
assert response == expected
648+
649+
527650
async def test_export_async(
528651
actual_async_documents: AsyncDocuments[Companies],
529652
delete_all: None,

0 commit comments

Comments (0)