Commit d481ccc

default to no row limit inside data cloud
1 parent 884a9e0 commit d481ccc

8 files changed

Lines changed: 221 additions & 31 deletions

src/datacustomcode/client.py

Lines changed: 12 additions & 4 deletions
@@ -185,25 +185,33 @@ def _new_function_client(cls) -> Client:
         )
         return cls._instance

-    def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dlo(
+        self, name: str, row_limit: Optional[int] = None
+    ) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.

         Args:
             name: The name of the DLO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).
+            row_limit: Maximum number of rows to fetch. When ``None``, the
+                reader's configured ``default_row_limit`` is used (1000 for
+                local development, no limit when deployed).

         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
         return self._reader.read_dlo(name, row_limit=row_limit)

-    def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dmo(
+        self, name: str, row_limit: Optional[int] = None
+    ) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.

         Args:
             name: The name of the DMO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).
+            row_limit: Maximum number of rows to fetch. When ``None``, the
+                reader's configured ``default_row_limit`` is used (1000 for
+                local development, no limit when deployed).

         Returns:
             A PySpark DataFrame containing the DMO data.

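The net effect for callers: both read methods can now be called without a limit and do the right thing per environment. A minimal usage sketch follows; how a ``Client`` is obtained is outside this diff, so ``Client()`` below stands in for whatever factory the package provides, and the object names are placeholders.

from datacustomcode.client import Client

client = Client()  # assumption: a Client wired up per config.yaml

# No row_limit: the reader's configured default_row_limit applies
# (1000 in local development, unlimited when deployed).
df_full = client.read_dlo("MyDlo")

# An explicit row_limit always takes precedence over the default.
df_sample = client.read_dmo("MyDmo", row_limit=500)
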
src/datacustomcode/config.yaml

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ reader_config:
   type_config_name: QueryAPIDataCloudReader
   options:
     credentials_profile: default
+    default_row_limit: 1000

 writer_config:
   type_config_name: PrintDataCloudWriter

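A sketch of how the new option plausibly reaches the reader, assuming the config loader forwards ``reader_config.options`` as keyword arguments to the reader named by ``type_config_name`` (the loader itself is not part of this diff):

import yaml

from datacustomcode.io.reader.query_api import QueryAPIDataCloudReader

with open("src/datacustomcode/config.yaml") as f:
    config = yaml.safe_load(f)

options = config["reader_config"]["options"]
# options == {"credentials_profile": "default", "default_row_limit": 1000}
# reader = QueryAPIDataCloudReader(spark, **options)  # needs a live SparkSession
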
src/datacustomcode/io/reader/base.py

Lines changed: 3 additions & 3 deletions
@@ -15,7 +15,7 @@
 from __future__ import annotations

 from abc import abstractmethod
-from typing import TYPE_CHECKING, Union
+from typing import TYPE_CHECKING, Optional, Union

 from datacustomcode.io.base import BaseDataAccessLayer

@@ -33,13 +33,13 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame: ...

     @abstractmethod
     def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame: ...

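Concrete readers now have to accept ``Optional[int]`` and treat ``None`` as "defer to my configured default". A hypothetical subclass sketch of the contract; the ABC's actual class name sits outside these hunks, so ``BaseDataCloudReader`` is an assumption, as is the ``self.spark`` attribute:

from typing import Optional, Union

from pyspark.sql import DataFrame as PySparkDataFrame


class InMemoryReader(BaseDataCloudReader):  # hypothetical ABC name
    def read_dlo(
        self,
        name: str,
        schema: Union["AtomicType", "StructType", str, None] = None,
        row_limit: Optional[int] = None,
    ) -> PySparkDataFrame:
        df = self.spark.table(name)
        # This reader has no default of its own, so None simply means "no limit".
        return df.limit(row_limit) if row_limit is not None else df

    # read_dmo would follow the same pattern.
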
src/datacustomcode/io/reader/query_api.py

Lines changed: 34 additions & 8 deletions
@@ -37,6 +37,7 @@


 SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
+SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}"


 def create_cdp_connection(
@@ -122,6 +123,7 @@ def __init__(
         credentials_profile: str = "default",
         dataspace: Optional[str] = None,
         sf_cli_org: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize QueryAPIDataCloudReader.

@@ -137,8 +139,12 @@ def __init__(
                 reader delegates to :class:`SFCLIDataCloudReader` which calls
                 the Data Cloud REST API directly using the token obtained from
                 ``sf org display``, bypassing the CDP token-exchange flow.
+            default_row_limit: Default maximum number of rows to fetch when
+                ``row_limit`` is not explicitly passed to read methods. When
+                ``None``, no limit is applied (all rows are returned).
         """
         self.spark = spark
+        self._default_row_limit = default_row_limit
         if sf_cli_org:
             logger.debug(
                 f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
@@ -147,6 +153,7 @@ def __init__(
                 spark=spark,
                 sf_cli_org=sf_cli_org,
                 dataspace=dataspace,
+                default_row_limit=default_row_limit,
             )
             self._conn = None
         else:
@@ -158,19 +165,37 @@ def __init__(
             )
             self._conn = create_cdp_connection(credentials, dataspace)

+    def _build_query(self, name: str, row_limit: Optional[int]) -> str:
+        """Build a SQL query, applying the default row limit when needed.
+
+        Args:
+            name: Object name to query.
+            row_limit: Explicit row limit, or ``None`` to use the configured default.
+
+        Returns:
+            SQL query string.
+        """
+        effective_limit = (
+            row_limit if row_limit is not None else self._default_row_limit
+        )
+        if effective_limit is not None:
+            return SQL_QUERY_TEMPLATE.format(name, effective_limit)
+        return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)
+
     def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """
-        Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows.
+        Read a Data Lake Object (DLO) from the Data Cloud.

         Args:
             name (str): The name of the DLO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
-            row_limit (int): Maximum number of rows to fetch.
+            row_limit (Optional[int]): Maximum number of rows to fetch.
+                When ``None``, the configured ``default_row_limit`` is used.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -181,7 +206,7 @@ def read_dlo(
         if sf_cli_reader is not None:
             return sf_cli_reader.read_dlo(name, schema, row_limit)

-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name, row_limit)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
@@ -197,15 +222,16 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """
-        Read a Data Model Object (DMO) from the Data Cloud, limited to a number of rows.
+        Read a Data Model Object (DMO) from the Data Cloud.

         Args:
             name (str): The name of the DMO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO.
-            row_limit (int): Maximum number of rows to fetch.
+            row_limit (Optional[int]): Maximum number of rows to fetch.
+                When ``None``, the configured ``default_row_limit`` is used.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -216,7 +242,7 @@ def read_dmo(
         if sf_cli_reader is not None:
             return sf_cli_reader.read_dmo(name, schema, row_limit)

-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name, row_limit)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)

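The limit-resolution rule is easiest to see standalone. The sketch below re-states the body of ``_build_query`` as a free function for illustration (the real method reads ``self._default_row_limit``):

from typing import Optional

from datacustomcode.io.reader.query_api import (
    SQL_QUERY_TEMPLATE,
    SQL_QUERY_TEMPLATE_NO_LIMIT,
)


def build_query(name: str, row_limit: Optional[int], default_row_limit: Optional[int]) -> str:
    # An explicit limit wins; otherwise fall back to the configured default.
    effective = row_limit if row_limit is not None else default_row_limit
    if effective is not None:
        return SQL_QUERY_TEMPLATE.format(name, effective)
    return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)


assert build_query("MyDlo", None, 1000) == "SELECT * FROM MyDlo LIMIT 1000"  # local default
assert build_query("MyDlo", 50, 1000) == "SELECT * FROM MyDlo LIMIT 50"      # explicit wins
assert build_query("MyDlo", None, None) == "SELECT * FROM MyDlo"             # deployed: no LIMIT
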
src/datacustomcode/io/reader/sf_cli.py

Lines changed: 18 additions & 7 deletions
@@ -55,6 +55,7 @@
         spark: SparkSession,
         sf_cli_org: str,
         dataspace: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize SFCLIDataCloudReader.

@@ -64,9 +65,13 @@
                 (e.g. the alias given to ``sf org login web --alias dev1``).
             dataspace: Optional dataspace identifier. If ``None`` or
                 ``"default"`` the query runs against the default dataspace.
+            default_row_limit: Default maximum number of rows to fetch when
+                ``row_limit`` is not explicitly passed to read methods. When
+                ``None``, no limit is applied (all rows are returned).
         """
         self.spark = spark
         self.sf_cli_org = sf_cli_org
+        self._default_row_limit = default_row_limit
         self.dataspace = (
             dataspace if dataspace and dataspace != "default" else "default"
         )
@@ -132,12 +137,12 @@ def _get_token(self) -> tuple[str, str]:
         logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
         return access_token, instance_url

-    def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
+    def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame:
         """Execute *sql* against the Data Cloud REST endpoint.

         Args:
             sql: Base SQL query (no ``LIMIT`` clause).
-            row_limit: Maximum rows to return.
+            row_limit: Maximum rows to return, or ``None`` for no limit.

         Returns:
             Pandas DataFrame with query results.
@@ -147,10 +152,16 @@ def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame:
         """
         access_token, instance_url = self._get_token()

+        effective_limit = (
+            row_limit if row_limit is not None else self._default_row_limit
+        )
         url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
         headers = {"Authorization": f"Bearer {access_token}"}
         params = {"dataspace": self.dataspace}
-        body = {"sql": f"{sql} LIMIT {row_limit}"}
+        if effective_limit is not None:
+            body = {"sql": f"{sql} LIMIT {effective_limit}"}
+        else:
+            body = {"sql": sql}

         logger.debug(f"Executing Data Cloud query: {body['sql']}")

@@ -190,14 +201,14 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """Read a Data Lake Object (DLO) from Data Cloud.

         Args:
             name: DLO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.
+            row_limit: Maximum rows to fetch, or ``None`` to use the configured default.

         Returns:
             PySpark DataFrame.
@@ -211,14 +222,14 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
+        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """Read a Data Model Object (DMO) from Data Cloud.

         Args:
             name: DMO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.
+            row_limit: Maximum rows to fetch, or ``None`` to use the configured default.

         Returns:
             PySpark DataFrame.

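For orientation, the request ``_execute_query`` ends up issuing looks roughly like this sketch. The token and instance URL really come from ``sf org display`` and ``API_VERSION`` is defined elsewhere in the module, so the values below are placeholders; sending the body as JSON is also an assumption, since the actual send happens outside these hunks.

import requests

instance_url = "https://example.my.salesforce.com"  # placeholder
access_token = "<token from sf org display>"        # placeholder
api_version = "vXX.X"                               # stands in for API_VERSION

sql = "SELECT * FROM MyDlo"
effective_limit = None  # deployed default: the LIMIT clause is omitted entirely

if effective_limit is not None:
    body = {"sql": f"{sql} LIMIT {effective_limit}"}
else:
    body = {"sql": sql}

response = requests.post(
    f"{instance_url}/services/data/{api_version}/ssot/query-sql",
    headers={"Authorization": f"Bearer {access_token}"},
    params={"dataspace": "default"},
    json=body,  # assumption: body serialized as JSON
)
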
tests/io/reader/test_query_api.py

Lines changed: 82 additions & 0 deletions
@@ -20,6 +20,7 @@

 from datacustomcode.io.reader.query_api import (
     SQL_QUERY_TEMPLATE,
+    SQL_QUERY_TEMPLATE_NO_LIMIT,
     QueryAPIDataCloudReader,
 )
 from datacustomcode.io.reader.utils import _pandas_to_spark_schema
@@ -188,6 +189,7 @@ def reader_without_init(self, mock_spark_session):
         with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None):
             reader = QueryAPIDataCloudReader(None)  # None is ignored due to mock
             reader.spark = mock_spark_session
+            reader._default_row_limit = 1000
             yield reader

     def test_pandas_to_spark_schema_function(self):
@@ -341,3 +343,83 @@ def test_read_dmo_schema_is_lowercase(

     _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0]
     assert all(f.name == f.name.lower() for f in schema_arg.fields)
+
+
+@pytest.mark.usefixtures("patch_all_requests")
+class TestQueryAPIDataCloudReaderNoDefaultLimit:
+    """Tests for deployed behavior where default_row_limit is None (no limit)."""
+
+    @pytest.fixture(scope="class", autouse=True)
+    def patch_all_requests(self, request):
+        patches = []
+        for target in [
+            "requests.get",
+            "requests.post",
+            "requests.session",
+            "requests.adapters.HTTPAdapter.send",
+            "urllib3.connectionpool.HTTPConnectionPool.urlopen",
+        ]:
+            patcher = patch(target)
+            patches.append(patcher)
+            patcher.start()
+
+        def fin():
+            for patcher in patches:
+                patcher.stop()
+
+        request.addfinalizer(fin)
+
+    @pytest.fixture
+    def mock_spark_session(self):
+        spark = MagicMock()
+        spark.createDataFrame.return_value = spark
+        return spark
+
+    @pytest.fixture
+    def mock_pandas_dataframe(self):
+        return pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]})
+
+    @pytest.fixture
+    def mock_connection(self, mock_pandas_dataframe):
+        mock_conn = MagicMock()
+        mock_conn.get_pandas_dataframe.return_value = mock_pandas_dataframe
+        return mock_conn
+
+    @pytest.fixture
+    def reader_no_limit(self, mock_spark_session):
+        """Reader with no default row limit (simulates deployed environment)."""
+        with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None):
+            reader = QueryAPIDataCloudReader(None)
+            reader.spark = mock_spark_session
+            reader._default_row_limit = None
+            yield reader
+
+    def test_read_dlo_no_limit_when_deployed(
+        self, reader_no_limit, mock_connection, mock_pandas_dataframe
+    ):
+        """When default_row_limit is None and no explicit row_limit, omit LIMIT."""
+        reader_no_limit._conn = mock_connection
+        reader_no_limit.read_dlo("test_dlo")
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dlo")
+        )
+
+    def test_read_dmo_no_limit_when_deployed(
+        self, reader_no_limit, mock_connection, mock_pandas_dataframe
+    ):
+        """When default_row_limit is None and no explicit row_limit, omit LIMIT."""
+        reader_no_limit._conn = mock_connection
+        reader_no_limit.read_dmo("test_dmo")
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo")
+        )
+
+    def test_read_dlo_explicit_limit_still_applied_when_deployed(
+        self, reader_no_limit, mock_connection, mock_pandas_dataframe
+    ):
+        """An explicit row_limit always applies, even without a default."""
+        reader_no_limit._conn = mock_connection
+        reader_no_limit.read_dlo("test_dlo", row_limit=500)
+        mock_connection.get_pandas_dataframe.assert_called_once_with(
+            SQL_QUERY_TEMPLATE.format("test_dlo", 500)
+        )

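The new class can be run on its own with the usual pytest selection, e.g.:

pytest tests/io/reader/test_query_api.py -k NoDefaultLimit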