Skip to content

Commit 81581a5

Browse files
committed
Feat: Lazily create model tables during evaluation
1 parent 50b57db commit 81581a5

7 files changed

Lines changed: 432 additions & 239 deletions

File tree

examples/sushi/config.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
defaults = {"dialect": "duckdb"}
2828
model_defaults = ModelDefaultsConfig(**defaults)
2929
model_defaults_iceberg = ModelDefaultsConfig(**defaults, storage_format="iceberg")
30+
before_all = [
31+
"CREATE SCHEMA IF NOT EXISTS raw",
32+
"DROP VIEW IF EXISTS raw.demographics",
33+
"CREATE VIEW raw.demographics AS (SELECT 1 AS customer_id, '00000' AS zip)",
34+
]
3035

3136

3237
# A DuckDB config, in-memory by default.
@@ -52,6 +57,7 @@
5257
"nomissingexternalmodels",
5358
],
5459
),
60+
before_all=before_all,
5561
)
5662

5763
bigquery_config = Config(
@@ -63,6 +69,7 @@
6369
},
6470
default_gateway="bq",
6571
model_defaults=model_defaults,
72+
before_all=before_all,
6673
)
6774

6875
# A configuration used for SQLMesh tests.
@@ -75,6 +82,7 @@
7582
)
7683
),
7784
model_defaults=model_defaults,
85+
before_all=before_all,
7886
)
7987

8088
# A configuration used for SQLMesh tests with virtual environment mode set to DEV_ONLY.
@@ -84,14 +92,15 @@
8492
"plan": PlanConfig(
8593
auto_categorize_changes=CategorizerConfig.all_full(),
8694
),
87-
}
95+
},
8896
)
8997

9098
# A DuckDB config with a physical schema map.
9199
map_config = Config(
92100
default_connection=DuckDBConnectionConfig(),
93101
physical_schema_mapping={"^sushi$": "company_internal"},
94102
model_defaults=model_defaults,
103+
before_all=before_all,
95104
)
96105

97106
# A config representing isolated systems with a gateway per system
@@ -103,6 +112,7 @@
103112
},
104113
default_gateway="dev",
105114
model_defaults=model_defaults,
115+
before_all=before_all,
106116
)
107117

108118
required_approvers_config = Config(
@@ -137,19 +147,21 @@
137147
),
138148
],
139149
model_defaults=model_defaults,
150+
before_all=before_all,
140151
)
141152

142153

143154
environment_suffix_table_config = Config(
144155
default_connection=DuckDBConnectionConfig(),
145156
model_defaults=model_defaults,
146157
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
158+
before_all=before_all,
147159
)
148160

149161
environment_suffix_catalog_config = environment_suffix_table_config.model_copy(
150162
update={
151163
"environment_suffix_target": EnvironmentSuffixTarget.CATALOG,
152-
}
164+
},
153165
)
154166

155167
CATALOGS = {
@@ -161,6 +173,7 @@
161173
default_connection=DuckDBConnectionConfig(catalogs=CATALOGS),
162174
default_test_connection=DuckDBConnectionConfig(catalogs=CATALOGS),
163175
model_defaults=model_defaults,
176+
before_all=before_all,
164177
)
165178

166179
environment_catalog_mapping_config = Config(
@@ -177,4 +190,5 @@
177190
"^prod$": "prod_catalog",
178191
".*": "dev_catalog",
179192
},
193+
before_all=before_all,
180194
)

