Skip to content

Commit 9b4a1a2

Browse files
committed
Overhaul CLI evaluation progress bar
1 parent 3ea0493 commit 9b4a1a2

5 files changed

Lines changed: 27 additions & 3 deletions

File tree

sqlmesh/core/console.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from sqlmesh.core.environment import EnvironmentNamingInfo
3030
from sqlmesh.core.linter.rule import RuleViolation
3131
from sqlmesh.core.model import Model
32+
from sqlmesh.core.model.definition import AuditResult
3233
from sqlmesh.core.snapshot import (
3334
Snapshot,
3435
SnapshotChangeCategory,
@@ -93,6 +94,12 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
9394
def stop_plan_evaluation(self) -> None:
9495
"""Indicates that the evaluation has ended."""
9596

97+
@abc.abstractmethod
98+
def store_evaluation_audit_results(
99+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
100+
) -> None:
101+
"""Stores the audit results for the snapshot evaluation."""
102+
96103
@abc.abstractmethod
97104
def start_evaluation_progress(
98105
self,
@@ -350,6 +357,11 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
350357
def stop_plan_evaluation(self) -> None:
351358
pass
352359

360+
def store_evaluation_audit_results(
361+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
362+
) -> None:
363+
pass
364+
353365
def start_evaluation_progress(
354366
self,
355367
batch_sizes: t.Dict[Snapshot, int],
@@ -560,6 +572,7 @@ def __init__(
560572
self.evaluation_total_task: t.Optional[TaskID] = None
561573
self.evaluation_model_progress: t.Optional[Progress] = None
562574
self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
575+
self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {}
563576

564577
# Put in temporary values that are replaced when evaluating
565578
self.environment_naming_info = EnvironmentNamingInfo()
@@ -598,6 +611,11 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
598611
def stop_plan_evaluation(self) -> None:
599612
pass
600613

614+
def store_evaluation_audit_results(
615+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
616+
) -> None:
617+
self.evaluation_audit_results[snapshot] = audit_results
618+
601619
def start_evaluation_progress(
602620
self,
603621
batch_sizes: t.Dict[Snapshot, int],

sqlmesh/core/scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ def evaluate(
166166
execution_time: TimeLike,
167167
deployability_index: DeployabilityIndex,
168168
batch_index: int,
169+
environment_naming_info: EnvironmentNamingInfo,
170+
default_catalog: t.Optional[str],
169171
**kwargs: t.Any,
170172
) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]:
171173
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -209,6 +211,7 @@ def evaluate(
209211
wap_id=wap_id,
210212
**kwargs,
211213
)
214+
get_console().store_evaluation_audit_results(snapshot, audit_results)
212215

213216
audit_errors_to_raise: t.List[AuditError] = []
214217
audit_errors_to_warn: t.List[AuditError] = []
@@ -427,7 +430,7 @@ def run_merged_intervals(
427430
batched_intervals = self.batch_intervals(merged_intervals)
428431

429432
self.console.start_evaluation_progress(
430-
{snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()},
433+
batched_intervals,
431434
environment_naming_info,
432435
self.default_catalog,
433436
)

tests/cli/test_cli.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ def test_run_dev(runner, tmp_path, flag):
718718
# Confirm backfill occurs when we run non-backfilled dev env
719719
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"])
720720
assert result.exit_code == 0
721-
assert_model_batches_executed(result)
721+
assert_model_batches_evaluated(result)
722722

723723

724724
@time_machine.travel(FREEZE_TIME)
@@ -750,7 +750,7 @@ def test_run_cron_elapsed(runner, tmp_path):
750750
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"])
751751

752752
assert result.exit_code == 0
753-
assert_model_batches_executed(result)
753+
assert_model_batches_evaluated(result)
754754

755755

756756
def test_clean(runner, tmp_path):

tests/core/test_scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,8 @@ def _evaluate():
528528
to_datetime("2022-01-03"),
529529
DeployabilityIndex.all_deployable(),
530530
0,
531+
EnvironmentNamingInfo(),
532+
None,
531533
)
532534

533535
evaluator_audit_mock.return_value = [

web/server/console.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
15+
from sqlmesh.core.snapshot.definition import Intervals
1516
from sqlmesh.core.test import ModelTest
1617
from sqlmesh.utils.date import now_timestamp
1718
from web.server import models

0 commit comments

Comments (0)