
Commit 06e71b3

@W-18661856 added a default 1000-row limit for DLO/DMO reads, validation of DataFrame columns against the output DLO schema, and replaced the static list of standard Python libraries with sys.stdlib_module_names
1 parent 1d6fb56 commit 06e71b3

3 files changed

Lines changed: 57 additions & 56 deletions


src/datacustomcode/io/reader/query_api.py

Lines changed: 7 additions & 6 deletions
@@ -43,7 +43,7 @@
 logger = logging.getLogger(__name__)
 
 
-SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {}"
+SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
 PANDAS_TYPE_MAPPING = {
     "object": StringType(),
     "int64": LongType(),
@@ -85,29 +85,30 @@ def __init__(self, spark: SparkSession) -> None:
         )
 
     def read_dlo(
-        self, name: str, schema: Union[AtomicType, StructType, str, None] = None
+        self, name: str, schema: Union[AtomicType, StructType, str, None] = None, row_limit: int = 1000
     ) -> PySparkDataFrame:
         """
-        Read a Data Lake Object (DLO) from the Data Cloud.
+        Read a Data Lake Object (DLO) from the Data Cloud, limited to a number of rows.
 
         Args:
            name (str): The name of the DLO.
            schema (Optional[Union[AtomicType, StructType, str]]): Schema of the DLO.
+           row_limit (int): Maximum number of rows to fetch.
 
         Returns:
            PySparkDataFrame: The PySpark DataFrame.
        """
-        pandas_df = self._conn.get_pandas_dataframe(SQL_QUERY_TEMPLATE.format(name))
+        pandas_df = self._conn.get_pandas_dataframe(SQL_QUERY_TEMPLATE.format(name, row_limit))
         if not schema:
             # auto infer schema
             schema = _pandas_to_spark_schema(pandas_df)
         spark_dataframe = self.spark.createDataFrame(pandas_df, schema)
         return spark_dataframe
 
     def read_dmo(
-        self, name: str, schema: Union[AtomicType, StructType, str, None] = None
+        self, name: str, schema: Union[AtomicType, StructType, str, None] = None, row_limit: int = 1000
     ) -> PySparkDataFrame:
-        pandas_df = self._conn.get_pandas_dataframe(SQL_QUERY_TEMPLATE.format(name))
+        pandas_df = self._conn.get_pandas_dataframe(SQL_QUERY_TEMPLATE.format(name, row_limit))
         if not schema:
             # auto infer schema
             schema = _pandas_to_spark_schema(pandas_df)
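
For orientation, a minimal usage sketch of the new row_limit parameter; the Spark session setup and the DLO name "Account_Home__dlm" are illustrative, and the 1000-row default comes from the signature above:

from pyspark.sql import SparkSession

from datacustomcode.io.reader.query_api import QueryAPIDataCloudReader

spark = SparkSession.builder.getOrCreate()
reader = QueryAPIDataCloudReader(spark)

# Default: the generated SQL becomes "SELECT * FROM Account_Home__dlm LIMIT 1000"
df = reader.read_dlo("Account_Home__dlm")

# Explicit limit for a quick preview: "SELECT * FROM Account_Home__dlm LIMIT 10"
preview = reader.read_dlo("Account_Home__dlm", row_limit=10)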

src/datacustomcode/io/writer/print.py

Lines changed: 46 additions & 0 deletions
@@ -15,19 +15,65 @@
 
 
 from pyspark.sql import DataFrame as PySparkDataFrame
+from pyspark.sql import SparkSession
 
 from datacustomcode.io.writer.base import BaseDataCloudWriter, WriteMode
+from datacustomcode.io.reader.query_api import QueryAPIDataCloudReader
 
 
 class PrintDataCloudWriter(BaseDataCloudWriter):
     CONFIG_NAME = "PrintDataCloudWriter"
 
+    def validate_dataframe_columns_against_dlo(
+        self, dataframe: PySparkDataFrame, dlo_name: str, reader: QueryAPIDataCloudReader
+    ) -> None:
+        """
+        Validates that all columns in the given dataframe exist in the DLO schema.
+
+        Args:
+            dataframe (PySparkDataFrame): The DataFrame to validate.
+            dlo_name (str): The name of the DLO to check against.
+            reader (QueryAPIDataCloudReader): The reader to use for schema retrieval.
+
+        Raises:
+            ValueError: If any columns in the dataframe are not present in the DLO schema.
+        """
+        # Get DLO schema (no data, just schema)
+        dlo_df = reader.read_dlo(dlo_name, row_limit=0)
+        dlo_columns = set(dlo_df.columns)
+        df_columns = set(dataframe.columns)
+
+        # Find columns in dataframe not present in DLO
+        extra_columns = df_columns - dlo_columns
+        if extra_columns:
+            raise ValueError(
+                f"The following columns are not present in the DLO '{dlo_name}': {sorted(extra_columns)}.\n"
+                "To fix this error, you can either:\n"
+                " - Drop these columns from your DataFrame before writing, e.g.,\n"
+                " dataframe = dataframe.drop({cols})\n"
+                " - Or, add these columns to the DLO schema in Data Cloud."
+                .format(cols=sorted(extra_columns))
+            )
+
     def write_to_dlo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode
     ) -> None:
+        # Create SparkSession if not already created
+        spark = SparkSession.builder.appName("YourAppName").getOrCreate()
+
+        # Instantiate the reader
+        reader = QueryAPIDataCloudReader(spark)
+
+        # Validate columns before proceeding
+        self.validate_dataframe_columns_against_dlo(dataframe, name, reader)
+
         dataframe.show()
 
     def write_to_dmo(
         self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode
     ) -> None:
+        # The DLO validation compares DataFrame columns against an existing DLO schema;
+        # it will not work for a DMO, which may not exist yet, so just show the dataframe.
+
         dataframe.show()
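
Two notes on the new validation: it fetches only the DLO schema by calling read_dlo with row_limit=0, and the ValueError it raises lists the offending columns. A minimal sketch of acting on that error, with hypothetical column names:

# Columns reported by the ValueError as missing from the DLO schema (hypothetical names)
extra_columns = ["Unexpected_Column__c", "Debug_Flag__c"]

# PySpark's DataFrame.drop takes column names as positional arguments, so unpack the list
dataframe = dataframe.drop(*extra_columns)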

src/datacustomcode/scan.py

Lines changed: 4 additions & 50 deletions
@@ -25,6 +25,7 @@
 )
 
 import pydantic
+import sys
 
 from datacustomcode.version import get_version
 
@@ -40,6 +41,7 @@
     },
 }
 
+STANDARD_LIBS = set(sys.stdlib_module_names)
 
 class DataAccessLayerCalls(pydantic.BaseModel):
     read_dlo: frozenset[str]
@@ -137,54 +139,6 @@ def found(self) -> DataAccessLayerCalls:
 class ImportVisitor(ast.NodeVisitor):
     """AST Visitor that extracts external package imports from Python code."""
 
-    # Standard library modules that should be excluded from requirements
-    STANDARD_LIBS: ClassVar[set[str]] = {
-        "abc",
-        "argparse",
-        "ast",
-        "asyncio",
-        "base64",
-        "collections",
-        "configparser",
-        "contextlib",
-        "copy",
-        "csv",
-        "datetime",
-        "enum",
-        "functools",
-        "glob",
-        "hashlib",
-        "http",
-        "importlib",
-        "inspect",
-        "io",
-        "itertools",
-        "json",
-        "logging",
-        "math",
-        "os",
-        "pathlib",
-        "pickle",
-        "random",
-        "re",
-        "shutil",
-        "site",
-        "socket",
-        "sqlite3",
-        "string",
-        "subprocess",
-        "sys",
-        "tempfile",
-        "threading",
-        "time",
-        "traceback",
-        "typing",
-        "uuid",
-        "warnings",
-        "xml",
-        "zipfile",
-    }
-
     # Additional packages to exclude from requirements.txt
     EXCLUDED_PACKAGES: ClassVar[set[str]] = {
         "datacustomcode",  # Internal package
@@ -200,7 +154,7 @@ def visit_Import(self, node: ast.Import) -> None:
             # Get the top-level package name
             package = name.name.split(".")[0]
             if (
-                package not in self.STANDARD_LIBS
+                package not in STANDARD_LIBS
                 and package not in self.EXCLUDED_PACKAGES
                 and not package.startswith("_")
             ):
@@ -213,7 +167,7 @@ def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
             # Get the top-level package
             package = node.module.split(".")[0]
             if (
-                package not in self.STANDARD_LIBS
+                package not in STANDARD_LIBS
                 and package not in self.EXCLUDED_PACKAGES
                 and not package.startswith("_")
             ):
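
For reference, a short sketch of what the replacement relies on; sys.stdlib_module_names was added in Python 3.10, so that is the assumed minimum runtime:

import sys

# Frozenset of every top-level standard-library module name, maintained by CPython itself
STANDARD_LIBS = set(sys.stdlib_module_names)

print("os" in STANDARD_LIBS)        # True
print("zoneinfo" in STANDARD_LIBS)  # True, even though it was absent from the old hand-written list
print("pandas" in STANDARD_LIBS)    # False, so third-party imports still end up in requirements.txt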
