Skip to content

Commit c387f6a

Browse files
committed
PR Feedback 1
1 parent 7d5cdfd commit c387f6a

5 files changed

Lines changed: 101 additions & 58 deletions

File tree

sqlmesh/core/console.py

Lines changed: 48 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -329,11 +329,14 @@ def start_evaluation_progress(
329329
batched_intervals: t.Dict[Snapshot, Intervals],
330330
environment_naming_info: EnvironmentNamingInfo,
331331
default_catalog: t.Optional[str],
332+
audit_only: bool = False,
332333
) -> None:
333-
"""Indicates that a new snapshot evaluation progress has begun."""
334+
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
334335

335336
@abc.abstractmethod
336-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
337+
def start_snapshot_evaluation_progress(
338+
self, snapshot: Snapshot, audit_only: bool = False
339+
) -> None:
337340
"""Starts the snapshot evaluation progress."""
338341

339342
@abc.abstractmethod
@@ -345,6 +348,7 @@ def update_snapshot_evaluation_progress(
345348
duration_ms: t.Optional[int],
346349
num_audits_passed: int,
347350
num_audits_failed: int,
351+
audit_only: bool = False,
348352
) -> None:
349353
"""Updates the snapshot evaluation progress."""
350354

@@ -486,10 +490,13 @@ def start_evaluation_progress(
486490
batched_intervals: t.Dict[Snapshot, Intervals],
487491
environment_naming_info: EnvironmentNamingInfo,
488492
default_catalog: t.Optional[str],
493+
audit_only: bool = False,
489494
) -> None:
490495
pass
491496

492-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
497+
def start_snapshot_evaluation_progress(
498+
self, snapshot: Snapshot, audit_only: bool = False
499+
) -> None:
493500
pass
494501

