Commit aa87f55

Merging from main and adding testcase
2 parents: e24ae3e + 243fbb5

12 files changed

Lines changed: 216 additions & 113 deletions

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
@@ -1,5 +1,25 @@
 # Changelog

+## 2.0.0
+
+### Breaking Changes
+
+- **Removed the `row_limit` parameter from `read_dlo()` and `read_dmo()`.**
+
+  These methods no longer accept a `row_limit` argument. When running locally, reads are automatically capped at 1000 rows to prevent accidentally fetching large datasets during development. When deployed to Data Cloud, no limit is applied and all records are returned.
+
+  **Why:** The `row_limit` parameter duplicated PySpark's built-in `.limit()` and created a behavioral difference between local and deployed environments. The 1000-row safety net is now handled internally via the `default_row_limit` setting in `config.yaml`, and deployed environments naturally omit it.
+
+  **Migration:** Remove any `row_limit` arguments from your `read_dlo()` and `read_dmo()` calls. If you need a specific number of rows, use PySpark's `.limit()` on the returned DataFrame:
+
+  ```python
+  # Before
+  df = client.read_dlo("MyObject__dll", row_limit=500)
+
+  # After
+  df = client.read_dlo("MyObject__dll").limit(500)
+  ```
+
 ## 1.0.0

 ### Breaking Changes

src/datacustomcode/client.py

Lines changed: 4 additions & 6 deletions
@@ -185,31 +185,29 @@ def _new_function_client(cls) -> Client:
         )
         return cls._instance

-    def read_dlo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dlo(self, name: str) -> PySparkDataFrame:
         """Read a DLO from Data Cloud.

         Args:
             name: The name of the DLO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).

         Returns:
             A PySpark DataFrame containing the DLO data.
         """
         self._record_dlo_access(name)
-        return self._reader.read_dlo(name, row_limit=row_limit)  # type: ignore[no-any-return]
+        return self._reader.read_dlo(name)  # type: ignore[no-any-return]

-    def read_dmo(self, name: str, row_limit: int = 1000) -> PySparkDataFrame:
+    def read_dmo(self, name: str) -> PySparkDataFrame:
         """Read a DMO from Data Cloud.

         Args:
             name: The name of the DMO to read.
-            row_limit: Maximum number of rows to fetch (default: 1000).

         Returns:
             A PySpark DataFrame containing the DMO data.
         """
         self._record_dmo_access(name)
-        return self._reader.read_dmo(name, row_limit=row_limit)  # type: ignore[no-any-return]
+        return self._reader.read_dmo(name)  # type: ignore[no-any-return]

     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode, **kwargs

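With `row_limit` gone, both reader methods share one call shape and row counts are controlled on the returned DataFrame. A usage sketch of the new signatures, assuming an already-configured `Client` (construction and credentials are illustrative; the object names are placeholders):

```python
from datacustomcode.client import Client

client = Client()  # hypothetical setup; actual construction follows the project's config

# Locally this is capped by default_row_limit (1000 rows) from config.yaml;
# deployed to Data Cloud, no cap is applied and all rows come back.
df = client.read_dlo("MyObject__dll")

# Need an explicit cap? Chain PySpark's .limit() on the result instead.
preview = client.read_dmo("MyModelObject__dlm").limit(10)
preview.show()
```
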
src/datacustomcode/config.yaml

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ reader_config:
   type_config_name: QueryAPIDataCloudReader
   options:
     credentials_profile: default
+    default_row_limit: 1000

 writer_config:
   type_config_name: PrintDataCloudWriter

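The local 1000-row cap now lives entirely in the reader options. A minimal sketch of what the reader receives after this change (inline YAML copied from the diff; PyYAML is assumed to be available):

```python
import yaml  # assumes PyYAML is installed

config_text = """
reader_config:
  type_config_name: QueryAPIDataCloudReader
  options:
    credentials_profile: default
    default_row_limit: 1000
"""

options = yaml.safe_load(config_text)["reader_config"]["options"]
# Locally the reader is constructed with default_row_limit=1000, so queries get a
# LIMIT clause; a deployed config that omits the key leaves it None and no LIMIT is added.
print(options)  # {'credentials_profile': 'default', 'default_row_limit': 1000}
```
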
src/datacustomcode/io/reader/base.py

Lines changed: 0 additions & 2 deletions
@@ -33,13 +33,11 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame: ...

     @abstractmethod
     def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame: ...

src/datacustomcode/io/reader/query_api.py

Lines changed: 26 additions & 10 deletions
@@ -37,6 +37,7 @@


 SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
+SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}"


 def create_cdp_connection(
@@ -122,6 +123,7 @@ def __init__(
         credentials_profile: str = "default",
         dataspace: Optional[str] = None,
         sf_cli_org: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize QueryAPIDataCloudReader.

@@ -137,8 +139,12 @@ def __init__(
                 reader delegates to :class:`SFCLIDataCloudReader` which calls
                 the Data Cloud REST API directly using the token obtained from
                 ``sf org display``, bypassing the CDP token-exchange flow.
+            default_row_limit: Maximum number of rows to fetch automatically.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
+        self._default_row_limit = default_row_limit
         if sf_cli_org:
             logger.debug(
                 f"Initializing QueryAPIDataCloudReader with SF CLI org '{sf_cli_org}'"
@@ -147,6 +153,7 @@ def __init__(
                 spark=spark,
                 sf_cli_org=sf_cli_org,
                 dataspace=dataspace,
+                default_row_limit=default_row_limit,
             )
             self._conn = None
         else:
@@ -158,19 +165,30 @@ def __init__(
             )
             self._conn = create_cdp_connection(credentials, dataspace)

+    def _build_query(self, name: str) -> str:
+        """Build a SQL query, applying the configured default row limit.
+
+        Args:
+            name: Object name to query.
+
+        Returns:
+            SQL query string.
+        """
+        if self._default_row_limit is not None:
+            return SQL_QUERY_TEMPLATE.format(name, self._default_row_limit)
+        return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)
+
     def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """
-        Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows.
+        Read a Data Lake Object (DLO) from the Data Cloud.

         Args:
             name (str): The name of the DLO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
-            row_limit (int): Maximum number of rows to fetch.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -179,9 +197,9 @@ def read_dlo(
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dlo(name, schema, row_limit)  # type: ignore[no-any-return]
+            return sf_cli_reader.read_dlo(name, schema)  # type: ignore[no-any-return]

-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)
@@ -197,15 +215,13 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """
-        Read a Data Model Object (DMO) from the Data Cloud, limited to a number of rows.
+        Read a Data Model Object (DMO) from the Data Cloud.

         Args:
             name (str): The name of the DMO.
             schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DMO.
-            row_limit (int): Maximum number of rows to fetch.

         Returns:
             PySparkDataFrame: The PySpark DataFrame.
@@ -214,9 +230,9 @@ def read_dmo(
             self, "_sf_cli_reader", None
         )
         if sf_cli_reader is not None:
-            return sf_cli_reader.read_dmo(name, schema, row_limit)  # type: ignore[no-any-return]
+            return sf_cli_reader.read_dmo(name, schema)  # type: ignore[no-any-return]

-        query = SQL_QUERY_TEMPLATE.format(name, row_limit)
+        query = self._build_query(name)

         assert self._conn is not None
         pandas_df = self._conn.get_pandas_dataframe(query)

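The limit handling is concentrated in `_build_query`. A standalone sketch of that logic, with the two templates copied from the diff and the method rewritten as a free function purely for illustration:

```python
from typing import Final, Optional

SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
SQL_QUERY_TEMPLATE_NO_LIMIT: Final = "SELECT * FROM {}"


def build_query(name: str, default_row_limit: Optional[int]) -> str:
    """Free-function mirror of QueryAPIDataCloudReader._build_query."""
    if default_row_limit is not None:
        return SQL_QUERY_TEMPLATE.format(name, default_row_limit)
    return SQL_QUERY_TEMPLATE_NO_LIMIT.format(name)


# Local run (config.yaml supplies default_row_limit: 1000):
print(build_query("MyObject__dll", 1000))  # SELECT * FROM MyObject__dll LIMIT 1000
# Deployed run (no default_row_limit configured):
print(build_query("MyObject__dll", None))  # SELECT * FROM MyObject__dll
```
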
src/datacustomcode/io/reader/sf_cli.py

Lines changed: 15 additions & 9 deletions
@@ -55,6 +55,7 @@ def __init__(
         spark: SparkSession,
         sf_cli_org: str,
         dataspace: Optional[str] = None,
+        default_row_limit: Optional[int] = None,
     ) -> None:
         """Initialize SFCLIDataCloudReader.

@@ -64,9 +65,13 @@ def __init__(
                 (e.g. the alias given to ``sf org login web --alias dev1``).
             dataspace: Optional dataspace identifier. If ``None`` or
                 ``"default"`` the query runs against the default dataspace.
+            default_row_limit: Maximum number of rows to fetch automatically.
+                When ``None``, no limit is applied (all rows are returned).
+                Set via ``default_row_limit`` in ``config.yaml`` reader options.
         """
         self.spark = spark
         self.sf_cli_org = sf_cli_org
+        self._default_row_limit = default_row_limit
         self.dataspace = (
             dataspace if dataspace and dataspace != "default" else "default"
         )
@@ -132,12 +137,14 @@ def _get_token(self) -> tuple[str, str]:
         logger.debug(f"Fetched token from SF CLI for org '{self.sf_cli_org}'")
         return access_token, instance_url

-    def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
+    def _execute_query(self, sql: str) -> pd.DataFrame:
         """Execute *sql* against the Data Cloud REST endpoint.

+        The configured ``default_row_limit`` is automatically appended as a
+        ``LIMIT`` clause when set (typically for local development).
+
         Args:
             sql: Base SQL query (no ``LIMIT`` clause).
-            row_limit: Maximum rows to return.

         Returns:
             Pandas DataFrame with query results.
@@ -150,7 +157,10 @@ def _execute_query(self, sql: str, row_limit: int) -> pd.DataFrame:
         url = f"{instance_url}/services/data/{API_VERSION}/ssot/query-sql"
         headers = {"Authorization": f"Bearer {access_token}"}
         params = {"dataspace": self.dataspace}
-        body = {"sql": f"{sql} LIMIT {row_limit}"}
+        if self._default_row_limit is not None:
+            body = {"sql": f"{sql} LIMIT {self._default_row_limit}"}
+        else:
+            body = {"sql": sql}

         logger.debug(f"Executing Data Cloud query: {body['sql']}")

@@ -190,19 +200,17 @@ def read_dlo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """Read a Data Lake Object (DLO) from Data Cloud.

         Args:
             name: DLO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.

         Returns:
             PySpark DataFrame.
         """
-        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        pandas_df = self._execute_query(f"SELECT * FROM {name}")
         if not schema:
             schema = _pandas_to_spark_schema(pandas_df)
         return self.spark.createDataFrame(pandas_df, schema)
@@ -211,19 +219,17 @@ def read_dmo(
         self,
         name: str,
         schema: Union[AtomicType, StructType, str, None] = None,
-        row_limit: int = 1000,
     ) -> PySparkDataFrame:
         """Read a Data Model Object (DMO) from Data Cloud.

         Args:
             name: DMO name.
             schema: Optional explicit schema.
-            row_limit: Maximum rows to fetch.

         Returns:
             PySpark DataFrame.
         """
-        pandas_df = self._execute_query(f"SELECT * FROM {name}", row_limit)
+        pandas_df = self._execute_query(f"SELECT * FROM {name}")
         if not schema:
             schema = _pandas_to_spark_schema(pandas_df)
         return self.spark.createDataFrame(pandas_df, schema)

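For the SF CLI path, the same rule is applied when the REST request body is assembled rather than through a SQL template. A sketch of the body construction in `_execute_query`, pulled out as a helper for illustration (endpoint, headers, and auth handling are omitted):

```python
from typing import Optional


def build_request_body(sql: str, default_row_limit: Optional[int]) -> dict:
    """Mirror of the LIMIT handling added to SFCLIDataCloudReader._execute_query."""
    if default_row_limit is not None:
        return {"sql": f"{sql} LIMIT {default_row_limit}"}
    return {"sql": sql}


print(build_request_body("SELECT * FROM MyObject__dll", 1000))
# {'sql': 'SELECT * FROM MyObject__dll LIMIT 1000'}
print(build_request_body("SELECT * FROM MyObject__dll", None))
# {'sql': 'SELECT * FROM MyObject__dll'}
```
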
src/datacustomcode/io/writer/print.py

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ def validate_dataframe_columns_against_dlo(
             schema.
         """
         # Get DLO schema (no data, just schema)
-        dlo_df = self.reader.read_dlo(dlo_name, row_limit=0)
+        dlo_df = self.reader.read_dlo(dlo_name).limit(0)
         dlo_columns = set(dlo_df.columns)
         df_columns = set(dataframe.columns)


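The writer's column validation previously passed `row_limit=0` to fetch only the schema; chaining `.limit(0)` on the returned DataFrame achieves the same thing. A local-Spark sketch of that check, using stand-in DataFrames instead of a real DLO read (column names are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Stand-ins: the DLO schema (limit(0) keeps the columns but drops all rows)
# and the DataFrame a user is trying to write.
dlo_df = spark.createDataFrame([("a", 1)], ["Name__c", "Count__c"]).limit(0)
out_df = spark.createDataFrame([("b", 2, "x")], ["Name__c", "Count__c", "Extra__c"])

# Approximation of the column comparison the writer performs.
dlo_columns = set(dlo_df.columns)
df_columns = set(out_df.columns)
print(df_columns - dlo_columns)  # {'Extra__c'}
```
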
src/datacustomcode/llm_gateway/types/generate_text_request_builder.py

Lines changed: 5 additions & 1 deletion
@@ -56,7 +56,11 @@ def set_localization(
         if localization is not None:
             self._localization = localization
         elif locale is not None:
-            self._localization = {"defaultLocale": locale}
+            self._localization = {
+                "defaultLocale": locale,
+                "inputLocales": [{"locale": locale, "probability": 1.0}],
+                "expectedLocales": [locale],
+            }
         else:
             raise ValueError("Must provide either localization or locale")

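The locale-only branch now expands a single locale into a full localization payload instead of just `defaultLocale`. A small sketch of the structure it produces, written as a free function for illustration (the real logic lives inside `set_localization`):

```python
def build_localization(locale: str) -> dict:
    """Mirror of the locale-only branch in set_localization."""
    return {
        "defaultLocale": locale,
        "inputLocales": [{"locale": locale, "probability": 1.0}],
        "expectedLocales": [locale],
    }


print(build_localization("en_US"))
# {'defaultLocale': 'en_US',
#  'inputLocales': [{'locale': 'en_US', 'probability': 1.0}],
#  'expectedLocales': ['en_US']}
```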