sqlmesh/core/engine_adapter/base.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,14 +443,20 @@ def replace_query(
443443
target_table=target_table,
444444
source_columns=source_columns,
445445
)
446+
if not target_columns_to_types and table_exists:
447+
target_columns_to_types = self.columns(target_table)
446448
query = source_queries[0].query_factory()
447-
target_columns_to_types = target_columns_to_types or self.columns(target_table)
448449
self_referencing = any(
449450
quote_identifiers(table) == quote_identifiers(target_table)
450451
for table in query.find_all(exp.Table)
451452
)
452453
# If a query references itself then it must have a table created regardless of approach used.
453454
if self_referencing:
455+
if not target_columns_to_types:
456+
raise SQLMeshError(
457+
f"Cannot create a self-referencing table {target_table.sql(dialect=self.dialect)} without knowing the column types. "
458+
"Try casting the columns to an expected type or defining the columns in the model metadata. "
459+
)
454460
self._create_table_from_columns(
455461
target_table,
456462
target_columns_to_types,
@@ -472,6 +478,7 @@ def replace_query(
472478
**kwargs,
473479
)
474480
if self_referencing:
481+
assert target_columns_to_types is not None
475482
with self.temp_table(
476483
self._select_columns(target_columns_to_types).from_(target_table),
477484
name=target_table,

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ def columns(
5353
self.execute(sql)
5454
resp = self.cursor.fetchall()
5555
if not resp:
56-
raise SQLMeshError("Could not get columns for table '%s'. Table not found.", table_name)
56+
raise SQLMeshError(
57+
f"Could not get columns for table '{table.sql(dialect=self.dialect)}'. Table not found."
58+
)
5759
return {
5860
column_name: exp.DataType.build(data_type, dialect=self.dialect, udt=True)
5961
for column_name, data_type in resp

sqlmesh/core/plan/evaluator.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,16 @@ def visit_physical_layer_update_stage(
205205
success=completion_status is not None and completion_status.is_success
206206
)
207207

208+
def visit_physical_layer_schema_creation_stage(
209+
self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
210+
) -> None:
211+
try:
212+
self.snapshot_evaluator.create_physical_schemas(
213+
stage.snapshots, stage.deployability_index
214+
)
215+
except Exception as ex:
216+
raise PlanError("Plan application failed.") from ex
217+
208218
def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePlan) -> None:
209219
if plan.empty_backfill:
210220
intervals_to_add = []

sqlmesh/core/plan/stages.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,19 @@ class PhysicalLayerUpdateStage:
7171
deployability_index: DeployabilityIndex
7272

7373

74+
@dataclass
75+
class PhysicalLayerSchemaCreationStage:
76+
"""Create the physical schemas for the given snapshots.
77+
78+
Args:
79+
snapshots: Snapshots to create physical schemas for.
80+
deployability_index: Deployability index for this stage.
81+
"""
82+
83+
snapshots: t.List[Snapshot]
84+
deployability_index: DeployabilityIndex
85+
86+
7487
@dataclass
7588
class AuditOnlyRunStage:
7689
"""Run audits only for given snapshots.
@@ -185,6 +198,7 @@ class FinalizeEnvironmentStage:
185198
AfterAllStage,
186199
CreateSnapshotRecordsStage,
187200
PhysicalLayerUpdateStage,
201+
PhysicalLayerSchemaCreationStage,
188202
AuditOnlyRunStage,
189203
RestatementStage,
190204
BackfillStage,
@@ -283,11 +297,19 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
283297
if plan.new_snapshots:
284298
stages.append(CreateSnapshotRecordsStage(snapshots=plan.new_snapshots))
285299

286-
stages.append(
287-
self._get_physical_layer_update_stage(
288-
plan, snapshots, snapshots_to_intervals, deployability_index_for_creation
300+
if plan.skip_backfill or plan.empty_backfill:
301+
stages.append(
302+
self._get_physical_layer_update_stage(
303+
plan, snapshots, snapshots_to_intervals, deployability_index_for_creation
304+
)
305+
)
306+
else:
307+
stages.append(
308+
PhysicalLayerSchemaCreationStage(
309+
snapshots=self._get_snapshots_to_create(plan, snapshots),
310+
deployability_index=deployability_index,
311+
)
289312
)
290-
)
291313

292314
audit_only_snapshots = self._get_audit_only_snapshots(new_snapshots)
293315
if audit_only_snapshots:

0 commit comments

Comments (0)