Skip to content

Commit c110855

Browse files
committed
Overhaul CLI evaluation progress bar
1 parent 0fc89dd commit c110855

5 files changed

Lines changed: 27 additions & 3 deletions

File tree

sqlmesh/core/console.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from sqlmesh.core.environment import EnvironmentNamingInfo
3131
from sqlmesh.core.linter.rule import RuleViolation
3232
from sqlmesh.core.model import Model
33+
from sqlmesh.core.model.definition import AuditResult
3334
from sqlmesh.core.snapshot import (
3435
Snapshot,
3536
SnapshotChangeCategory,
@@ -96,6 +97,12 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
9697
def stop_plan_evaluation(self) -> None:
9798
"""Indicates that the evaluation has ended."""
9899

100+
@abc.abstractmethod
101+
def store_evaluation_audit_results(
102+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
103+
) -> None:
104+
"""Stores the audit results for the snapshot evaluation."""
105+
99106
@abc.abstractmethod
100107
def start_evaluation_progress(
101108
self,
@@ -413,6 +420,11 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
413420
def stop_plan_evaluation(self) -> None:
414421
pass
415422

423+
def store_evaluation_audit_results(
424+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
425+
) -> None:
426+
pass
427+
416428
def start_evaluation_progress(
417429
self,
418430
batch_sizes: t.Dict[Snapshot, int],
@@ -678,6 +690,7 @@ def __init__(
678690
self.evaluation_total_task: t.Optional[TaskID] = None
679691
self.evaluation_model_progress: t.Optional[Progress] = None
680692
self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
693+
self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {}
681694

682695
# Put in temporary values that are replaced when evaluating
683696
self.environment_naming_info = EnvironmentNamingInfo()
@@ -726,6 +739,11 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
726739
def stop_plan_evaluation(self) -> None:
727740
pass
728741

742+
def store_evaluation_audit_results(
743+
self, snapshot: Snapshot, audit_results: t.List[AuditResult]
744+
) -> None:
745+
self.evaluation_audit_results[snapshot] = audit_results
746+
729747
def start_evaluation_progress(
730748
self,
731749
batch_sizes: t.Dict[Snapshot, int],

sqlmesh/core/scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ def evaluate(
166166
execution_time: TimeLike,
167167
deployability_index: DeployabilityIndex,
168168
batch_index: int,
169+
environment_naming_info: EnvironmentNamingInfo,
170+
default_catalog: t.Optional[str],
169171
**kwargs: t.Any,
170172
) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]:
171173
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -209,6 +211,7 @@ def evaluate(
209211
wap_id=wap_id,
210212
**kwargs,
211213
)
214+
get_console().store_evaluation_audit_results(snapshot, audit_results)
212215

213216
audit_errors_to_raise: t.List[AuditError] = []
214217
audit_errors_to_warn: t.List[AuditError] = []
@@ -427,7 +430,7 @@ def run_merged_intervals(
427430
batched_intervals = self.batch_intervals(merged_intervals)
428431

429432
self.console.start_evaluation_progress(
430-
{snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()},
433+
batched_intervals,
431434
environment_naming_info,
432435
self.default_catalog,
433436
)

tests/cli/test_cli.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ def test_run_dev(runner, tmp_path, flag):
719719
# Confirm backfill occurs when we run non-backfilled dev env
720720
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"])
721721
assert result.exit_code == 0
722-
assert_model_batches_executed(result)
722+
assert_model_batches_evaluated(result)
723723

724724

725725
@time_machine.travel(FREEZE_TIME)
@@ -751,7 +751,7 @@ def test_run_cron_elapsed(runner, tmp_path):
751751
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"])
752752

753753
assert result.exit_code == 0
754-
assert_model_batches_executed(result)
754+
assert_model_batches_evaluated(result)
755755

756756

757757
def test_clean(runner, tmp_path):

tests/core/test_scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,8 @@ def _evaluate():
528528
to_datetime("2022-01-03"),
529529
DeployabilityIndex.all_deployable(),
530530
0,
531+
EnvironmentNamingInfo(),
532+
None,
531533
)
532534

533535
evaluator_audit_mock.return_value = [

web/server/console.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
15+
from sqlmesh.core.snapshot.definition import Intervals
1516
from sqlmesh.core.test import ModelTest
1617
from sqlmesh.utils.date import now_timestamp
1718
from web.server import models

0 commit comments

Comments
 (0)