77from sqlmesh .core .console import Console , get_console
88from sqlmesh .core .environment import EnvironmentNamingInfo , execute_environment_statements
99from sqlmesh .core .macros import RuntimeStage
10+ from sqlmesh .core .model .definition import AuditResult
1011from sqlmesh .core .node import IntervalUnit
1112from sqlmesh .core .notification_target import (
1213 NotificationEvent ,
@@ -167,6 +168,8 @@ def evaluate(
167168 batch_index : int ,
168169 environment_naming_info : EnvironmentNamingInfo ,
169170 default_catalog : t .Optional [str ],
171+ on_audits_complete : t .Optional [t .Callable [[Snapshot , t .List [AuditResult ]], None ]] = None ,
172+ on_audit_warning : t .Optional [t .Callable [[str , t .Optional [str ]], None ]] = None ,
170173 ** kwargs : t .Any ,
171174 ) -> None :
172175 """Evaluate a snapshot and add the processed interval to the state sync.
@@ -207,7 +210,8 @@ def evaluate(
207210 wap_id = wap_id ,
208211 ** kwargs ,
209212 )
210- get_console ().store_evaluation_audit_results (snapshot , audit_results )
213+ if on_audits_complete :
214+ on_audits_complete (snapshot , audit_results )
211215
212216 audit_errors_to_raise : t .List [AuditError ] = []
213217 for audit_result in (result for result in audit_results if result .count ):
@@ -232,10 +236,11 @@ def evaluate(
232236 default_catalog ,
233237 self .snapshot_evaluator .adapter .dialect ,
234238 )
235- get_console ().log_warning (
236- f"\n { display_name } : { error } ." ,
237- long_message = f"{ error } . Audit query:\n { error .query .sql (error .adapter_dialect )} " ,
238- )
239+ if on_audit_warning :
240+ on_audit_warning (
241+ f"\n { display_name } : { error } ." ,
242+ f"{ error } . Audit query:\n { error .query .sql (error .adapter_dialect )} " ,
243+ )
239244
240245 if audit_errors_to_raise :
241246 raise NodeAuditsErrors (audit_errors_to_raise )
@@ -483,6 +488,8 @@ def evaluate_node(node: SchedulingUnit) -> None:
483488 batch_index = batch_idx ,
484489 environment_naming_info = environment_naming_info ,
485490 default_catalog = self .default_catalog ,
491+ on_audits_complete = self .console .store_evaluation_audit_results ,
492+ on_audit_warning = self .console .log_warning ,
486493 )
487494 evaluation_duration_ms = now_timestamp () - execution_start_ts
488495 finally :
0 commit comments