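"""Unit tests for ``datacustomcode.io.reader.query_api``.

Covers the pandas-to-Spark schema conversion helper and the
``QueryAPIDataCloudReader.read_dlo``/``read_dmo`` methods, with all network
access and Salesforce authentication mocked out.
"""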
from __future__ import annotations

from unittest.mock import (
    MagicMock,
    PropertyMock,
    patch,
)

import pandas as pd
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
import pytest

from datacustomcode.io.reader.query_api import (
    SQL_QUERY_TEMPLATE,
    QueryAPIDataCloudReader,
    _pandas_to_spark_schema,
)
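
# NOTE: The assertions below assume SQL_QUERY_TEMPLATE is a two-slot format
# string taking a table name and a row limit (something like
# "SELECT * FROM {} LIMIT {}"); the exact query text is defined in
# datacustomcode.io.reader.query_api rather than in this file.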


class TestPandasToSparkSchema:
    def test_pandas_to_spark_schema(self):
        """Test conversion of pandas DataFrame schema to Spark schema."""
        # Create a test pandas DataFrame with various types
        data = {
            "string_col": ["a", "b"],
            "int_col": [1, 2],
            "float_col": [1.0, 2.0],
            "bool_col": [True, False],
        }
        df = pd.DataFrame(data)

        # Convert to Spark schema
        schema = _pandas_to_spark_schema(df)

        # Verify the schema
        assert isinstance(schema, StructType)
        assert len(schema.fields) == 4

        # Check each column's type mapping
        field_dict = {field.name: field for field in schema.fields}
        assert isinstance(field_dict["string_col"].dataType, StringType)
        assert isinstance(field_dict["int_col"].dataType, LongType)
        assert isinstance(field_dict["float_col"].dataType, DoubleType)
        assert isinstance(field_dict["bool_col"].dataType, BooleanType)
        assert all(field.nullable for field in schema.fields)

    def test_pandas_to_spark_schema_nullable(self):
        """Test setting nullable parameter in schema conversion."""
        df = pd.DataFrame({"col": [1, 2]})

        # Test with nullable=False
        schema = _pandas_to_spark_schema(df, nullable=False)
        assert not schema.fields[0].nullable

    def test_pandas_to_spark_schema_datetime_types(self):
        """Test conversion of pandas datetime types to Spark TimestampType."""
        # Create test data with different datetime types
        data = {
            "datetime_ns": pd.to_datetime(
                ["2023-01-01 10:00:00", "2023-01-02 11:00:00"]
            ),
            "datetime_ns_utc": pd.to_datetime(
                ["2023-01-01 10:00:00", "2023-01-02 11:00:00"], utc=True
            ),
            "datetime_ms": pd.to_datetime(
                ["2023-01-01 10:00:00", "2023-01-02 11:00:00"]
            ).astype("datetime64[ms]"),
            "datetime_ms_utc": pd.to_datetime(
                ["2023-01-01 10:00:00", "2023-01-02 11:00:00"], utc=True
            )
            .tz_localize(None)
            .astype("datetime64[ms]"),
        }
        df = pd.DataFrame(data)

        # Convert to Spark schema
        schema = _pandas_to_spark_schema(df)

        # Verify the schema
        assert isinstance(schema, StructType)
        assert len(schema.fields) == 4

        # Check that all datetime columns map to TimestampType
        field_dict = {field.name: field for field in schema.fields}
        for field_name in [
            "datetime_ns",
            "datetime_ns_utc",
            "datetime_ms",
            "datetime_ms_utc",
        ]:
            assert isinstance(field_dict[field_name].dataType, TimestampType), (
                f"Field {field_name} should be TimestampType, "
                f"got {type(field_dict[field_name].dataType)}"
            )
            assert field_dict[field_name].nullable

        # Verify the actual pandas dtypes to ensure our test data has the
        # expected types
        assert str(df["datetime_ns"].dtype) == "datetime64[ns]"
        assert str(df["datetime_ns_utc"].dtype) == "datetime64[ns, UTC]"
        assert str(df["datetime_ms"].dtype) == "datetime64[ms]"
        assert str(df["datetime_ms_utc"].dtype) == "datetime64[ms]"
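

# For reference, the assertions above imply roughly this dtype mapping inside
# _pandas_to_spark_schema (the authoritative logic lives in
# datacustomcode.io.reader.query_api):
#   object -> StringType, int64 -> LongType, float64 -> DoubleType,
#   bool -> BooleanType, datetime64[...] -> TimestampType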


# Completely isolated test class for QueryAPIDataCloudReader
@pytest.mark.usefixtures("patch_all_requests")
class TestQueryAPIDataCloudReader:
    # Class-level patch to prevent any HTTP requests during the entire test class
    @pytest.fixture(scope="class", autouse=True)
    def patch_all_requests(self, request):
        """Patch all potential HTTP request methods to prevent real network calls."""
        patches = []
        # Patch requests methods
        for target in [
            "requests.get",
            "requests.post",
            "requests.session",
            "requests.adapters.HTTPAdapter.send",
            "urllib3.connectionpool.HTTPConnectionPool.urlopen",
        ]:
            patcher = patch(target)
            patches.append(patcher)
            patcher.start()

        def fin():
            for patcher in patches:
                patcher.stop()

        request.addfinalizer(fin)
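
    # patch() objects started with .start() are not cleaned up automatically;
    # the finalizer registered above guarantees every patcher is stopped once
    # the class's tests finish, even if one of them fails.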

    @pytest.fixture
    def mock_spark_session(self):
        """Create a mock Spark session."""
        spark = MagicMock()
        # Set up createDataFrame to return itself for chaining
        spark.createDataFrame.return_value = spark
        return spark

    @pytest.fixture
    def mock_pandas_dataframe(self):
        """Create a sample pandas DataFrame for testing."""
        return pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})

    @pytest.fixture
    def mock_connection(self, mock_pandas_dataframe):
        """Create a completely mocked SalesforceCDPConnection."""
        mock_conn = MagicMock()
        mock_conn.get_pandas_dataframe.return_value = mock_pandas_dataframe

        # Ensure no real authentication happens; a PropertyMock must be set on
        # the type (not the instance) for attribute access to route through it
        mock_auth = MagicMock()
        mock_auth.get_token.return_value = ("fake_token", "fake_instance_url")
        type(mock_conn).authentication_helper = PropertyMock(return_value=mock_auth)
        return mock_conn

    @pytest.fixture
    def reader_without_init(self, mock_spark_session):
        """
        Special fixture that creates a partially-constructed reader instance.

        This avoids calling __init__, which tries to make real connections.
        """
        with patch.object(QueryAPIDataCloudReader, "__init__", return_value=None):
            reader = QueryAPIDataCloudReader(None)  # None is ignored due to mock
            reader.spark = mock_spark_session
            yield reader
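
    # NOTE: the tests below assume QueryAPIDataCloudReader keeps its
    # SalesforceCDPConnection on a private `_conn` attribute and its Spark
    # session on `spark`; both are injected as mocks instead of exercising
    # the real __init__.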

    def test_pandas_to_spark_schema_function(self):
        """Test the pandas to spark schema conversion function directly."""
        df = pd.DataFrame({"col": [1, 2]})
        schema = _pandas_to_spark_schema(df)
        assert isinstance(schema, StructType)
        assert len(schema.fields) == 1

    def test_read_dlo(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dlo method."""
        # Set the mock connection on the reader
        reader_without_init._conn = mock_connection

        # Call read_dlo - this will use our mock connection now
        reader_without_init.read_dlo("test_dlo")

        # Verify get_pandas_dataframe was called with the right SQL
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dlo", 1000)
        )

        # Verify DataFrame was created with auto-inferred schema
        reader_without_init.spark.createDataFrame.assert_called_once()
        args, _ = reader_without_init.spark.createDataFrame.call_args
        assert args[0] is mock_pandas_dataframe  # First arg is the pandas DataFrame
        assert isinstance(args[1], StructType)  # Second arg is the schema

    def test_read_dlo_with_schema(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dlo method with explicit schema."""
        # Set the mock connection on the reader
        reader_without_init._conn = mock_connection

        # Create custom schema
        custom_schema = StructType(
            [
                StructField("col1", LongType(), True),
                StructField("col2", StringType(), True),
            ]
        )

        # Call read_dlo with schema
        reader_without_init.read_dlo("test_dlo", schema=custom_schema)

        # Verify get_pandas_dataframe was called with the right SQL
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dlo", 1000)
        )

        # Verify DataFrame was created with provided schema
        reader_without_init.spark.createDataFrame.assert_called_once()
        args, _ = reader_without_init.spark.createDataFrame.call_args
        assert args[1] is custom_schema

    def test_read_dmo(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dmo method."""
        # Set the mock connection on the reader
        reader_without_init._conn = mock_connection

        # Call read_dmo
        reader_without_init.read_dmo("test_dmo")

        # Verify get_pandas_dataframe was called with the right SQL
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dmo", 1000)
        )

        # Verify DataFrame was created
        reader_without_init.spark.createDataFrame.assert_called_once()
        args, _ = reader_without_init.spark.createDataFrame.call_args
        assert args[0] is mock_pandas_dataframe

    def test_read_dmo_with_schema(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dmo method with explicit schema."""
        # Set the mock connection on the reader
        reader_without_init._conn = mock_connection

        # Create custom schema
        custom_schema = StructType(
            [
                StructField("col1", LongType(), True),
                StructField("col2", StringType(), True),
            ]
        )

        # Call read_dmo with schema
        reader_without_init.read_dmo("test_dmo", schema=custom_schema)

        # Verify get_pandas_dataframe was called with the right SQL
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dmo", 1000)
        )

        # Verify DataFrame was created with provided schema
        reader_without_init.spark.createDataFrame.assert_called_once()
        args, _ = reader_without_init.spark.createDataFrame.call_args
        assert args[1] is custom_schema

    def test_read_dlo_with_custom_row_limit(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dlo method with custom row_limit."""
        reader_without_init._conn = mock_connection
        reader_without_init.read_dlo("test_dlo", row_limit=50)
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dlo", 50)
        )

    def test_read_dmo_with_custom_row_limit(
        self, reader_without_init, mock_connection, mock_pandas_dataframe
    ):
        """Test read_dmo method with custom row_limit."""
        reader_without_init._conn = mock_connection
        reader_without_init.read_dmo("test_dmo", row_limit=25)
        mock_connection.get_pandas_dataframe.assert_called_once_with(
            SQL_QUERY_TEMPLATE.format("test_dmo", 25)
        )
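

# For orientation, a hypothetical sketch of how the reader under test might be
# used (the constructor argument and exact call shapes are assumptions inferred
# from these tests, not the documented API):
#
#     reader = QueryAPIDataCloudReader(spark)          # a real SparkSession
#     df = reader.read_dlo("my_dlo", row_limit=1000)   # returns a Spark DataFrame
#     df = reader.read_dmo("my_dmo", schema=my_schema)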