Skip to content

Commit 73383f7

Browse files
committed
Return completion status instead of using callback
1 parent 4322f17 commit 73383f7

4 files changed

Lines changed: 33 additions & 30 deletions

File tree

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
SnapshotInfoLike,
3838
SnapshotTableInfo,
3939
)
40-
from sqlmesh.core.scheduler import CompletionStatus
40+
from sqlmesh.utils import CompletionStatus
4141
from sqlmesh.core.state_sync import StateSync
4242
from sqlmesh.core.state_sync.base import PromotionResult
4343
from sqlmesh.core.user import User
@@ -276,17 +276,20 @@ def _should_create(s: Snapshot) -> bool:
276276
completed = False
277277
progress_stopped = False
278278
try:
279-
self.snapshot_evaluator.create(
279+
completion_status = self.snapshot_evaluator.create(
280280
snapshots_to_create,
281281
snapshots,
282282
allow_destructive_snapshots=plan.allow_destructive_models,
283283
deployability_index=deployability_index,
284284
on_start=lambda x: self.console.start_creation_progress(
285285
x, plan.environment, self.default_catalog
286286
),
287-
on_no_work=self.console.log_status_update,
288287
on_complete=self.console.update_creation_progress,
289288
)
289+
if completion_status.is_nothing_to_do:
290+
self.console.log_status_update(
291+
"\n[green]SKIP: No physical layer updates to perform[/green]\n"
292+
)
290293
completed = True
291294
except NodeExecutionFailedError as ex:
292295
self.console.stop_creation_progress(success=False)

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from __future__ import annotations
2-
from enum import Enum
32
import logging
43
import typing as t
54
from sqlglot import exp
@@ -31,6 +30,7 @@
3130
parent_snapshots_by_name,
3231
)
3332
from sqlmesh.core.state_sync import StateSync
33+
from sqlmesh.utils import CompletionStatus
3434
from sqlmesh.utils.concurrency import concurrent_apply_to_dag, NodeExecutionFailedError
3535
from sqlmesh.utils.dag import DAG
3636
from sqlmesh.utils.date import (
@@ -48,24 +48,6 @@
4848
SchedulingUnit = t.Tuple[str, t.Tuple[Interval, int]]
4949

5050

51-
class CompletionStatus(Enum):
52-
SUCCESS = "success"
53-
FAILURE = "failure"
54-
NOTHING_TO_DO = "nothing_to_do"
55-
56-
@property
57-
def is_success(self) -> bool:
58-
return self == CompletionStatus.SUCCESS
59-
60-
@property
61-
def is_failure(self) -> bool:
62-
return self == CompletionStatus.FAILURE
63-
64-
@property
65-
def is_nothing_to_do(self) -> bool:
66-
return self == CompletionStatus.NOTHING_TO_DO
67-
68-
6951
class Scheduler:
7052
"""Schedules and manages the evaluation of snapshots.
7153

sqlmesh/core/snapshot/evaluator.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
ViewKind,
5252
CustomKind,
5353
)
54-
54+
from sqlmesh.utils import CompletionStatus
5555
from sqlmesh.core.schema_diff import has_drop_alteration, get_dropped_column_names
5656
from sqlmesh.core.snapshot import (
5757
DeployabilityIndex,
@@ -287,20 +287,21 @@ def create(
287287
snapshots: t.Dict[SnapshotId, Snapshot],
288288
deployability_index: t.Optional[DeployabilityIndex] = None,
289289
on_start: t.Optional[t.Callable] = None,
290-
on_no_work: t.Optional[t.Callable] = None,
291290
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
292291
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
293-
) -> None:
292+
) -> CompletionStatus:
294293
"""Creates a physical snapshot schema and table for the given collection of snapshots.
295294
296295
Args:
297296
target_snapshots: Target snapshots.
298297
snapshots: Mapping of snapshot ID to snapshot.
299298
deployability_index: Determines snapshots that are deployable in the context of this creation.
300299
on_start: A callback to initialize the snapshot creation progress bar.
301-
on_no_work: A callback to call when no snapshots are to be created.
302300
on_complete: A callback to call on each successfully created snapshot.
303301
allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes.
302+
303+
Returns:
304+
CompletionStatus: The status of the creation operation (success, failure, nothing to do).
304305
"""
305306
snapshots_with_table_names = defaultdict(set)
306307
tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = (
@@ -368,9 +369,7 @@ def _get_data_objects(
368369
target_deployability_flags[snapshot.name].sort()
369370

370371
if not snapshots_to_create:
371-
if on_no_work:
372-
on_no_work("\n[green]SKIP: No physical layer updates to perform[/green]\n")
373-
return
372+
return CompletionStatus.NOTHING_TO_DO
374373
if on_start:
375374
on_start(len(snapshots_to_create))
376375

@@ -385,6 +384,7 @@ def _get_data_objects(
385384
on_complete=on_complete,
386385
allow_destructive_snapshots=allow_destructive_snapshots,
387386
)
387+
return CompletionStatus.SUCCESS
388388

389389
def _create_snapshots(
390390
self,

sqlmesh/utils/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from collections import defaultdict
1717
from contextlib import contextmanager
1818
from copy import deepcopy
19-
from enum import IntEnum
19+
from enum import IntEnum, Enum
2020
from functools import lru_cache, reduce, wraps
2121
from pathlib import Path
2222

@@ -346,3 +346,21 @@ class Verbosity(IntEnum):
346346
DEFAULT = 0
347347
VERBOSE = 1
348348
VERY_VERBOSE = 2
349+
350+
351+
class CompletionStatus(Enum):
352+
SUCCESS = "success"
353+
FAILURE = "failure"
354+
NOTHING_TO_DO = "nothing_to_do"
355+
356+
@property
357+
def is_success(self) -> bool:
358+
return self == CompletionStatus.SUCCESS
359+
360+
@property
361+
def is_failure(self) -> bool:
362+
return self == CompletionStatus.FAILURE
363+
364+
@property
365+
def is_nothing_to_do(self) -> bool:
366+
return self == CompletionStatus.NOTHING_TO_DO

0 commit comments

Comments
 (0)