
Commit 11ff105

remove row_limit parameter completely

1 parent: d481ccc
9 files changed: 52 additions & 161 deletions

src/datacustomcode/client.py

Lines changed: 4 additions & 14 deletions
```diff
@@ -185,39 +185,29 @@ def _new_function_client(cls) -> Client:
         )
         return cls._instance

-    def read_dlo(
-        self, name: str, row_limit: Optional[int] = None
-    ) -> PySparkDataFrame:
+    def read_dlo(self, name: str) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.

         Args:
             name: The name of the DLO to read.
-            row_limit: Maximum number of rows to fetch. When ``None``, the
-                reader's configured ``default_row_limit`` is used (1000 for
-                local development, no limit when deployed).

         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
-        return self._reader.read_dlo(name, row_limit=row_limit)
+        return self._reader.read_dlo(name)

-    def read_dmo(
-        self, name: str, row_limit: Optional[int] = None
-    ) -> PySparkDataFrame:
+    def read_dmo(self, name: str) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.

         Args:
             name: The name of the DMO to read.
-            row_limit: Maximum number of rows to fetch. When ``None``, the
-                reader's configured ``default_row_limit`` is used (1000 for
-                local development, no limit when deployed).

         Returns:
             A PySpark DataFrame containing the DMO data.
         """
         self._record_dmo_access(name)
-        return self._reader.read_dmo(name, row_limit=row_limit)
+        return self._reader.read_dmo(name)

     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs
```
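
With `row_limit` gone from the public client, callers cap rows on the returned DataFrame instead. A minimal sketch of the new call pattern; the diff shows `Client` is a singleton, so direct construction here is an assumption and `"My_DLO"` is a placeholder name:

```python
from datacustomcode.client import Client

# Hypothetical setup: adjust to however your environment actually obtains
# the client instance.
client = Client()

# Before this commit: client.read_dlo("My_DLO", row_limit=100)
# After: read under the reader's configured default, then cap on the DataFrame.
df = client.read_dlo("My_DLO").limit(100)
df.show()
```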

src/datacustomcode/io/reader/base.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -15,7 +15,7 @@
 from __future__ import annotations

 from abc import abstractmethod
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING, Union

 from datacustomcode.io.base import BaseDataAccessLayer

@@ -33,13 +33,11 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame: ...

     @abstractmethod
     def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame: ...
```
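
For illustration, a toy class conforming to the trimmed abstract interface. The abstract base's own name is not visible in this hunk, so the sketch stands alone rather than subclassing it:

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from pyspark.sql import DataFrame as PySparkDataFrame
    from pyspark.sql.types import AtomicType, StructType


class ToyReader:
    """Mirrors the post-commit signatures: no row_limit anywhere."""

    def read_dlo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
    ) -> PySparkDataFrame:
        raise NotImplementedError

    def read_dmo(
        self,
        name: str,
        schema: Union[AtomicType, StructType, str, None] = None,
    ) -> PySparkDataFrame:
        raise NotImplementedError
```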

src/datacustomcode/io/reader/query_api.py

Lines changed: 11 additions & 21 deletions
```diff
@@ -139,9 +139,9 @@ def __init__(
                 reader delegates to :class:`SFCLIDataCloudReader` which calls
                 the Data Cloud REST API directly using the token obtained from
                 ``sf org display``, bypassing the CDP token-exchange flow.
-            default_row_limit: Default maximum number of rows to fetch when
-                ``row_limit`` is not explicitly passed to read methods. When
-                ``None``, no limit is applied (all rows are returned).
+            default_row_limit: Maximum number of rows to fetch automatically.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
         self._default_row_limit = default_row_limit
@@ -165,37 +165,30 @@ def __init__(
         )
         self._conn = create_cdp_connection(credentials, dataspace)

-    def _build_query(self, name: str, row_limit: Optional[int]) -> str:
-        """Build a SQL query, applying the default row limit when needed.
+    def _build_query(self, name: str) -> str:
+        """Build a SQL query, applying the configured default row limit.

         Args:
             name: Object name to query.
-            row_limit: Explicit row limit, or ``None`` to use the configured default.

         Returns:
             SQL query string.
         """
-        effective_limit = (
-            row_limit if row_limit is not None else self._default_row_limit
-        )
-        if effective_limit is not None:
-            return SQL_QUERY_TEMPLATE.format(name, effective_limit)
+        if self._default_row_limit is not None:
+            return SQL_QUERY_TEMPLATE.format(name, self._default_row_limit)
         return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)

     def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """
         Read a Data Lake Object (DLO) from the Data Cloud.

         Args:
             name (str): The name of the DLO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
-            row_limit (Optional[int]): Maximum number of rows to fetch.
-                When ``None``, the configured ``default_row_limit`` is used.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -204,9 +197,9 @@ def read_dlo(
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dlo(name, schema, row_limit)
+            return sf_cli_reader.read_dlo(name, schema)

-        query = self._build_query(name, row_limit)
+        query = self._build_query(name)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
@@ -222,16 +215,13 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """
         Read a Data Model Object (DMO) from the Data Cloud.

         Args:
             name (str): The name of the DMO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO.
-            row_limit (Optional[int]): Maximum number of rows to fetch.
-                When ``None``, the configured ``default_row_limit`` is used.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -240,9 +230,9 @@ def read_dmo(
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dmo(name, schema, row_limit)
+            return sf_cli_reader.read_dmo(name, schema)

-        query = self._build_query(name, row_limit)
+        query = self._build_query(name)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
```
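
The two SQL templates are constants in query_api.py whose exact text is not part of this diff; assuming conventional shapes, `_build_query` now resolves purely from the reader's configuration. A standalone sketch:

```python
from typing import Optional

# Assumed template shapes, for illustration only; the real constants live in
# src/datacustomcode/io/reader/query_api.py and may differ.
SQL_QUERY_TEMPLATE = "SELECT * FROM {} LIMIT {}"
SQL_QUERY_TEMPLATE_NO_LIMIT = "SELECT * FROM {}"


def build_query(name: str, default_row_limit: Optional[int]) -> str:
    # Mirrors the post-commit _build_query: only the configured default applies.
    if default_row_limit is not None:
        return SQL_QUERY_TEMPLATE.format(name, default_row_limit)
    return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)


print(build_query("My_DLO", 1000))  # local dev -> "SELECT * FROM My_DLO LIMIT 1000"
print(build_query("My_DLO", None))  # deployed  -> "SELECT * FROM My_DLO"

# The docstring points at config.yaml for the knob; a plausible (unverified)
# reader options snippet:
#   reader:
#     options:
#       default_row_limit: 1000
```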

src/datacustomcode/io/reader/sf_cli.py

Lines changed: 11 additions & 16 deletions
```diff
@@ -65,9 +65,9 @@ def __init__(
                 (e.g. the alias given to ``sf org login web --alias dev1``).
             dataspace: Optional dataspace identifier. If ``None`` or
                 ``"default"`` the query runs against the default dataspace.
-            default_row_limit: Default maximum number of rows to fetch when
-                ``row_limit`` is not explicitly passed to read methods. When
-                ``None``, no limit is applied (all rows are returned).
+            default_row_limit: Maximum number of rows to fetch automatically.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
         self.sf_cli_org = sf_cli_org
@@ -137,12 +137,14 @@ def _get_token(self) -> tuple[str, str]:
         logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
         return access_token, instance_url

-    def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame:
+    def _execute_query(self, sql: str) -> pd.DataFrame:
         """Execute *sql* against the Data Cloud REST endpoint.

+        The configured ``default_row_limit`` is automatically appended as a
+        ``LIMIT`` clause when set (typically for local development).
+
         Args:
             sql: Base SQL query (no ``LIMIT`` clause).
-            row_limit: Maximum rows to return, or ``None`` for no limit.

         Returns:
             Pandas DataFrame with query results.
@@ -152,14 +154,11 @@ def _execute_query(self, sql: str, row_limit: Optional[int]) -> pd.DataFrame:
         """
         access_token, instance_url = self._get_token()

-        effective_limit = (
-            row_limit if row_limit is not None else self._default_row_limit
-        )
         url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
         headers = {"Authorization": f"Bearer {access_token}"}
         params = {"dataspace": self.dataspace}
-        if effective_limit is not None:
-            body = {"sql": f"{sql} LIMIT {effective_limit}"}
+        if self._default_row_limit is not None:
+            body = {"sql": f"{sql} LIMIT {self._default_row_limit}"}
         else:
             body = {"sql": sql}

@@ -201,19 +200,17 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """Read a Data Lake Object (DLO) from Data Cloud.

         Args:
             name: DLO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch, or ``None`` to use the configured default.

         Returns:
             PySpark DataFrame.
         """
-        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        pandas_df = self._execute_query(f"SELECT * FROM {name}")
         if not schema:
             schema = _pandas_to_spark_schema(pandas_df)
         return self.spark.createDataFrame(pandas_df, schema)
@@ -222,19 +219,17 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: Optional[int] = None,
     ) -> PySparkDataFrame:
         """Read a Data Model Object (DMO) from Data Cloud.

         Args:
             name: DMO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch, or ``None`` to use the configured default.

         Returns:
             PySpark DataFrame.
         """
-        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        pandas_df = self._execute_query(f"SELECT * FROM {name}")
         if not schema:
             schema = _pandas_to_spark_schema(pandas_df)
         return self.spark.createDataFrame(pandas_df, schema)
```
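
For context, a sketch of the request `_execute_query` now issues. The endpoint path, headers, params, and body shape are taken from the diff above; the use of `requests.post`, the placeholder values, and the API version are assumptions:

```python
import requests

# Placeholders: in the real reader these come from _get_token() and config.
access_token = "<token from sf org display>"
instance_url = "https://example.my.salesforce.com"
API_VERSION = "v63.0"  # assumed value; the constant is defined in sf_cli.py

sql = "SELECT * FROM My_DLO"
default_row_limit = 1000  # None when deployed

url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
headers = {"Authorization": f"Bearer {access_token}"}
params = {"dataspace": "default"}
if default_row_limit is not None:
    # The configured default is the only row cap left after this commit.
    body = {"sql": f"{sql} LIMIT {default_row_limit}"}
else:
    body = {"sql": sql}

response = requests.post(url, headers=headers, params=params, json=body)
```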

src/datacustomcode/io/writer/print.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -90,7 +90,7 @@ def validate_dataframe_columns_against_dlo(
             schema.
         """
         # Get DLO schema (no data, just schema)
-        dlo_df = self.reader.read_dlo(dlo_name, row_limit=0)
+        dlo_df = self.reader.read_dlo(dlo_name).limit(0)
         dlo_columns = set(dlo_df.columns)
         df_columns = set(dataframe.columns)
```
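
`.limit(0)` preserves column metadata while returning no rows, which is all the column validation needs. One nuance worth noting: the limit is now applied after `read_dlo` runs, so the underlying query is capped only by the reader's `default_row_limit` (if any) rather than by a server-side `LIMIT 0`. A quick illustration of the schema-preserving part:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("a", 1), ("b", 2)], ["name", "value"])
empty = df.limit(0)

assert empty.columns == ["name", "value"]  # schema kept
assert empty.count() == 0                  # rows dropped
```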

tests/io/reader/test_query_api.py

Lines changed: 0 additions & 33 deletions
```diff
@@ -296,30 +296,6 @@ def test_read_dmo_with_schema(
         args, _ = reader_without_init.spark.createDataFrame.call_args
         assert args[1] is custom_schema

-    def test_read_dlo_with_custom_row_limit(
-        self, reader_without_init, mock_connection, mock_pandas_dataframe
-    ):
-        """Test read_dlo method with custom row_limit."""
-        reader_without_init._conn = mock_connection
-
-        reader_without_init.read_dlo("test_dlo", row_limit=50)
-
-        mock_connection.get_pandas_dataframe.assert_called_once_with(
-            SQL_QUERY_TEMPLATE.format("test_dlo", 50)
-        )
-
-    def test_read_dmo_with_custom_row_limit(
-        self, reader_without_init, mock_connection, mock_pandas_dataframe
-    ):
-        """Test read_dmo method with custom row_limit."""
-        reader_without_init._conn = mock_connection
-
-        reader_without_init.read_dmo("test_dmo", row_limit=25)
-
-        mock_connection.get_pandas_dataframe.assert_called_once_with(
-            SQL_QUERY_TEMPLATE.format("test_dmo", 25)
-        )
-
     def test_read_dlo_schema_is_lowercase(
         self, reader_without_init, mock_connection, mock_pandas_dataframe
     ):
@@ -414,12 +390,3 @@ def test_read_dmo_no_limit_when_deployed(
             SQL_QUERY_TEMPLATE_NO_LIMIT.format("test_dmo")
         )

-    def test_read_dlo_explicit_limit_still_applied_when_deployed(
-        self, reader_no_limit, mock_connection, mock_pandas_dataframe
-    ):
-        """An explicit row_limit always applies, even without a default."""
-        reader_no_limit._conn = mock_connection
-        reader_no_limit.read_dlo("test_dlo", row_limit=500)
-        mock_connection.get_pandas_dataframe.assert_called_once_with(
-            SQL_QUERY_TEMPLATE.format("test_dlo", 500)
-        )
```
