Skip to content

Commit c088d74

Browse files
committed
Fix!: Change how partitioned_by is parsed so that partition expressions with specialized AST nodes are captured
1 parent f96488c commit c088d74

3 files changed

Lines changed: 60 additions & 3 deletions

File tree

sqlmesh/core/dialect.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,12 @@ def parse(self: Parser) -> t.Optional[exp.Expression]:
609609
value = self.expression(ModelKind, this=kind.value, expressions=props)
610610
elif key == "expression":
611611
value = self._parse_conjunction()
612+
elif key == "partitioned_by":
613+
partitioned_by = self._parse_partitioned_by()
614+
if isinstance(partitioned_by.this, exp.Schema):
615+
value = exp.tuple_(*partitioned_by.this.expressions)
616+
else:
617+
value = partitioned_by.this
612618
else:
613619
value = self._parse_bracket(self._parse_field(any_token=True))
614620

tests/core/engine_adapter/test_athena.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,51 @@ def test_drop_partitions_from_metastore_uses_batches(
435435
# third call 50-62
436436
assert calls[2][1]["PartitionsToDelete"][0]["Values"][0] == "50"
437437
assert calls[2][1]["PartitionsToDelete"][-1]["Values"][0] == "62"
438+
439+
440+
def test_iceberg_partition_transforms(adapter: AthenaEngineAdapter):
441+
expressions = d.parse(
442+
"""
443+
MODEL (
444+
name test_table,
445+
kind FULL,
446+
table_format iceberg,
447+
partitioned_by (month(business_date), bucket(4, colb), colc)
448+
);
449+
450+
SELECT 1::timestamp AS business_date, 2::varchar as colb, 'foo' as colc;
451+
"""
452+
)
453+
model: SqlModel = t.cast(SqlModel, load_sql_based_model(expressions))
454+
455+
assert model.partitioned_by == [
456+
exp.Month(this=exp.column("business_date", quoted=True)),
457+
exp.PartitionedByBucket(
458+
this=exp.column("colb", quoted=True), expression=exp.Literal.number(4)
459+
),
460+
exp.column("colc", quoted=True),
461+
]
462+
463+
adapter.s3_warehouse_location = "s3://bucket/prefix/"
464+
465+
adapter.create_table(
466+
table_name=model.name,
467+
columns_to_types=model.columns_to_types_or_raise,
468+
partitioned_by=model.partitioned_by,
469+
table_format=model.table_format,
470+
)
471+
472+
adapter.ctas(
473+
table_name=model.name,
474+
columns_to_types=model.columns_to_types_or_raise,
475+
partitioned_by=model.partitioned_by,
476+
query_or_df=model.ctas_query(),
477+
table_format=model.table_format,
478+
)
479+
480+
assert to_sql_calls(adapter) == [
481+
# Hive syntax - create table
482+
"""CREATE TABLE IF NOT EXISTS `test_table` (`business_date` TIMESTAMP, `colb` STRING, `colc` STRING) PARTITIONED BY (MONTH(`business_date`), BUCKET(4, `colb`), `colc`) LOCATION 's3://bucket/prefix/test_table/' TBLPROPERTIES ('table_type'='iceberg')""",
483+
# Trino syntax - CTAS
484+
"""CREATE TABLE IF NOT EXISTS "test_table" WITH (table_type='iceberg', partitioning=ARRAY['MONTH(business_date)', 'BUCKET(colb, 4)', 'colc'], location='s3://bucket/prefix/test_table/', is_external=false) AS SELECT CAST("business_date" AS TIMESTAMP) AS "business_date", CAST("colb" AS VARCHAR) AS "colb", CAST("colc" AS VARCHAR) AS "colc" FROM (SELECT CAST(1 AS TIMESTAMP) AS "business_date", CAST(2 AS VARCHAR) AS "colb", 'foo' AS "colc" LIMIT 0) AS "_subquery\"""",
485+
]

tests/core/test_context.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1978,19 +1978,22 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, capsys, caplog):
19781978
)
19791979
)
19801980

1981-
ctx.plan(
1981+
plan = ctx.plan(
19821982
environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01"
19831983
)
19841984

1985+
date_snapshot = next(s for s in plan.new_snapshots if "date_example" in s.name)
1986+
timestamp_snapshot = next(s for s in plan.new_snapshots if "timestamp_example" in s.name)
1987+
19851988
# Case 1: The timestamp audit should be in the inclusive range ['2025-02-01 00:00:00', '2025-02-01 23:59:59.999999']
19861989
assert (
1987-
"""SELECT COUNT(*) FROM (SELECT ("timestamp_id") AS "timestamp_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__timestamp_example__2797548448" AS "sqlmesh_audit__timestamp_example__2797548448" WHERE "timestamp_id" BETWEEN CAST('2025-02-01 00:00:00' AS TIMESTAMP) AND CAST('2025-02-01 23:59:59.999999' AS TIMESTAMP)) AS "_q_0" WHERE TRUE GROUP BY ("timestamp_id") HAVING COUNT(*) > 1) AS "audit\""""
1990+
f"""SELECT COUNT(*) FROM (SELECT ("timestamp_id") AS "timestamp_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__timestamp_example__{timestamp_snapshot.version}" AS "sqlmesh_audit__timestamp_example__{timestamp_snapshot.version}" WHERE "timestamp_id" BETWEEN CAST('2025-02-01 00:00:00' AS TIMESTAMP) AND CAST('2025-02-01 23:59:59.999999' AS TIMESTAMP)) AS "_q_0" WHERE TRUE GROUP BY ("timestamp_id") HAVING COUNT(*) > 1) AS "audit\""""
19881991
in caplog.text
19891992
)
19901993

19911994
# Case 2: The date audit should be in the inclusive range ['2025-02-01', '2025-02-01']
19921995
assert (
1993-
"""SELECT COUNT(*) FROM (SELECT ("date_id") AS "date_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__date_example__4100277424" AS "sqlmesh_audit__date_example__4100277424" WHERE "date_id" BETWEEN CAST('2025-02-01' AS DATE) AND CAST('2025-02-01' AS DATE)) AS "_q_0" WHERE TRUE GROUP BY ("date_id") HAVING COUNT(*) > 1) AS "audit\""""
1996+
f"""SELECT COUNT(*) FROM (SELECT ("date_id") AS "date_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__date_example__{date_snapshot.version}" AS "sqlmesh_audit__date_example__{date_snapshot.version}" WHERE "date_id" BETWEEN CAST('2025-02-01' AS DATE) AND CAST('2025-02-01' AS DATE)) AS "_q_0" WHERE TRUE GROUP BY ("date_id") HAVING COUNT(*) > 1) AS "audit\""""
19941997
in caplog.text
19951998
)
19961999

0 commit comments

Comments
 (0)