Skip to content

Commit 5698441

Browse files
committed
Call console directly in scheduler
1 parent e56127b commit 5698441

1 file changed

Lines changed: 5 additions & 12 deletions

File tree

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from sqlmesh.core.console import Console, get_console
88
from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
99
from sqlmesh.core.macros import RuntimeStage
10-
from sqlmesh.core.model.definition import AuditResult
1110
from sqlmesh.core.node import IntervalUnit
1211
from sqlmesh.core.notification_target import (
1312
NotificationEvent,
@@ -168,8 +167,6 @@ def evaluate(
168167
batch_index: int,
169168
environment_naming_info: EnvironmentNamingInfo,
170169
default_catalog: t.Optional[str],
171-
on_audits_complete: t.Optional[t.Callable[[Snapshot, t.List[AuditResult]], None]] = None,
172-
on_audit_warning: t.Optional[t.Callable[[str, t.Optional[str]], None]] = None,
173170
**kwargs: t.Any,
174171
) -> None:
175172
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -210,8 +207,7 @@ def evaluate(
210207
wap_id=wap_id,
211208
**kwargs,
212209
)
213-
if on_audits_complete:
214-
on_audits_complete(snapshot, audit_results)
210+
self.console.store_evaluation_audit_results(snapshot, audit_results)
215211

216212
audit_errors_to_raise: t.List[AuditError] = []
217213
for audit_result in (result for result in audit_results if result.count):
@@ -236,11 +232,10 @@ def evaluate(
236232
default_catalog,
237233
self.snapshot_evaluator.adapter.dialect,
238234
)
239-
if on_audit_warning:
240-
on_audit_warning(
241-
f"\n{display_name}: {error}.",
242-
f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
243-
)
235+
self.console.log_warning(
236+
f"\n{display_name}: {error}.",
237+
f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
238+
)
244239

245240
if audit_errors_to_raise:
246241
raise NodeAuditsErrors(audit_errors_to_raise)
@@ -488,8 +483,6 @@ def evaluate_node(node: SchedulingUnit) -> None:
488483
batch_index=batch_idx,
489484
environment_naming_info=environment_naming_info,
490485
default_catalog=self.default_catalog,
491-
on_audits_complete=self.console.store_evaluation_audit_results,
492-
on_audit_warning=self.console.log_warning,
493486
)
494487
evaluation_duration_ms = now_timestamp() - execution_start_ts
495488
finally:

0 commit comments

Comments
 (0)