Skip to content

Commit 33fadf8

Browse files
committed
Refactor duplicated code into shared reader utils; add more tests
1 parent 0653e32 commit 33fadf8

4 files changed

Lines changed: 386 additions & 64 deletions

File tree

src/datacustomcode/io/reader/query_api.py

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,51 +22,21 @@
2222
Union,
2323
)
2424

25-
import pandas.api.types as pd_types
26-
from pyspark.sql.types import (
27-
BooleanType,
28-
DoubleType,
29-
LongType,
30-
StringType,
31-
StructField,
32-
StructType,
33-
TimestampType,
34-
)
3525
from salesforcecdpconnector.connection import SalesforceCDPConnection
3626

3727
from datacustomcode.credentials import AuthType, Credentials
3828
from datacustomcode.io.reader.base import BaseDataCloudReader
3929
from datacustomcode.io.reader.sf_cli import SFCLIDataCloudReader
30+
from datacustomcode.io.reader.utils import _pandas_to_spark_schema
4031

4132
if TYPE_CHECKING:
42-
import pandas
4333
from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
44-
from pyspark.sql.types import AtomicType
34+
from pyspark.sql.types import AtomicType, StructType
4535

4636
logger = logging.getLogger(__name__)
4737

4838

4939
SQL_QUERY_TEMPLATE: Final = "SELECT * FROM {} LIMIT {}"
50-
PANDAS_TYPE_MAPPING = {
51-
"object": StringType(),
52-
"int64": LongType(),
53-
"float64": DoubleType(),
54-
"bool": BooleanType(),
55-
}
56-
57-
58-
def _pandas_to_spark_schema(
59-
pandas_df: pandas.DataFrame, nullable: bool = True
60-
) -> StructType:
61-
fields = []
62-
for column, dtype in pandas_df.dtypes.items():
63-
spark_type: AtomicType
64-
if pd_types.is_datetime64_any_dtype(dtype):
65-
spark_type = TimestampType()
66-
else:
67-
spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
68-
fields.append(StructField(column, spark_type, nullable))
69-
return StructType(fields)
7040

7141

7242
def create_cdp_connection(

src/datacustomcode/io/reader/sf_cli.py

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,47 +25,18 @@
2525
)
2626

2727
import pandas as pd
28-
import pandas.api.types as pd_types
29-
from pyspark.sql.types import (
30-
BooleanType,
31-
DoubleType,
32-
LongType,
33-
StringType,
34-
StructField,
35-
StructType,
36-
TimestampType,
37-
)
3828
import requests
3929

4030
from datacustomcode.io.reader.base import BaseDataCloudReader
31+
from datacustomcode.io.reader.utils import _pandas_to_spark_schema
4132

4233
if TYPE_CHECKING:
4334
from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
44-
from pyspark.sql.types import AtomicType
35+
from pyspark.sql.types import AtomicType, StructType
4536

4637
logger = logging.getLogger(__name__)
4738

4839
API_VERSION: Final = "v66.0"
49-
PANDAS_TYPE_MAPPING = {
50-
"object": StringType(),
51-
"int64": LongType(),
52-
"float64": DoubleType(),
53-
"bool": BooleanType(),
54-
}
55-
56-
57-
def _pandas_to_spark_schema(
58-
pandas_df: pd.DataFrame, nullable: bool = True
59-
) -> StructType:
60-
fields = []
61-
for column, dtype in pandas_df.dtypes.items():
62-
spark_type: AtomicType
63-
if pd_types.is_datetime64_any_dtype(dtype):
64-
spark_type = TimestampType()
65-
else:
66-
spark_type = PANDAS_TYPE_MAPPING.get(str(dtype), StringType())
67-
fields.append(StructField(column, spark_type, nullable))
68-
return StructType(fields)
6940

7041

7142
class SFCLIDataCloudReader(BaseDataCloudReader):

tests/io/reader/test_query_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
from datacustomcode.io.reader.query_api import (
2222
SQL_QUERY_TEMPLATE,
2323
QueryAPIDataCloudReader,
24-
_pandas_to_spark_schema,
2524
)
25+
from datacustomcode.io.reader.utils import _pandas_to_spark_schema
2626

2727

2828
class TestPandasToSparkSchema:

0 commit comments

Comments
 (0)