495502
def update_snapshot_evaluation_progress(
@@ -500,6 +507,7 @@ def update_snapshot_evaluation_progress(
500507
duration_ms: t.Optional[int],
501508
num_audits_passed: int,
502509
num_audits_failed: int,
510+
audit_only: bool = False,
503511
) -> None:
504512
pass
505513

@@ -864,11 +872,12 @@ def start_evaluation_progress(
864872
batched_intervals: t.Dict[Snapshot, Intervals],
865873
environment_naming_info: EnvironmentNamingInfo,
866874
default_catalog: t.Optional[str],
875+
audit_only: bool = False,
867876
) -> None:
868-
"""Indicates that a new snapshot evaluation progress has begun."""
877+
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
869878
if not self.evaluation_progress_live:
870879
self.evaluation_total_progress = make_progress_bar(
871-
"Executing model batches", self.console
880+
"Executing model batches" if not audit_only else "Auditing models", self.console
872881
)
873882

874883
self.evaluation_model_progress = Progress(
@@ -889,8 +898,9 @@ def start_evaluation_progress(
889898
batch_sizes = {
890899
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
891900
}
901+
message = "Executing" if not audit_only else "Auditing"
892902
self.evaluation_total_task = self.evaluation_total_progress.add_task(
893-
"Executing models...", total=sum(batch_sizes.values())
903+
f"{message} models...", total=sum(batch_sizes.values())
894904
)
895905

896906
# determine column widths
@@ -916,15 +926,17 @@ def start_evaluation_progress(
916926
self.environment_naming_info = environment_naming_info
917927
self.default_catalog = default_catalog
918928

919-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
929+
def start_snapshot_evaluation_progress(
930+
self, snapshot: Snapshot, audit_only: bool = False
931+
) -> None:
920932
if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
921933
display_name = snapshot.display_name(
922934
self.environment_naming_info,
923935
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
924936
dialect=self.dialect,
925937
)
926938
self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
927-
f"Evaluating {display_name}...",
939+
f"{'Evaluating' if not audit_only else 'Auditing'} {display_name}...",
928940
view_name=display_name,
929941
total=self.evaluation_model_batch_sizes[snapshot],
930942
)
@@ -937,6 +949,7 @@ def update_snapshot_evaluation_progress(
937949
duration_ms: t.Optional[int],
938950
num_audits_passed: int,
939951
num_audits_failed: int,
952+
audit_only: bool = False,
940953
) -> None:
941954
"""Update the snapshot evaluation progress."""
942955
if (
@@ -976,7 +989,7 @@ def update_snapshot_evaluation_progress(
976989
self.evaluation_column_widths["duration"]
977990
)
978991

979-
msg = f"{batch} {display_name} {annotation} {duration}".replace(
992+
msg = f"{f'{batch} ' if not audit_only else ''}{display_name} {annotation} {duration}".replace(
980993
self.AUDIT_PASS_MARK, self.GREEN_AUDIT_PASS_MARK
981994
)
982995

@@ -988,7 +1001,10 @@ def update_snapshot_evaluation_progress(
9881001

9891002
model_task_id = self.evaluation_model_tasks[snapshot.name]
9901003
self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
991-
if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
1004+
if (
1005+
self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches
1006+
or audit_only
1007+
):
9921008
self.evaluation_model_progress.remove_task(model_task_id)
9931009

9941010
def stop_evaluation_progress(self, success: bool = True) -> None:
@@ -3156,14 +3172,17 @@ def start_evaluation_progress(
31563172
batched_intervals: t.Dict[Snapshot, Intervals],
31573173
environment_naming_info: EnvironmentNamingInfo,
31583174
default_catalog: t.Optional[str],
3175+
audit_only: bool = False,
31593176
) -> None:
31603177
self.evaluation_model_batch_sizes = {
31613178
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
31623179
}
31633180
self.evaluation_environment_naming_info = environment_naming_info
31643181
self.default_catalog = default_catalog
31653182

3166-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
3183+
def start_snapshot_evaluation_progress(
3184+
self, snapshot: Snapshot, audit_only: bool = False
3185+
) -> None:
31673186
if not self.evaluation_batch_progress.get(snapshot.snapshot_id):
31683187
display_name = snapshot.display_name(
31693188
self.evaluation_environment_naming_info,
@@ -3183,7 +3202,11 @@ def update_snapshot_evaluation_progress(
31833202
duration_ms: t.Optional[int],
31843203
num_audits_passed: int,
31853204
num_audits_failed: int,
3205+
audit_only: bool = False,
31863206
) -> None:
3207+
if audit_only:
3208+
return
3209+
31873210
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
31883211
total_batches = self.evaluation_model_batch_sizes[snapshot]
31893212

@@ -3326,13 +3349,17 @@ def start_evaluation_progress(
33263349
batched_intervals: t.Dict[Snapshot, Intervals],
33273350
environment_naming_info: EnvironmentNamingInfo,
33283351
default_catalog: t.Optional[str],
3352+
audit_only: bool = False,
33293353
) -> None:
3354+
message = "evaluation" if not audit_only else "auditing"
33303355
self._write(
3331-
f"Starting evaluation for {sum(len(intervals) for intervals in batched_intervals.values())} snapshots"
3356+
f"Starting {message} for {sum(len(intervals) for intervals in batched_intervals.values())} snapshots"
33323357
)
33333358

3334-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
3335-
self._write(f"Evaluating {snapshot.name}")
3359+
def start_snapshot_evaluation_progress(
3360+
self, snapshot: Snapshot, audit_only: bool = False
3361+
) -> None:
3362+
self._write(f"{'Evaluating' if not audit_only else 'Auditing'} {snapshot.name}")
33363363

33373364
def update_snapshot_evaluation_progress(
33383365
self,
@@ -3342,10 +3369,14 @@ def update_snapshot_evaluation_progress(
33423369
duration_ms: t.Optional[int],
33433370
num_audits_passed: int,
33443371
num_audits_failed: int,
3372+
audit_only: bool = False,
33453373
) -> None:
3346-
self._write(
3347-
f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3348-
)
3374+
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3375+
3376+
if audit_only:
3377+
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3378+
3379+
self._write(message)
33493380

33503381
def stop_evaluation_progress(self, success: bool = True) -> None:
33513382
self._write(f"Stopping evaluation with success={success}")

sqlmesh/core/model/definition.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1097,9 +1097,8 @@ def _audit_metadata_hash_values(self) -> t.List[str]:
10971097

10981098
return metadata
10991099

1100-
def audit_metadata_hash(self) -> t.Tuple[t.List[str], str]:
1101-
hash_values = self._audit_metadata_hash_values()
1102-
return hash_values, hash_data(hash_values)
1100+
def audit_metadata_hash(self) -> str:
1101+
return hash_data(self._audit_metadata_hash_values())
11031102

11041103
@property
11051104
def metadata_hash(self) -> str:

sqlmesh/core/plan/evaluator.py

Lines changed: 12 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,7 @@ def evaluate(
128128
execution_time=plan.execution_time,
129129
)
130130

131-
self._run_audits_for_metadata_snapshots(plan, snapshots, new_snapshots)
131+
self._run_audits_for_metadata_snapshots(plan, new_snapshots)
132132

133133
push_completion_status = self._push(plan, snapshots, deployability_index_for_creation)
134134
if push_completion_status.is_nothing_to_do:
@@ -550,18 +550,17 @@ def _restatement_intervals_across_all_environments(
550550
def _run_audits_for_metadata_snapshots(
551551
self,
552552
plan: EvaluatablePlan,
553-
snapshots: t.Dict[SnapshotId, Snapshot],
554553
new_snapshots: t.Dict[SnapshotId, Snapshot],
555554
) -> None:
556-
# Step 1: Filter out snapshots that are not categorized as metadata changes on models
555+
# Filter out snapshots that are not categorized as metadata changes on models
557556
metadata_snapshots = []
558557
for snapshot in new_snapshots.values():
559558
if not snapshot.is_metadata or not snapshot.is_model or not snapshot.evaluatable:
560559
continue
561560

562561
metadata_snapshots.append(snapshot)
563562

564-
# Step 2: Bulk load their previous snapshots from state
563+
# Bulk load all the previous snapshots
565564
previous_snapshots = self.state_sync.get_snapshots(
566565
[
567566
s.previous_version.snapshot_id(s.name)
@@ -570,29 +569,25 @@ def _run_audits_for_metadata_snapshots(
570569
]
571570
).values()
572571

573-
# Step 3: Compare the audit metadata hashes to determine if there was a change in the audits field
574-
to_be_audited_snapshots = {}
572+
# Check if any of the snapshots have modifications to the audits field by comparing the hashes
573+
audit_snapshots = {}
575574
for snapshot, previous_snapshot in zip(metadata_snapshots, previous_snapshots):
576-
new_audits, new_audits_hash = snapshot.model.audit_metadata_hash()
577-
_, previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
575+
new_audits_hash = snapshot.model.audit_metadata_hash()
576+
previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
578577

579-
if previous_audit_hash != new_audits_hash and new_audits:
580-
snapshot_start = min(i[0] for i in snapshot.intervals)
581-
snapshot_end = max(i[1] for i in snapshot.intervals)
582-
to_be_audited_snapshots[snapshot.snapshot_id] = (snapshot_start, snapshot_end)
578+
if snapshot.model.audits and previous_audit_hash != new_audits_hash:
579+
audit_snapshots[snapshot.snapshot_id] = snapshot
583580

584-
if not to_be_audited_snapshots:
581+
if not audit_snapshots:
585582
return
586583

587-
# Step 4: If there are any snapshots to be audited, we'll reuse the scheduler's
588-
# internals to audit them by utilizing the restatement logic
589-
scheduler = self.create_scheduler(snapshots.values())
584+
# If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
585+
scheduler = self.create_scheduler(audit_snapshots.values())
590586
completion_status = scheduler.audit(
591587
plan.environment,
592588
plan.start,
593589
plan.end,
594590
execution_time=plan.execution_time,
595-
restatements=to_be_audited_snapshots,
596591
end_bounded=plan.end_bounded,
597592
interval_end_per_model=plan.interval_end_per_model,
598593
)

0 commit comments

Comments
 (0)