7
7
from sqlmesh .core .console import Console , get_console
8
8
from sqlmesh .core .environment import EnvironmentNamingInfo , execute_environment_statements
9
9
from sqlmesh .core .macros import RuntimeStage
10
+ from sqlmesh .core .model .definition import AuditResult
10
11
from sqlmesh .core .node import IntervalUnit
11
12
from sqlmesh .core .notification_target import (
12
13
NotificationEvent ,
@@ -167,6 +168,8 @@ def evaluate(
167
168
batch_index : int ,
168
169
environment_naming_info : EnvironmentNamingInfo ,
169
170
default_catalog : t .Optional [str ],
171
+ on_audits_complete : t .Optional [t .Callable [[Snapshot , t .List [AuditResult ]], None ]] = None ,
172
+ on_audit_warning : t .Optional [t .Callable [[str , t .Optional [str ]], None ]] = None ,
170
173
** kwargs : t .Any ,
171
174
) -> None :
172
175
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -207,7 +210,8 @@ def evaluate(
207
210
wap_id = wap_id ,
208
211
** kwargs ,
209
212
)
210
- get_console ().store_evaluation_audit_results (snapshot , audit_results )
213
+ if on_audits_complete :
214
+ on_audits_complete (snapshot , audit_results )
211
215
212
216
audit_errors_to_raise : t .List [AuditError ] = []
213
217
for audit_result in (result for result in audit_results if result .count ):
@@ -232,10 +236,11 @@ def evaluate(
232
236
default_catalog ,
233
237
self .snapshot_evaluator .adapter .dialect ,
234
238
)
235
- get_console ().log_warning (
236
- f"\n { display_name } : { error } ." ,
237
- long_message = f"{ error } . Audit query:\n { error .query .sql (error .adapter_dialect )} " ,
238
- )
239
+ if on_audit_warning :
240
+ on_audit_warning (
241
+ f"\n { display_name } : { error } ." ,
242
+ f"{ error } . Audit query:\n { error .query .sql (error .adapter_dialect )} " ,
243
+ )
239
244
240
245
if audit_errors_to_raise :
241
246
raise NodeAuditsErrors (audit_errors_to_raise )
@@ -483,6 +488,8 @@ def evaluate_node(node: SchedulingUnit) -> None:
483
488
batch_index = batch_idx ,
484
489
environment_naming_info = environment_naming_info ,
485
490
default_catalog = self .default_catalog ,
491
+ on_audits_complete = self .console .store_evaluation_audit_results ,
492
+ on_audit_warning = self .console .log_warning ,
486
493
)
487
494
evaluation_duration_ms = now_timestamp () - execution_start_ts
488
495
finally :
0 commit comments