Skip to content

Commit f4326da

Browse files
committed
simplify create 2
1 parent 79869df commit f4326da

2 files changed

Lines changed: 82 additions & 206 deletions

File tree

sqlmesh/core/snapshot/evaluator.py

Lines changed: 82 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def evaluate(
140140
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
141141
deployability_index: t.Optional[DeployabilityIndex] = None,
142142
batch_index: int = 0,
143+
target_table_exists: t.Optional[bool] = None,
143144
**kwargs: t.Any,
144145
) -> t.Optional[str]:
145146
"""Renders the snapshot's model, executes it and stores the result in the snapshot's physical table.
@@ -153,6 +154,7 @@ def evaluate(
153154
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
154155
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
155156
batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
157+
target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
156158
kwargs: Additional kwargs to pass to the renderer.
157159
158160
Returns:
@@ -167,6 +169,7 @@ def evaluate(
167169
allow_destructive_snapshots=allow_destructive_snapshots or set(),
168170
deployability_index=deployability_index,
169171
batch_index=batch_index,
172+
target_table_exists=target_table_exists,
170173
**kwargs,
171174
)
172175
if result is None or isinstance(result, str):
@@ -345,23 +348,63 @@ def create(
345348
Returns:
346349
CompletionStatus: The status of the creation operation (success, failure, nothing to do).
347350
"""
351+
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
352+
353+
snapshots_to_create = self.get_snapshots_to_create(target_snapshots, deployability_index)
354+
if not snapshots_to_create:
355+
return CompletionStatus.NOTHING_TO_DO
356+
if on_start:
357+
on_start(snapshots_to_create)
358+
359+
self.create_physical_schemas(snapshots_to_create, deployability_index)
360+
self._create_snapshots(
361+
snapshots_to_create=snapshots_to_create,
362+
snapshots={s.name: s for s in snapshots.values()},
363+
deployability_index=deployability_index,
364+
on_complete=on_complete,
365+
allow_destructive_snapshots=allow_destructive_snapshots or set(),
366+
)
367+
return CompletionStatus.SUCCESS
368+
369+
def create_physical_schemas(
370+
self, snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
371+
) -> None:
372+
"""Creates the physical schemas for the given snapshots.
373+
374+
Args:
375+
snapshots: Snapshots to create physical schemas for.
376+
deployability_index: Determines snapshots that are deployable in the context of this creation.
377+
"""
378+
tables_by_gateway: t.Dict[t.Optional[str], t.List[str]] = defaultdict(list)
379+
for snapshot in snapshots:
380+
if snapshot.is_model and snapshot.is_materialized:
381+
tables_by_gateway[snapshot.model_gateway].append(
382+
snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
383+
)
384+
385+
for gateway, tables in tables_by_gateway.items():
386+
self._create_schemas(tables=tables, gateway=gateway)
387+
388+
def get_snapshots_to_create(
389+
self, target_snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
390+
) -> t.List[Snapshot]:
391+
"""Returns a list of snapshots that need to have their physical tables created.
392+
393+
Args:
394+
target_snapshots: Target snapshots.
395+
deployability_index: Determines snapshots that are deployable / representative in the context of this creation.
396+
"""
348397
snapshots_with_table_names = defaultdict(set)
349398
tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = (
350399
defaultdict(lambda: defaultdict(set))
351400
)
352-
table_deployability: t.Dict[str, bool] = {}
353-
allow_destructive_snapshots = allow_destructive_snapshots or set()
354-
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
355401

356402
for snapshot in target_snapshots:
357403
if not snapshot.is_model or snapshot.is_symbolic:
358404
continue
359-
is_representative = deployability_index.is_representative(snapshot)
360-
table = exp.to_table(
361-
snapshot.table_name(is_representative), dialect=snapshot.model.dialect
362-
)
405+
is_deployable = deployability_index.is_deployable(snapshot)
406+
table = exp.to_table(snapshot.table_name(is_deployable), dialect=snapshot.model.dialect)
363407
snapshots_with_table_names[snapshot].add(table.name)
364-
table_deployability[table.name] = is_representative
365408
table_schema = d.schema_(table.db, catalog=table.catalog)
366409
tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name)
367410

