Skip to content

Commit 2a426c4

Browse files
committed
Return completion status instead of using callback
1 parent 51d3f94 commit 2a426c4

4 files changed

Lines changed: 33 additions & 30 deletions

File tree

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
SnapshotInfoLike,
3838
SnapshotTableInfo,
3939
)
40-
from sqlmesh.core.scheduler import CompletionStatus
40+
from sqlmesh.utils import CompletionStatus
4141
from sqlmesh.core.state_sync import StateSync
4242
from sqlmesh.core.state_sync.base import PromotionResult
4343
from sqlmesh.core.user import User
@@ -276,17 +276,20 @@ def _should_create(s: Snapshot) -> bool:
276276
completed = False
277277
progress_stopped = False
278278
try:
279-
self.snapshot_evaluator.create(
279+
completion_status = self.snapshot_evaluator.create(
280280
snapshots_to_create,
281281
snapshots,
282282
allow_destructive_snapshots=plan.allow_destructive_models,
283283
deployability_index=deployability_index,
284284
on_start=lambda x: self.console.start_creation_progress(
285285
x, plan.environment, self.default_catalog
286286
),
287-
on_no_work=self.console.log_status_update,
288287
on_complete=self.console.update_creation_progress,
289288
)
289+
if completion_status.is_nothing_to_do:
290+
self.console.log_status_update(
291+
"\n[green]SKIP: No physical layer updates to perform[/green]\n"
292+
)
290293
completed = True
291294
except NodeExecutionFailedError as ex:
292295
self.console.stop_creation_progress(success=False)

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from __future__ import annotations
2-
from enum import Enum
32
import logging
43
import typing as t
54
from sqlglot import exp
@@ -31,6 +30,7 @@
3130
parent_snapshots_by_name,
3231
)
3332
from sqlmesh.core.state_sync import StateSync
33+
from sqlmesh.utils import CompletionStatus
3434
from sqlmesh.utils.concurrency import concurrent_apply_to_dag, NodeExecutionFailedError
3535
from sqlmesh.utils.dag import DAG
3636
from sqlmesh.utils.date import (
@@ -48,24 +48,6 @@
4848
SchedulingUnit = t.Tuple[str, t.Tuple[Interval, int]]
4949

5050

51-
class CompletionStatus(Enum):
52-
SUCCESS = "success"
53-
FAILURE = "failure"
54-
NOTHING_TO_DO = "nothing_to_do"
55-
56-
@property
57-
def is_success(self) -> bool:
58-
return self == CompletionStatus.SUCCESS
59-
60-
@property
61-
def is_failure(self) -> bool:
62-
return self == CompletionStatus.FAILURE
63-
64-
@property
65-
def is_nothing_to_do(self) -> bool:
66-
return self == CompletionStatus.NOTHING_TO_DO
67-
68-
6951
class Scheduler:
7052
"""Schedules and manages the evaluation of snapshots.
7153

sqlmesh/core/snapshot/evaluator.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
ViewKind,
5252
CustomKind,
5353
)
54-
54+
from sqlmesh.utils import CompletionStatus
5555
from sqlmesh.core.schema_diff import has_drop_alteration, get_dropped_column_names
5656
from sqlmesh.core.snapshot import (
5757
DeployabilityIndex,
@@ -279,20 +279,21 @@ def create(
279279
snapshots: t.Dict[SnapshotId, Snapshot],
280280
deployability_index: t.Optional[DeployabilityIndex] = None,
281281
on_start: t.Optional[t.Callable] = None,
282-
on_no_work: t.Optional[t.Callable] = None,
283282
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
284283
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
285-
) -> None:
284+
) -> CompletionStatus:
286285
"""Creates a physical snapshot schema and table for the given collection of snapshots.
287286
288287
Args:
289288
target_snapshots: Target snapshots.
290289
snapshots: Mapping of snapshot ID to snapshot.
291290
deployability_index: Determines snapshots that are deployable in the context of this creation.
292291
on_start: A callback to initialize the snapshot creation progress bar.
293-
on_no_work: A callback to call when no snapshots are to be created.
294292
on_complete: A callback to call on each successfully created snapshot.
295293
allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes.
294+
295+
Returns:
296+
CompletionStatus: The status of the creation operation (success, failure, nothing to do).
296297
"""
297298
snapshots_with_table_names = defaultdict(set)
298299
tables_by_schema = defaultdict(set)
@@ -350,9 +351,7 @@ def _get_data_objects(schema: exp.Table, gateway: t.Optional[str] = None) -> t.S
350351
target_deployability_flags[snapshot.name].sort()
351352

352353
if not snapshots_to_create:
353-
if on_no_work:
354-
on_no_work("\n[green]SKIP: No physical layer updates to perform[/green]\n")
355-
return
354+
return CompletionStatus.NOTHING_TO_DO
356355
if on_start:
357356
on_start(len(snapshots_to_create))
358357
self._create_schemas(tables_by_schema, gateway_by_schema)
@@ -364,6 +363,7 @@ def _get_data_objects(schema: exp.Table, gateway: t.Optional[str] = None) -> t.S
364363
on_complete=on_complete,
365364
allow_destructive_snapshots=allow_destructive_snapshots,
366365
)
366+
return CompletionStatus.SUCCESS
367367

368368
def _create_snapshots(
369369
self,

sqlmesh/utils/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from collections import defaultdict
1717
from contextlib import contextmanager
1818
from copy import deepcopy
19-
from enum import IntEnum
19+
from enum import IntEnum, Enum
2020
from functools import lru_cache, reduce, wraps
2121
from pathlib import Path
2222

@@ -347,3 +347,21 @@ class Verbosity(IntEnum):
347347
DEFAULT = 0
348348
VERBOSE = 1
349349
VERY_VERBOSE = 2
350+
351+
352+
class CompletionStatus(Enum):
353+
SUCCESS = "success"
354+
FAILURE = "failure"
355+
NOTHING_TO_DO = "nothing_to_do"
356+
357+
@property
358+
def is_success(self) -> bool:
359+
return self == CompletionStatus.SUCCESS
360+
361+
@property
362+
def is_failure(self) -> bool:
363+
return self == CompletionStatus.FAILURE
364+
365+
@property
366+
def is_nothing_to_do(self) -> bool:
367+
return self == CompletionStatus.NOTHING_TO_DO

0 commit comments

Comments
 (0)