|
7 | 7 | from sqlmesh.core.console import Console, get_console
|
8 | 8 | from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
|
9 | 9 | from sqlmesh.core.macros import RuntimeStage
|
| 10 | +from sqlmesh.core.model.definition import AuditResult |
10 | 11 | from sqlmesh.core.node import IntervalUnit
|
11 | 12 | from sqlmesh.core.notification_target import (
|
12 | 13 | NotificationEvent,
|
@@ -167,6 +168,7 @@ def evaluate(
|
167 | 168 | batch_index: int,
|
168 | 169 | environment_naming_info: EnvironmentNamingInfo,
|
169 | 170 | default_catalog: t.Optional[str],
|
| 171 | + on_audits_complete: t.Optional[t.Callable[[Snapshot, t.List[AuditResult]], None]] = None, |
170 | 172 | **kwargs: t.Any,
|
171 | 173 | ) -> None:
|
172 | 174 | """Evaluate a snapshot and add the processed interval to the state sync.
|
@@ -207,7 +209,8 @@ def evaluate(
|
207 | 209 | wap_id=wap_id,
|
208 | 210 | **kwargs,
|
209 | 211 | )
|
210 |
| - get_console().store_evaluation_audit_results(snapshot, audit_results) |
| 212 | + if on_audits_complete: |
| 213 | + on_audits_complete(snapshot, audit_results) |
211 | 214 |
|
212 | 215 | audit_errors_to_raise: t.List[AuditError] = []
|
213 | 216 | for audit_result in (result for result in audit_results if result.count):
|
@@ -483,6 +486,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
|
483 | 486 | batch_index=batch_idx,
|
484 | 487 | environment_naming_info=environment_naming_info,
|
485 | 488 | default_catalog=self.default_catalog,
|
| 489 | + on_audits_complete=self.console.store_evaluation_audit_results, |
486 | 490 | )
|
487 | 491 | evaluation_duration_ms = now_timestamp() - execution_start_ts
|
488 | 492 | finally:
|
|
0 commit comments