1515
1616
1717from pyspark .sql import DataFrame as PySparkDataFrame
18- from pyspark .sql import SparkSession
1918
20- from datacustomcode .io .writer .base import BaseDataCloudWriter , WriteMode
2119from datacustomcode .io .reader .query_api import QueryAPIDataCloudReader
20+ from datacustomcode .io .writer .base import BaseDataCloudWriter , WriteMode
2221
2322
2423class PrintDataCloudWriter (BaseDataCloudWriter ):
2524 CONFIG_NAME = "PrintDataCloudWriter"
2625
2726 def validate_dataframe_columns_against_dlo (
28- self , dataframe : PySparkDataFrame , dlo_name : str , reader : QueryAPIDataCloudReader
27+ self ,
28+ dataframe : PySparkDataFrame ,
29+ dlo_name : str ,
30+ reader : QueryAPIDataCloudReader ,
2931 ) -> None :
3032 """
3133 Validates that all columns in the given dataframe exist in the DLO schema.
@@ -36,7 +38,8 @@ def validate_dataframe_columns_against_dlo(
3638 reader (QueryAPIDataCloudReader): The reader to use for schema retrieval.
3739
3840 Raises:
39- ValueError: If any columns in the dataframe are not present in the DLO schema.
41+ ValueError: If any columns in the dataframe are not present in the DLO
42+ schema.
4043 """
4144 # Get DLO schema (no data, just schema)
4245 dlo_df = reader .read_dlo (dlo_name , row_limit = 0 )
@@ -47,23 +50,22 @@ def validate_dataframe_columns_against_dlo(
4750 extra_columns = df_columns - dlo_columns
4851 if extra_columns :
4952 raise ValueError (
50- f"The following columns are not present in the DLO '{ dlo_name } ': { sorted (extra_columns )} .\n "
53+ "The following columns are not present in the \n "
54+ f"DLO '{ dlo_name } ': { sorted (extra_columns )} .\n "
5155 "To fix this error, you can either:\n "
5256 " - Drop these columns from your DataFrame before writing, e.g.,\n "
5357 " dataframe = dataframe.drop({cols})\n "
54- " - Or, add these columns to the DLO schema in Data Cloud."
55- .format (cols = sorted (extra_columns ))
58+ " - Or, add these columns to the DLO schema in Data Cloud." .format (
59+ cols = sorted (extra_columns )
60+ )
5661 )
5762
58-
5963 def write_to_dlo (
6064 self , name : str , dataframe : PySparkDataFrame , write_mode : WriteMode
6165 ) -> None :
62- # Create SparkSession if not already created
63- spark = SparkSession .builder .appName ("YourAppName" ).getOrCreate ()
6466
6567 # Instantiate the reader
66- reader = QueryAPIDataCloudReader (spark )
68+ reader = QueryAPIDataCloudReader (self . spark )
6769
6870 # Validate columns before proceeding
6971 self .validate_dataframe_columns_against_dlo (dataframe , name , reader )
@@ -73,7 +75,8 @@ def write_to_dlo(
def write_to_dmo(
    self, name: str, dataframe: PySparkDataFrame, write_mode: WriteMode
) -> None:
    """Print the DataFrame instead of writing it to a DMO.

    Args:
        name: Name of the target DMO. Not used by this implementation.
        dataframe: The DataFrame to display via ``dataframe.show()``.
        write_mode: Requested write mode. Not used by this implementation.
    """
    # The column validation done for DLOs is deliberately not applied here:
    # it relies on reading the target's schema, and the DMO may not exist
    # yet, so the DataFrame is simply shown as-is.
    dataframe.show()
0 commit comments