Skip to content

Commit 5379f8f

Browse files
committed
ensure unique_key is included in physical properties for incremental unique key model
1 parent ab39dc1 commit 5379f8f

1 file changed

Lines changed: 40 additions & 16 deletions

File tree

sqlmesh/core/snapshot/evaluator.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,28 @@ def run_post_statements(self, snapshot: Snapshot, render_kwargs: t.Any) -> None:
20182018
self.adapter.execute(snapshot.model.render_post_statements(**render_kwargs))
20192019

20202020

2021+
def _add_unique_key_to_physical_properties_for_doris(
2022+
model: Model, physical_properties: t.Dict[str, t.Any]
2023+
) -> t.Dict[str, t.Any]:
2024+
"""
2025+
For Doris dialect with INCREMENTAL_BY_UNIQUE_KEY models, ensure unique_key is added
2026+
to physical properties if not already present.
2027+
"""
2028+
if (
2029+
model.dialect == "doris"
2030+
and model.kind.is_incremental_by_unique_key
2031+
and model.unique_key
2032+
and "unique_key" not in physical_properties
2033+
):
2034+
physical_properties = dict(physical_properties)
2035+
physical_properties["unique_key"] = (
2036+
model.unique_key[0]
2037+
if len(model.unique_key) == 1
2038+
else exp.Tuple(expressions=model.unique_key)
2039+
)
2040+
return physical_properties
2041+
2042+
20212043
class MaterializableStrategy(PromotableStrategy, abc.ABC):
20222044
def create(
20232045
self,
@@ -2030,19 +2052,9 @@ def create(
20302052
) -> None:
20312053
ctas_query = model.ctas_query(**render_kwargs)
20322054
physical_properties = kwargs.get("physical_properties", model.physical_properties)
2033-
# If Doris and incremental-by-unique-key, ensure unique_key is present for creation
2034-
if (
2035-
model.dialect == "doris"
2036-
and getattr(model.kind, "is_incremental_by_unique_key", False)
2037-
and model.unique_key
2038-
and "unique_key" not in physical_properties
2039-
):
2040-
physical_properties = dict(physical_properties)
2041-
physical_properties["unique_key"] = (
2042-
model.unique_key[0]
2043-
if len(model.unique_key) == 1
2044-
else exp.Tuple(expressions=model.unique_key)
2045-
)
2055+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
2056+
model, physical_properties
2057+
)
20462058

20472059
logger.info("Creating table '%s'", table_name)
20482060
if model.annotated:
@@ -2157,6 +2169,10 @@ def _replace_query_for_model(
21572169
except Exception:
21582170
columns_to_types, source_columns = None, None
21592171

2172+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
2173+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
2174+
model, physical_properties
2175+
)
21602176
self.adapter.replace_query(
21612177
name,
21622178
query_or_df,
@@ -2165,7 +2181,7 @@ def _replace_query_for_model(
21652181
partitioned_by=model.partitioned_by,
21662182
partition_interval_unit=model.partition_interval_unit,
21672183
clustered_by=model.clustered_by,
2168-
table_properties=kwargs.get("physical_properties", model.physical_properties),
2184+
table_properties=physical_properties,
21692185
table_description=model.description,
21702186
column_descriptions=model.column_descriptions,
21712187
target_columns_to_types=columns_to_types,
@@ -2293,6 +2309,10 @@ def insert(
22932309
table_name,
22942310
render_kwargs=render_kwargs,
22952311
)
2312+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
2313+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
2314+
model, physical_properties
2315+
)
22962316
self.adapter.merge(
22972317
table_name,
22982318
query_or_df,
@@ -2304,7 +2324,7 @@ def insert(
23042324
end=kwargs.get("end"),
23052325
execution_time=kwargs.get("execution_time"),
23062326
),
2307-
physical_properties=kwargs.get("physical_properties", model.physical_properties),
2327+
physical_properties=physical_properties,
23082328
source_columns=source_columns,
23092329
)
23102330

@@ -3100,13 +3120,17 @@ def create(
31003120
if is_table_deployable and is_snapshot_deployable:
31013121
# We could deploy this to prod; create a proper managed table
31023122
logger.info("Creating managed table: %s", table_name)
3123+
physical_properties = kwargs.get("physical_properties", model.physical_properties)
3124+
physical_properties = _add_unique_key_to_physical_properties_for_doris(
3125+
model, physical_properties
3126+
)
31033127
self.adapter.create_managed_table(
31043128
table_name=table_name,
31053129
query=model.render_query_or_raise(**render_kwargs),
31063130
target_columns_to_types=model.columns_to_types,
31073131
partitioned_by=model.partitioned_by,
31083132
clustered_by=model.clustered_by,
3109-
table_properties=kwargs.get("physical_properties", model.physical_properties),
3133+
table_properties=physical_properties,
31103134
table_description=model.description,
31113135
column_descriptions=model.column_descriptions,
31123136
table_format=model.table_format,

0 commit comments

Comments
 (0)