Commit 016b4f8

Lowercase columns for local, to match Data Cloud
1 parent: f914a25

4 files changed: 59 additions & 7 deletions

src/datacustomcode/io/reader/utils.py

Lines changed: 1 addition & 1 deletion

@@ -49,5 +49,5 @@ def _pandas_to_spark_schema(
             spark_type = TimestampType()
         else:
             spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
-        fields.append(StructField(column, spark_type, nullable))
+        fields.append(StructField(column.lower(), spark_type, nullable))
     return StructType(fields)
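
For context, a minimal runnable sketch of the conversion this hunk changes. The PANDAS_TYPE_MAPPING contents below are illustrative stand-ins, not the repository's exact table:

import pandas as pd
from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Illustrative stand-in for the module's dtype table; the real mapping may differ.
PANDAS_TYPE_MAPPING = {
    "int64": LongType(),
    "float64": DoubleType(),
    "object": StringType(),
}

def _pandas_to_spark_schema(df: pd.DataFrame, nullable: bool = True) -> StructType:
    fields = []
    for column, dtype in df.dtypes.items():
        if str(dtype).startswith("datetime64"):
            spark_type = TimestampType()
        else:
            spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
        # The change: lowercase the name so local schemas match Data Cloud.
        fields.append(StructField(column.lower(), spark_type, nullable))
    return StructType(fields)

df = pd.DataFrame({"UnitPrice__c": [1.0], "Name__c": ["a"]})
print([f.name for f in _pandas_to_spark_schema(df).fields])
# ['unitprice__c', 'name__c']

Lowercasing at schema-construction time normalizes names in one place rather than in each caller.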

src/datacustomcode/templates/script/payload/entrypoint.py

Lines changed: 3 additions & 3 deletions

@@ -10,11 +10,11 @@ def main():
     df = client.read_dlo("Account_std__dll")

     # Perform transformations on the DataFrame
-    df_upper1 = df.withColumn("Description__c", upper(col("Description__c")))
+    df_upper1 = df.withColumn("description__c", upper(col("description__c")))

     # Drop specific columns related to relationships
-    df_upper1 = df_upper1.drop("SfdcOrganizationId__c")
-    df_upper1 = df_upper1.drop("KQ_Id__c")
+    df_upper1 = df_upper1.drop("sfdcorganizationid__c")
+    df_upper1 = df_upper1.drop("kq_id__c")

     # Save the transformed DataFrame
     dlo_name = "Account_std_copy__dll"
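
A small local illustration of the template's transformation against the now-lowercase schema (sample data invented here; needs only a local pyspark session, no Data Cloud connection):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Simulates what read_dlo now returns: all-lowercase column names.
df = spark.createDataFrame(
    [("acme corp", "00D123", "kq-1")],
    ["description__c", "sfdcorganizationid__c", "kq_id__c"],
)

df_upper1 = (
    df.withColumn("description__c", upper(col("description__c")))
    .drop("sfdcorganizationid__c")
    .drop("kq_id__c")
)

print(df_upper1.columns)  # ['description__c']
df_upper1.show()          # description__c: ACME CORP
spark.stop()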

tests/io/reader/test_query_api.py

Lines changed: 42 additions & 2 deletions

@@ -60,6 +60,22 @@ def test_pandas_to_spark_schema_nullable(self):
         schema = _pandas_to_spark_schema(df, nullable=False)
         assert not schema.fields[0].nullable

+    def test_pandas_to_spark_schema_lowercases_columns(self):
+        """Column names from the API are lowercased to match Data Cloud."""
+        df = pd.DataFrame({"UnitPrice__c": [1.0], "Quantity__c": [2], "Name__c": ["a"]})
+        schema = _pandas_to_spark_schema(df)
+        assert [f.name for f in schema.fields] == [
+            "unitprice__c",
+            "quantity__c",
+            "name__c",
+        ]
+
+    def test_pandas_to_spark_schema_already_lowercase_is_idempotent(self):
+        """Already-lowercase column names are returned unchanged."""
+        df = pd.DataFrame({"unitprice__c": [1.0], "quantity__c": [2]})
+        schema = _pandas_to_spark_schema(df)
+        assert [f.name for f in schema.fields] == ["unitprice__c", "quantity__c"]
+
     def test_pandas_to_spark_schema_datetime_types(self):
         """Test conversion of pandas datetime types to Spark TimestampType."""

@@ -147,8 +163,8 @@ def mock_spark_session(self):

     @pytest.fixture
     def mock_pandas_dataframe(self):
-        """Create a sample pandas DataFrame for testing."""
-        return pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
+        """Sample pandas DataFrame with PascalCase columns, as the QueryAPI returns."""
+        return pd.DataFrame({"Col1__c": [1, 2], "Col2__c": ["a", "b"]})

     @pytest.fixture
     def mock_connection(self, mock_pandas_dataframe):

@@ -301,3 +317,27 @@ def test_read_dmo_with_custom_row_limit(
         mock_connection.get_pandas_dataframe.assert_called_once_with(
             SQL_QUERY_TEMPLATE.format("test_dmo", 25)
         )
+
+    def test_read_dlo_schema_is_lowercase(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """read_dlo returns a schema with all-lowercase field names even when the
+        QueryAPI returns PascalCase column names."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dlo("test_dlo")
+
+        _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0]
+        assert all(f.name == f.name.lower() for f in schema_arg.fields)
+
+    def test_read_dmo_schema_is_lowercase(
+        self, reader_without_init, mock_connection, mock_pandas_dataframe
+    ):
+        """read_dmo returns a schema with all-lowercase field names even when the
+        QueryAPI returns PascalCase column names."""
+        reader_without_init._conn = mock_connection
+
+        reader_without_init.read_dmo("test_dmo")
+
+        _, schema_arg = reader_without_init.spark.createDataFrame.call_args[0]
+        assert all(f.name == f.name.lower() for f in schema_arg.fields)
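
A note on the pattern these tests rely on: reader_without_init.spark is a MagicMock, so calls to createDataFrame are recorded rather than executed, and the schema argument can be pulled back out of call_args. A standalone sketch of just that mechanism (names here are illustrative):

from unittest.mock import MagicMock
from pyspark.sql.types import StringType, StructField, StructType

spark = MagicMock()
schema = StructType([StructField("name__c", StringType(), True)])

# The call is recorded on the mock; no Spark session is involved.
spark.createDataFrame([("a",)], schema)

# call_args[0] holds the positional arguments of the most recent call.
_, schema_arg = spark.createDataFrame.call_args[0]
assert all(f.name == f.name.lower() for f in schema_arg.fields)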

tests/io/reader/test_sf_cli.py

Lines changed: 13 additions & 1 deletion

@@ -311,7 +311,8 @@ def reader(self):

     @pytest.fixture
     def sample_df(self):
-        return pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
+        """DataFrame with PascalCase columns, as the REST API metadata returns."""
+        return pd.DataFrame({"Id__c": [1, 2], "Name__c": ["a", "b"]})

     @pytest.mark.parametrize(
         "method,obj_name",

@@ -348,6 +349,17 @@ def test_auto_infers_schema_when_none_given(self, reader, sample_df, method):
         _, schema_arg = reader.spark.createDataFrame.call_args[0]
         assert isinstance(schema_arg, StructType)

+    @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"])
+    def test_auto_infers_schema_lowercases_pascal_case_columns(
+        self, reader, sample_df, method
+    ):
+        """Schema is lowercased so local results match Data Cloud column names."""
+        with patch.object(reader, "_execute_query", return_value=sample_df):
+            getattr(reader, method)("SomeObj")
+
+        _, schema_arg = reader.spark.createDataFrame.call_args[0]
+        assert all(f.name == f.name.lower() for f in schema_arg.fields)
+
     @pytest.mark.parametrize("method", ["read_dlo", "read_dmo"])
     def test_uses_provided_schema(self, reader, sample_df, method):
         from pyspark.sql.types import (
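
The new sf_cli test leans on patch.object to stub out the reader's _execute_query with the PascalCase fixture. A condensed, standalone sketch of that pattern, with FakeReader as an invented stand-in for the real reader class:

import pandas as pd
from unittest.mock import patch

class FakeReader:
    """Illustrative stand-in for the SF CLI reader."""

    def _execute_query(self, query):
        raise RuntimeError("would call the real REST API")

    def read_dlo(self, name):
        return self._execute_query(f"SELECT * FROM {name}")

reader = FakeReader()
sample_df = pd.DataFrame({"Id__c": [1, 2], "Name__c": ["a", "b"]})

# patch.object swaps _execute_query for a stub returning the fixture,
# so read_dlo runs without any network access.
with patch.object(reader, "_execute_query", return_value=sample_df):
    df = reader.read_dlo("SomeObj")

assert list(df.columns) == ["Id__c", "Name__c"]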
