
Commit 5c7fe22

Merge pull request #71 from forcedotcom/jo_row_limit
expose row_limit
2 parents: 8f0049e + 7aad107

5 files changed: 79 additions & 14 deletions


README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -9,7 +9,7 @@ Use of this project with Salesforce is subject to the [TERMS OF USE](./TERMS_OF_
 ## Prerequisites
 
 - **Python 3.11 only** (currently supported version - if your system version is different, we recommend using [pyenv](https://github.com/pyenv/pyenv) to configure 3.11)
-- [Azul Zulu OpenJDK 17.x](https://www.azul.com/downloads/?version=java-17-lts&package=jdk#zulu)
+- JDK 17
 - Docker support like [Docker Desktop](https://docs.docker.com/desktop/)
 - A salesforce org with some DLOs or DMOs with data and this feature enabled (it is not GA)
 - An [External Client App](#creating-an-external-client-app)
```

src/datacustomcode/client.py

Lines changed: 6 additions & 4 deletions
```diff
@@ -185,29 +185,31 @@ def _new_function_client(cls) -> Client:
             )
         return cls._instance
 
-    def read_dlo(self, name: str) -> PySparkDataFrame:
+    def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.
 
         Args:
             name: The name of the DLO to read.
+            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
-        return self._reader.read_dlo(name)
+        return self._reader.read_dlo(name, row_limit=row_limit)
 
-    def read_dmo(self, name: str) -> PySparkDataFrame:
+    def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.
 
         Args:
             name: The name of the DMO to read.
+            row_limit: Maximum number of rows to fetch (default: 1000).
 
         Returns:
             A PySpark DataFrame containing the DMO data.
         """
         self._record_dmo_access(name)
-        return self._reader.read_dmo(name)
+        return self._reader.read_dmo(name, row_limit=row_limit)
 
     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
```
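With this change, callers can control how many rows a read fetches. A minimal usage sketch follows; the DLO/DMO names are hypothetical and client construction is elided, since it depends on your environment:

```python
from datacustomcode.client import Client

client = Client()  # construction details depend on your environment

# Default behavior is unchanged: at most 1000 rows are fetched.
orders_df = client.read_dlo("Orders_DLO")  # hypothetical DLO name

# Pass row_limit explicitly to fetch more (or fewer) rows.
orders_full_df = client.read_dlo("Orders_DLO", row_limit=100_000)
sample_df = client.read_dmo("Orders_DMO", row_limit=10)
```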

src/datacustomcode/io/reader/base.py

Lines changed: 14 additions & 3 deletions
```diff
@@ -15,20 +15,31 @@
 from __future__ import annotations
 
 from abc import abstractmethod
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Union
 
 from datacustomcode.io.base import BaseDataAccessLayer
 
 if TYPE_CHECKING:
     from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
+    from pyspark.sql.types import AtomicType, StructType
 
 
 class BaseDataCloudReader(BaseDataAccessLayer):
     def __init__(self, spark: SparkSession):
         self.spark = spark
 
     @abstractmethod
-    def read_dlo(self, name: str) -> PySparkDataFrame: ...
+    def read_dlo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame: ...
 
     @abstractmethod
-    def read_dmo(self, name: str) -> PySparkDataFrame: ...
+    def read_dmo(
+        self,
+        name: str,
+        schema: Union[AtomicType, StructType, str, None] = None,
+        row_limit: int = 1000,
+    ) -> PySparkDataFrame: ...
```
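Because the abstract signatures widened, every concrete reader must now accept the optional `schema` and `row_limit` parameters. Below is a minimal sketch of a conforming subclass; the class name and the table-backed query logic are illustrative assumptions, not part of this commit:

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Union

from datacustomcode.io.reader.base import BaseDataCloudReader

if TYPE_CHECKING:
    from pyspark.sql import DataFrame as PySparkDataFrame
    from pyspark.sql.types import AtomicType, StructType


class InMemoryReader(BaseDataCloudReader):
    """Illustrative reader satisfying the widened abstract signatures."""

    CONFIG_NAME = "InMemoryReader"  # mirrors the pattern used by the test mock

    def read_dlo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
        row_limit: int = 1000,
    ) -> PySparkDataFrame:
        # A real implementation would push row_limit into its query;
        # here we simply cap an existing registered table.
        return self.spark.table(name).limit(row_limit)

    def read_dmo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
        row_limit: int = 1000,
    ) -> PySparkDataFrame:
        return self.spark.table(name).limit(row_limit)
```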

tests/io/reader/test_query_api.py

Lines changed: 24 additions & 0 deletions
```diff
@@ -277,3 +277,27 @@ def test_read_dmo_with_schema(
         reader_without_init.spark.createDataFrame.assert_called_once()
         args, _ = reader_without_init.spark.createDataFrame.call_args
         assert args[1] is custom_schema
+
+    def test_read_dlo_with_custom_row_limit(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """Test read_dlo method with custom row_limit."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dlo("test_dlo", row_limit=50)
+
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE.format("test_dlo", 50)
+        )
+
+    def test_read_dmo_with_custom_row_limit(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """Test read_dmo method with custom row_limit."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dmo("test_dmo", row_limit=25)
+
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE.format("test_dmo", 25)
+        )
```
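These assertions imply that `SQL_QUERY_TEMPLATE` takes the object name and the row limit as its two positional format slots. The exact template is defined in the query API reader module and is not shown in this diff; a plausible shape, purely for illustration:

```python
# Hypothetical template inferred from the assertions above; the real
# one lives in the query_api reader module and may differ.
SQL_QUERY_TEMPLATE = "SELECT * FROM {} LIMIT {}"

assert SQL_QUERY_TEMPLATE.format("test_dlo", 50) == "SELECT * FROM test_dlo LIMIT 50"
```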

tests/test_client.py

Lines changed: 34 additions & 6 deletions
```diff
@@ -25,11 +25,11 @@ class MockDataCloudReader(BaseDataCloudReader):
 
     CONFIG_NAME = "MockDataCloudReader"
 
-    def read_dlo(self, name: str) -> DataFrame:
+    def read_dlo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
 
-    def read_dmo(self, name: str) -> DataFrame:
+    def read_dmo(self, name: str, schema=None, row_limit: int = 1000) -> DataFrame:
         df = MagicMock(spec=DataFrame)
         return df
 
@@ -153,7 +153,7 @@ def test_read_dlo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dlo("test_dlo")
 
-        reader.read_dlo.assert_called_once_with("test_dlo")
+        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=1000)
         assert result is mock_df
         assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
 
@@ -166,7 +166,7 @@ def test_read_dmo(self, reset_client, mock_spark, mock_proxy):
         client = Client(reader=reader, writer=writer, proxy=mock_proxy)
         result = client.read_dmo("test_dmo")
 
-        reader.read_dmo.assert_called_once_with("test_dmo")
+        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=1000)
         assert result is mock_df
         assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
 
@@ -238,7 +238,7 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dlo("source_dlo")
         client.write_to_dlo("target_dlo", df, WriteMode.APPEND)
 
-        reader.read_dlo.assert_called_once_with("source_dlo")
+        reader.read_dlo.assert_called_once_with("source_dlo", row_limit=1000)
         writer.write_to_dlo.assert_called_once_with(
             "target_dlo", mock_df, WriteMode.APPEND
         )
@@ -253,13 +253,41 @@ def test_read_pattern_flow(self, reset_client, mock_spark, mock_proxy):
         df = client.read_dmo("source_dmo")
         client.write_to_dmo("target_dmo", df, WriteMode.MERGE)
 
-        reader.read_dmo.assert_called_once_with("source_dmo")
+        reader.read_dmo.assert_called_once_with("source_dmo", row_limit=1000)
         writer.write_to_dmo.assert_called_once_with(
             "target_dmo", mock_df, WriteMode.MERGE
         )
 
         assert "source_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
 
+    def test_read_dlo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
+        """Test that row_limit parameter is passed through to reader."""
+        reader = MagicMock(spec=BaseDataCloudReader)
+        writer = MagicMock(spec=BaseDataCloudWriter)
+        mock_df = MagicMock(spec=DataFrame)
+        reader.read_dlo.return_value = mock_df
+
+        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
+        result = client.read_dlo("test_dlo", row_limit=500)
+
+        reader.read_dlo.assert_called_once_with("test_dlo", row_limit=500)
+        assert result is mock_df
+        assert "test_dlo" in client._data_layer_history[DataCloudObjectType.DLO]
+
+    def test_read_dmo_with_row_limit(self, reset_client, mock_spark, mock_proxy):
+        """Test that row_limit parameter is passed through to reader."""
+        reader = MagicMock(spec=BaseDataCloudReader)
+        writer = MagicMock(spec=BaseDataCloudWriter)
+        mock_df = MagicMock(spec=DataFrame)
+        reader.read_dmo.return_value = mock_df
+
+        client = Client(reader=reader, writer=writer, proxy=mock_proxy)
+        result = client.read_dmo("test_dmo", row_limit=100)
+
+        reader.read_dmo.assert_called_once_with("test_dmo", row_limit=100)
+        assert result is mock_df
+        assert "test_dmo" in client._data_layer_history[DataCloudObjectType.DMO]
+
 
 # Add tests for DefaultSparkSessionProvider
 class TestDefaultSparkSessionProvider:
```
