Skip to content

Commit ca2066f

Browse files
committed
Fix(athena): Fix SCD_TYPE_2 when using table_format=iceberg
1 parent a151747 commit ca2066f

3 files changed

Lines changed: 75 additions & 9 deletions

File tree

.circleci/continue_config.yml

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -308,16 +308,16 @@ workflows:
308308
matrix:
309309
parameters:
310310
engine:
311-
- snowflake
312-
- databricks
313-
- redshift
314-
- bigquery
315-
- clickhouse-cloud
311+
#- snowflake
312+
#- databricks
313+
#- redshift
314+
#- bigquery
315+
#- clickhouse-cloud
316316
- athena
317-
filters:
318-
branches:
319-
only:
320-
- main
317+
#filters:
318+
# branches:
319+
# only:
320+
# - main
321321
- trigger_private_tests:
322322
requires:
323323
- style_and_cicd_tests

sqlmesh/core/snapshot/evaluator.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1711,6 +1711,7 @@ def insert(
17111711
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
17121712
updated_at_as_valid_from=model.kind.updated_at_as_valid_from,
17131713
columns_to_types=columns_to_types,
1714+
table_format=model.table_format,
17141715
table_description=model.description,
17151716
column_descriptions=model.column_descriptions,
17161717
truncate=is_first_insert,
@@ -1727,6 +1728,7 @@ def insert(
17271728
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
17281729
execution_time_as_valid_from=model.kind.execution_time_as_valid_from,
17291730
columns_to_types=columns_to_types,
1731+
table_format=model.table_format,
17301732
table_description=model.description,
17311733
column_descriptions=model.column_descriptions,
17321734
truncate=is_first_insert,
@@ -1756,6 +1758,7 @@ def append(
17561758
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
17571759
updated_at_as_valid_from=model.kind.updated_at_as_valid_from,
17581760
columns_to_types=columns_to_types,
1761+
table_format=model.table_format,
17591762
table_description=model.description,
17601763
column_descriptions=model.column_descriptions,
17611764
**kwargs,
@@ -1769,6 +1772,7 @@ def append(
17691772
valid_to_col=model.kind.valid_to_name,
17701773
check_columns=model.kind.columns,
17711774
columns_to_types=columns_to_types,
1775+
table_format=model.table_format,
17721776
invalidate_hard_deletes=model.kind.invalidate_hard_deletes,
17731777
execution_time_as_valid_from=model.kind.execution_time_as_valid_from,
17741778
table_description=model.description,

tests/core/engine_adapter/integration/test_integration_athena.py

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -466,3 +466,65 @@ def time_formatter(time: TimeLike, _: t.Optional[t.Dict[str, exp.DataType]]) ->
466466
result = engine_adapter.fetchdf(exp.select("*").from_(table))
467467
assert len(result) == 4
468468
assert sorted(result["id"].tolist()) == [1, 2, 4, 5]
469+
470+
471+
def test_scd_type_2_iceberg_timestamps(
472+
ctx: TestContext, engine_adapter: AthenaEngineAdapter
473+
) -> None:
474+
src_table = ctx.table("src_table")
475+
scd_model_table = ctx.table("scd_model")
476+
477+
base_data = pd.DataFrame(
478+
[
479+
{"id": 1, "ts": datetime.datetime(2023, 1, 1, 12, 13, 14)},
480+
{"id": 2, "ts": datetime.datetime(2023, 1, 1, 8, 13, 14)},
481+
{"id": 3, "ts": datetime.datetime(2023, 1, 2, 11, 10, 0)},
482+
{"id": 4, "ts": datetime.datetime(2023, 1, 2, 8, 10, 0)},
483+
{"id": 5, "ts": datetime.datetime(2023, 1, 3, 16, 5, 14)},
484+
{"id": 6, "ts": datetime.datetime(2023, 1, 3, 16, 5, 14)},
485+
]
486+
)
487+
488+
engine_adapter.ctas(
489+
table_name=src_table,
490+
query_or_df=base_data,
491+
)
492+
493+
sqlmesh_context, model = ctx.upsert_sql_model(
494+
f"""
495+
MODEL (
496+
name {scd_model_table},
497+
kind SCD_TYPE_2_BY_TIME (
498+
unique_key id,
499+
updated_at_name ts,
500+
time_data_type timestamp(6)
501+
),
502+
start '2020-01-01',
503+
cron '@daily',
504+
table_format iceberg
505+
);
506+
507+
SELECT
508+
id, ts::timestamp(6) as ts
509+
FROM {src_table};
510+
"""
511+
)
512+
513+
assert model.table_format == "iceberg"
514+
515+
# throws if the temp tables created by the SCD Type 2 strategy are Hive tables instead of Iceberg
516+
# because the Iceberg timestamp(6) type isn't supported in Hive
517+
plan = sqlmesh_context.plan(auto_apply=True)
518+
519+
assert len(plan.snapshots) == 1
520+
test_table_snapshot = list(plan.snapshots.values())[0]
521+
test_table_physical_name = exp.to_table(test_table_snapshot.table_name())
522+
523+
assert engine_adapter._query_table_type(test_table_physical_name) == "iceberg"
524+
timestamp_columns = [
525+
v
526+
for k, v in engine_adapter.columns(test_table_physical_name).items()
527+
if k in {"ts", "valid_from", "valid_to"}
528+
]
529+
assert len(timestamp_columns) == 3
530+
assert all([v.sql(dialect="athena").lower() == "timestamp(6)" for v in timestamp_columns])

0 commit comments

Comments
 (0)