@@ -392,74 +435,31 @@ def _get_data_objects(
392435
existing_objects.update(objs_for_gateway)
393436

394437
snapshots_to_create = []
395-
target_deployability_flags: t.Dict[str, t.List[bool]] = defaultdict(list)
396438
for snapshot, table_names in snapshots_with_table_names.items():
397439
missing_tables = table_names - existing_objects
398440
if missing_tables or (snapshot.is_seed and not snapshot.intervals):
399441
snapshots_to_create.append(snapshot)
400-
for table_name in missing_tables or table_names:
401-
target_deployability_flags[snapshot.name].append(
402-
table_deployability[table_name]
403-
)
404-
target_deployability_flags[snapshot.name].sort()
405-
406-
if not snapshots_to_create:
407-
return CompletionStatus.NOTHING_TO_DO
408-
if on_start:
409-
on_start(snapshots_to_create)
410442

411-
for gateway, tables_by_schema in tables_by_gateway_and_schema.items():
412-
self._create_schemas(tables=tables_by_schema, gateway=gateway)
413-
414-
self._create_snapshots(
415-
snapshots_to_create=snapshots_to_create,
416-
snapshots={s.name: s for s in snapshots.values()},
417-
target_deployability_flags=target_deployability_flags,
418-
deployability_index=deployability_index,
419-
on_complete=on_complete,
420-
allow_destructive_snapshots=allow_destructive_snapshots,
421-
)
422-
return CompletionStatus.SUCCESS
423-
424-
def create_physical_schemas(
425-
self, snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
426-
) -> None:
427-
"""Creates the physical schemas for the given snapshots.
428-
429-
Args:
430-
snapshots: Snapshots to create physical schemas for.
431-
deployability_index: Determines snapshots that are deployable in the context of this creation.
432-
"""
433-
tables_by_gateway: t.Dict[t.Optional[str], t.List[str]] = defaultdict(list)
434-
for snapshot in snapshots:
435-
if snapshot.is_model and snapshot.is_materialized:
436-
tables_by_gateway[snapshot.model_gateway].append(
437-
snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
438-
)
439-
440-
for gateway, tables in tables_by_gateway.items():
441-
self._create_schemas(tables=tables, gateway=gateway)
443+
return snapshots_to_create
442444

443445
def _create_snapshots(
444446
self,
445447
snapshots_to_create: t.Iterable[Snapshot],
446448
snapshots: t.Dict[str, Snapshot],
447-
target_deployability_flags: t.Dict[str, t.List[bool]],
448-
deployability_index: t.Optional[DeployabilityIndex],
449+
deployability_index: DeployabilityIndex,
449450
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
450451
allow_destructive_snapshots: t.Set[str],
451452
) -> None:
452453
"""Internal method to create tables in parallel."""
453454
with self.concurrent_context():
454455
errors, skipped = concurrent_apply_to_snapshots(
455456
snapshots_to_create,
456-
lambda s: self._create_snapshot(
457+
lambda s: self.create_snapshot(
457458
s,
458459
snapshots=snapshots,
459-
deployability_flags=target_deployability_flags[s.name],
460460
deployability_index=deployability_index,
461-
on_complete=on_complete,
462461
allow_destructive_snapshots=allow_destructive_snapshots,
462+
on_complete=on_complete,
463463
),
464464
self.ddl_concurrent_tasks,
465465
raise_on_error=False,
@@ -666,6 +666,7 @@ def _evaluate_snapshot(
666666
allow_destructive_snapshots: t.Set[str],
667667
deployability_index: t.Optional[DeployabilityIndex],
668668
batch_index: int,
669+
target_table_exists: t.Optional[bool],
669670
**kwargs: t.Any,
670671
) -> t.Optional[str]:
671672
"""Renders the snapshot's model and executes it. The return value depends on whether the limit was specified.
@@ -679,6 +680,7 @@ def _evaluate_snapshot(
679680
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
680681
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
681682
batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
683+
target_table_exists: Whether the target table exists. If None, the table will be checked for existence.
682684
kwargs: Additional kwargs to pass to the renderer.
683685
"""
684686
if not snapshot.is_model:
@@ -692,10 +694,11 @@ def _evaluate_snapshot(
692694
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
693695
is_snapshot_deployable = deployability_index.is_deployable(snapshot)
694696
target_table_name = snapshot.table_name(is_deployable=is_snapshot_deployable)
695-
target_table_exists = adapter.table_exists(target_table_name)
696697
# https://github.com/TobikoData/sqlmesh/issues/2609
697698
# If there are no existing intervals yet; only consider this a first insert for the first snapshot in the batch
698699
is_first_insert = not _intervals(snapshot, deployability_index) and batch_index == 0
700+
if target_table_exists is None:
701+
target_table_exists = adapter.table_exists(target_table_name)
699702

700703
common_render_kwargs = dict(
701704
start=start,
@@ -785,20 +788,26 @@ def _evaluate_snapshot(
785788

786789
return wap_id
787790

788-
def _create_snapshot(
791+
def create_snapshot(
789792
self,
790793
snapshot: Snapshot,
791794
snapshots: t.Dict[str, Snapshot],
792-
deployability_flags: t.List[bool],
793-
deployability_index: t.Optional[DeployabilityIndex],
794-
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
795+
deployability_index: DeployabilityIndex,
795796
allow_destructive_snapshots: t.Set[str],
797+
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
796798
) -> None:
799+
"""Creates a physical table for the given snapshot.
800+
801+
Args:
802+
snapshot: Snapshot to create.
803+
snapshots: All upstream snapshots to use for expansion and mapping of physical locations.
804+
deployability_index: Determines snapshots that are deployable in the context of this creation.
805+
on_complete: A callback to call on each successfully created database object.
806+
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
807+
"""
797808
if not snapshot.is_model:
798809
return
799810

800-
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
801-
802811
adapter = self.get_adapter(snapshot.model.gateway)
803812
create_render_kwargs: t.Dict[str, t.Any] = dict(
804813
engine_adapter=adapter,
@@ -823,7 +832,7 @@ def _create_snapshot(
823832
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
824833
and not snapshot.is_managed
825834
# If the deployable table is missing we can't clone it
826-
and True not in deployability_flags
835+
and not deployability_index.is_deployable(snapshot)
827836
):
828837
self._clone_snapshot_in_dev(
829838
snapshot=snapshot,
@@ -834,30 +843,16 @@ def _create_snapshot(
834843
allow_destructive_snapshots=allow_destructive_snapshots,
835844
)
836845
else:
837-
dry_run = len(deployability_flags) == 1
838-
for is_table_deployable in deployability_flags:
839-
if (
840-
is_table_deployable
841-
and snapshot.model.forward_only
842-
and not deployability_index.is_representative(snapshot)
843-
):
844-
logger.info(
845-
"Skipping creation of the deployable table '%s' for the forward-only model %s. "
846-
"The table will be created when the snapshot is deployed to production",
847-
snapshot.table_name(is_deployable=is_table_deployable),
848-
snapshot.snapshot_id,
849-
)
850-
continue
851-
852-
self._execute_create(
853-
snapshot=snapshot,
854-
table_name=snapshot.table_name(is_deployable=is_table_deployable),
855-
is_table_deployable=is_table_deployable,
856-
deployability_index=deployability_index,
857-
create_render_kwargs=create_render_kwargs,
858-
rendered_physical_properties=rendered_physical_properties,
859-
dry_run=dry_run,
860-
)
846+
is_table_deployable = deployability_index.is_deployable(snapshot)
847+
self._execute_create(
848+
snapshot=snapshot,
849+
table_name=snapshot.table_name(is_deployable=is_table_deployable),
850+
is_table_deployable=is_table_deployable,
851+
deployability_index=deployability_index,
852+
create_render_kwargs=create_render_kwargs,
853+
rendered_physical_properties=rendered_physical_properties,
854+
dry_run=True,
855+
)
861856

862857
if on_complete is not None:
863858
on_complete(snapshot)
@@ -2262,17 +2257,6 @@ def create(
22622257
render_kwargs: t.Dict[str, t.Any],
22632258
**kwargs: t.Any,
22642259
) -> None:
2265-
is_snapshot_representative: bool = kwargs["is_snapshot_representative"]
2266-
if not is_snapshot_representative and is_table_deployable:
2267-
# If the snapshot is not representative, the query may contain references to non-deployable tables or views.
2268-
# This may happen if there was a forward-only change upstream which now requires the view query to point at dev preview tables.
2269-
# Therefore, we postpone the creation of the deployable view until the snapshot is deployed to production.
2270-
logger.info(
2271-
"Skipping creation of the deployable view '%s' for the non-representative snapshot",
2272-
table_name,
2273-
)
2274-
return
2275-
22762260
if self.adapter.table_exists(table_name):
22772261
# Make sure we don't recreate the view to prevent deletion of downstream views in engines with no late
22782262
# binding support (because of DROP CASCADE).

0 commit comments

Comments
 (0)