Commit 1417c48

Pass audit results directly to evaluation prog bar output
1 parent d020dc3 commit 1417c48

3 files changed: 54 additions & 38 deletions
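
At a glance, the change removes the Console.store_evaluation_audit_results hook and instead passes the audit pass/fail counts straight into update_snapshot_evaluation_progress. A minimal, hedged sketch of the new call shape follows; the stand-in class and the plain string used in place of a Snapshot object are illustrative only, not SQLMesh code.

```python
import typing as t


class SketchConsole:
    """Stand-in for a Console subclass; only the new method shape is mirrored from the diff."""

    def update_snapshot_evaluation_progress(
        self,
        snapshot_name: str,  # the real method receives a Snapshot object, not a string
        batch_idx: int,
        duration_ms: t.Optional[int],
        num_audits_passed: int,
        num_audits_failed: int,
    ) -> None:
        # Render one progress line per evaluated batch, audit counts included.
        print(
            f"{snapshot_name} | batch={batch_idx} | duration={duration_ms}ms "
            f"| audits passed={num_audits_passed} failed={num_audits_failed}"
        )


SketchConsole().update_snapshot_evaluation_progress("db.my_model", 0, 1250, 4, 1)
```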


sqlmesh/core/console.py

Lines changed: 33 additions & 33 deletions
@@ -29,7 +29,6 @@
 from sqlmesh.core.environment import EnvironmentNamingInfo
 from sqlmesh.core.linter.rule import RuleViolation
 from sqlmesh.core.model import Model
-from sqlmesh.core.model.definition import AuditResult
 from sqlmesh.core.snapshot import (
     Snapshot,
     SnapshotChangeCategory,
@@ -93,12 +92,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
     def stop_plan_evaluation(self) -> None:
         """Indicates that the evaluation has ended."""

-    @abc.abstractmethod
-    def store_evaluation_audit_results(
-        self, snapshot: Snapshot, audit_results: t.List[AuditResult]
-    ) -> None:
-        """Stores the audit results for the snapshot evaluation."""
-
     @abc.abstractmethod
     def start_evaluation_progress(
         self,
@@ -114,7 +107,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:

     @abc.abstractmethod
     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        num_audits_passed: int,
+        num_audits_failed: int,
     ) -> None:
         """Updates the snapshot evaluation progress."""

@@ -350,11 +348,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
     def stop_plan_evaluation(self) -> None:
         pass

-    def store_evaluation_audit_results(
-        self, snapshot: Snapshot, audit_results: t.List[AuditResult]
-    ) -> None:
-        pass
-
     def start_evaluation_progress(
         self,
         batches: t.Dict[Snapshot, Intervals],
@@ -367,7 +360,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
         pass

     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        num_audits_passed: int,
+        num_audits_failed: int,
     ) -> None:
         pass

@@ -551,7 +549,6 @@ def __init__(
         self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {}
         self.evaluation_model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {}
         self.evaluation_model_column_widths: t.Dict[str, int] = {}
-        self.evaluation_audit_results: t.Dict[Snapshot, t.List[AuditResult]] = {}

         # Put in temporary values that are replaced when evaluating
         self.environment_naming_info = EnvironmentNamingInfo()
@@ -590,11 +587,6 @@ def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
     def stop_plan_evaluation(self) -> None:
         pass

-    def store_evaluation_audit_results(
-        self, snapshot: Snapshot, audit_results: t.List[AuditResult]
-    ) -> None:
-        self.evaluation_audit_results[snapshot] = audit_results
-
     def start_evaluation_progress(
         self,
         batched_intervals: t.Dict[Snapshot, Intervals],
@@ -654,7 +646,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
         )

     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        num_audits_passed: int,
+        num_audits_failed: int,
     ) -> None:
         """Update the snapshot evaluation progress."""
         if (
@@ -672,14 +669,6 @@ def update_snapshot_evaluation_progress(
             )

             annotation = self.evaluation_model_info[snapshot]["annotation"][batch_idx]
-            num_audits = 0
-            num_audits_passed = 0
-            if snapshot in self.evaluation_audit_results:
-                num_audits = len(self.evaluation_audit_results[snapshot])
-                num_audits_passed = sum(
-                    result.count == 0 for result in self.evaluation_audit_results[snapshot]
-                )
-            num_audits_failed = num_audits - num_audits_passed
             if num_audits_passed:
                 annotation += f", {num_audits_passed} audits pass"
             if num_audits_failed:
@@ -721,7 +710,6 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
         self.evaluation_model_batch_sizes = {}
         self.evaluation_model_info = {}
         self.evaluation_model_column_widths = {}
-        self.evaluation_audit_results = {}
         self.environment_naming_info = EnvironmentNamingInfo()
         self.default_catalog = None

@@ -2348,7 +2336,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
         )

     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        num_audits_passed: int,
+        num_audits_failed: int,
     ) -> None:
         view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
         total_batches = self.evaluation_model_batch_sizes[snapshot]
@@ -2499,9 +2492,16 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
         self._write(f"Evaluating {snapshot.name}")

     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        num_audits_passed: int,
+        num_audits_failed: int,
     ) -> None:
-        self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms")
+        self._write(
+            f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
+        )

     def stop_evaluation_progress(self, success: bool = True) -> None:
         self._write(f"Stopping evaluation with success={success}")

sqlmesh/core/scheduler.py

Lines changed: 15 additions & 4 deletions
@@ -7,6 +7,7 @@
 from sqlmesh.core.console import Console, get_console
 from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
 from sqlmesh.core.macros import RuntimeStage
+from sqlmesh.core.model.definition import AuditResult
 from sqlmesh.core.node import IntervalUnit
 from sqlmesh.core.notification_target import (
     NotificationEvent,
@@ -168,7 +169,7 @@ def evaluate(
         environment_naming_info: EnvironmentNamingInfo,
         default_catalog: t.Optional[str],
         **kwargs: t.Any,
-    ) -> None:
+    ) -> t.List[AuditResult]:
         """Evaluate a snapshot and add the processed interval to the state sync.

         Args:
@@ -180,6 +181,9 @@ def evaluate(
             batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
             auto_restatement_enabled: Whether to enable auto restatements.
             kwargs: Additional kwargs to pass to the renderer.
+
+        Returns:
+            List of audit results from the evaluation.
         """
         validate_date_range(start, end)

@@ -207,7 +211,6 @@ def evaluate(
             wap_id=wap_id,
             **kwargs,
         )
-        self.console.store_evaluation_audit_results(snapshot, audit_results)

         audit_errors_to_raise: t.List[AuditError] = []
         for audit_result in (result for result in audit_results if result.count):
@@ -241,6 +244,7 @@ def evaluate(
             raise NodeAuditsErrors(audit_errors_to_raise)

         self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)
+        return audit_results

     def run(
         self,
@@ -474,7 +478,8 @@ def evaluate_node(node: SchedulingUnit) -> None:
            try:
                assert execution_time  # mypy
                assert deployability_index  # mypy
-                self.evaluate(
+                audit_results = []
+                audit_results = self.evaluate(
                    snapshot=snapshot,
                    start=start,
                    end=end,
@@ -486,8 +491,14 @@ def evaluate_node(node: SchedulingUnit) -> None:
                )
                evaluation_duration_ms = now_timestamp() - execution_start_ts
            finally:
+                num_audits = len(audit_results)
+                num_audits_failed = sum(1 for result in audit_results if result.count)
                self.console.update_snapshot_evaluation_progress(
-                    snapshot, batch_idx, evaluation_duration_ms
+                    snapshot,
+                    batch_idx,
+                    evaluation_duration_ms,
+                    num_audits - num_audits_failed,
+                    num_audits_failed,
                )

            try:
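
On the scheduler side, the counts are computed from the returned audit results and reported in the finally block, so the progress bar is updated even when evaluation raises. Here is a runnable sketch of that bookkeeping under stated assumptions: StubAuditResult, evaluate_stub, and report_progress are stand-ins, and only the counting rule (an audit failed when result.count is truthy) and the empty-list default before evaluation mirror the diff.

```python
import typing as t
from dataclasses import dataclass


@dataclass
class StubAuditResult:
    count: t.Optional[int]  # rows that violated the audit; 0 or None means the audit passed


def evaluate_stub() -> t.List[StubAuditResult]:
    # Stand-in for Scheduler.evaluate, which now returns t.List[AuditResult].
    return [StubAuditResult(0), StubAuditResult(None), StubAuditResult(2)]


def report_progress(num_audits_passed: int, num_audits_failed: int) -> None:
    # Stand-in for console.update_snapshot_evaluation_progress.
    print(f"audits passed={num_audits_passed} failed={num_audits_failed}")


audit_results: t.List[StubAuditResult] = []  # default so the finally block never sees an unbound name
try:
    audit_results = evaluate_stub()
finally:
    num_audits = len(audit_results)
    num_audits_failed = sum(1 for result in audit_results if result.count)
    report_progress(num_audits - num_audits_failed, num_audits_failed)  # prints: audits passed=2 failed=1
```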

web/server/console.py

Lines changed: 6 additions & 1 deletion
@@ -124,7 +124,12 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
         self.log_event_plan_apply()

     def update_snapshot_evaluation_progress(
-        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
+        self,
+        snapshot: Snapshot,
+        batch_idx: int,
+        duration_ms: t.Optional[int],
+        audits_passed: int,
+        audits_failed: int,
     ) -> None:
         if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill:
             task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name]
