Skip to content

Commit 1a02f70

Browse files
committed
PR Feedback 1
1 parent bbf8607 commit 1a02f70

5 files changed

Lines changed: 101 additions & 58 deletions

File tree

sqlmesh/core/console.py

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -350,11 +350,14 @@ def start_evaluation_progress(
350350
batched_intervals: t.Dict[Snapshot, Intervals],
351351
environment_naming_info: EnvironmentNamingInfo,
352352
default_catalog: t.Optional[str],
353+
audit_only: bool = False,
353354
) -> None:
354-
"""Indicates that a new snapshot evaluation progress has begun."""
355+
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
355356

356357
@abc.abstractmethod
357-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
358+
def start_snapshot_evaluation_progress(
359+
self, snapshot: Snapshot, audit_only: bool = False
360+
) -> None:
358361
"""Starts the snapshot evaluation progress."""
359362

360363
@abc.abstractmethod
@@ -366,6 +369,7 @@ def update_snapshot_evaluation_progress(
366369
duration_ms: t.Optional[int],
367370
num_audits_passed: int,
368371
num_audits_failed: int,
372+
audit_only: bool = False,
369373
) -> None:
370374
"""Updates the snapshot evaluation progress."""
371375

@@ -507,10 +511,13 @@ def start_evaluation_progress(
507511
batched_intervals: t.Dict[Snapshot, Intervals],
508512
environment_naming_info: EnvironmentNamingInfo,
509513
default_catalog: t.Optional[str],
514+
audit_only: bool = False,
510515
) -> None:
511516
pass
512517

513-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
518+
def start_snapshot_evaluation_progress(
519+
self, snapshot: Snapshot, audit_only: bool = False
520+
) -> None:
514521
pass
515522

516523
def update_snapshot_evaluation_progress(
@@ -521,6 +528,7 @@ def update_snapshot_evaluation_progress(
521528
duration_ms: t.Optional[int],
522529
num_audits_passed: int,
523530
num_audits_failed: int,
531+
audit_only: bool = False,
524532
) -> None:
525533
pass
526534

@@ -891,11 +899,12 @@ def start_evaluation_progress(
891899
batched_intervals: t.Dict[Snapshot, Intervals],
892900
environment_naming_info: EnvironmentNamingInfo,
893901
default_catalog: t.Optional[str],
902+
audit_only: bool = False,
894903
) -> None:
895-
"""Indicates that a new snapshot evaluation progress has begun."""
904+
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
896905
if not self.evaluation_progress_live:
897906
self.evaluation_total_progress = make_progress_bar(
898-
"Executing model batches", self.console
907+
"Executing model batches" if not audit_only else "Auditing models", self.console
899908
)
900909

901910
self.evaluation_model_progress = Progress(
@@ -916,8 +925,9 @@ def start_evaluation_progress(
916925
batch_sizes = {
917926
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
918927
}
928+
message = "Executing" if not audit_only else "Auditing"
919929
self.evaluation_total_task = self.evaluation_total_progress.add_task(
920-
"Executing models...", total=sum(batch_sizes.values())
930+
f"{message} models...", total=sum(batch_sizes.values())
921931
)
922932

923933
# determine column widths
@@ -943,15 +953,17 @@ def start_evaluation_progress(
943953
self.environment_naming_info = environment_naming_info
944954
self.default_catalog = default_catalog
945955

946-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
956+
def start_snapshot_evaluation_progress(
957+
self, snapshot: Snapshot, audit_only: bool = False
958+
) -> None:
947959
if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
948960
display_name = snapshot.display_name(
949961
self.environment_naming_info,
950962
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
951963
dialect=self.dialect,
952964
)
953965
self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
954-
f"Evaluating {display_name}...",
966+
f"{'Evaluating' if not audit_only else 'Auditing'} {display_name}...",
955967
view_name=display_name,
956968
total=self.evaluation_model_batch_sizes[snapshot],
957969
)
@@ -964,6 +976,7 @@ def update_snapshot_evaluation_progress(
964976
duration_ms: t.Optional[int],
965977
num_audits_passed: int,
966978
num_audits_failed: int,
979+
audit_only: bool = False,
967980
) -> None:
968981
"""Update the snapshot evaluation progress."""
969982
if (
@@ -1003,7 +1016,7 @@ def update_snapshot_evaluation_progress(
10031016
self.evaluation_column_widths["duration"]
10041017
)
10051018

1006-
msg = f"{batch} {display_name} {annotation} {duration}".replace(
1019+
msg = f"{f'{batch} ' if not audit_only else ''}{display_name} {annotation} {duration}".replace(
10071020
self.AUDIT_PASS_MARK, self.GREEN_AUDIT_PASS_MARK
10081021
)
10091022

@@ -1015,7 +1028,10 @@ def update_snapshot_evaluation_progress(
10151028

10161029
model_task_id = self.evaluation_model_tasks[snapshot.name]
10171030
self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
1018-
if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
1031+
if (
1032+
self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches
1033+
or audit_only
1034+
):
10191035
self.evaluation_model_progress.remove_task(model_task_id)
10201036

10211037
def stop_evaluation_progress(self, success: bool = True) -> None:
@@ -3208,14 +3224,17 @@ def start_evaluation_progress(
32083224
batched_intervals: t.Dict[Snapshot, Intervals],
32093225
environment_naming_info: EnvironmentNamingInfo,
32103226
default_catalog: t.Optional[str],
3227+
audit_only: bool = False,
32113228
) -> None:
32123229
self.evaluation_model_batch_sizes = {
32133230
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
32143231
}
32153232
self.evaluation_environment_naming_info = environment_naming_info
32163233
self.default_catalog = default_catalog
32173234

3218-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
3235+
def start_snapshot_evaluation_progress(
3236+
self, snapshot: Snapshot, audit_only: bool = False
3237+
) -> None:
32193238
if not self.evaluation_batch_progress.get(snapshot.snapshot_id):
32203239
display_name = snapshot.display_name(
32213240
self.evaluation_environment_naming_info,
@@ -3235,7 +3254,11 @@ def update_snapshot_evaluation_progress(
32353254
duration_ms: t.Optional[int],
32363255
num_audits_passed: int,
32373256
num_audits_failed: int,
3257+
audit_only: bool = False,
32383258
) -> None:
3259+
if audit_only:
3260+
return
3261+
32393262
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
32403263
total_batches = self.evaluation_model_batch_sizes[snapshot]
32413264

@@ -3378,13 +3401,17 @@ def start_evaluation_progress(
33783401
batched_intervals: t.Dict[Snapshot, Intervals],
33793402
environment_naming_info: EnvironmentNamingInfo,
33803403
default_catalog: t.Optional[str],
3404+
audit_only: bool = False,
33813405
) -> None:
3406+
message = "evaluation" if not audit_only else "auditing"
33823407
self._write(
3383-
f"Starting evaluation for {sum(len(intervals) for intervals in batched_intervals.values())} snapshots"
3408+
f"Starting {message} for {sum(len(intervals) for intervals in batched_intervals.values())} snapshots"
33843409
)
33853410

3386-
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
3387-
self._write(f"Evaluating {snapshot.name}")
3411+
def start_snapshot_evaluation_progress(
3412+
self, snapshot: Snapshot, audit_only: bool = False
3413+
) -> None:
3414+
self._write(f"{'Evaluating' if not audit_only else 'Auditing'} {snapshot.name}")
33883415

33893416
def update_snapshot_evaluation_progress(
33903417
self,
@@ -3394,10 +3421,14 @@ def update_snapshot_evaluation_progress(
33943421
duration_ms: t.Optional[int],
33953422
num_audits_passed: int,
33963423
num_audits_failed: int,
3424+
audit_only: bool = False,
33973425
) -> None:
3398-
self._write(
3399-
f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3400-
)
3426+
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3427+
3428+
if audit_only:
3429+
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3430+
3431+
self._write(message)
34013432

34023433
def stop_evaluation_progress(self, success: bool = True) -> None:
34033434
self._write(f"Stopping evaluation with success={success}")

sqlmesh/core/model/definition.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,9 +1097,8 @@ def _audit_metadata_hash_values(self) -> t.List[str]:
10971097

10981098
return metadata
10991099

1100-
def audit_metadata_hash(self) -> t.Tuple[t.List[str], str]:
1101-
hash_values = self._audit_metadata_hash_values()
1102-
return hash_values, hash_data(hash_values)
1100+
def audit_metadata_hash(self) -> str:
1101+
return hash_data(self._audit_metadata_hash_values())
11031102

11041103
@property
11051104
def metadata_hash(self) -> str:

sqlmesh/core/plan/evaluator.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def evaluate(
128128
execution_time=plan.execution_time,
129129
)
130130

131-
self._run_audits_for_metadata_snapshots(plan, snapshots, new_snapshots)
131+
self._run_audits_for_metadata_snapshots(plan, new_snapshots)
132132

133133
push_completion_status = self._push(plan, snapshots, deployability_index_for_creation)
134134
if push_completion_status.is_nothing_to_do:
@@ -550,18 +550,17 @@ def _restatement_intervals_across_all_environments(
550550
def _run_audits_for_metadata_snapshots(
551551
self,
552552
plan: EvaluatablePlan,
553-
snapshots: t.Dict[SnapshotId, Snapshot],
554553
new_snapshots: t.Dict[SnapshotId, Snapshot],
555554
) -> None:
556-
# Step 1: Filter out snapshots that are not categorized as metadata changes on models
555+
# Filter out snapshots that are not categorized as metadata changes on models
557556
metadata_snapshots = []
558557
for snapshot in new_snapshots.values():
559558
if not snapshot.is_metadata or not snapshot.is_model or not snapshot.evaluatable:
560559
continue
561560

562561
metadata_snapshots.append(snapshot)
563562

564-
# Step 2: Bulk load their previous snapshots from state
563+
# Bulk load all the previous snapshots
565564
previous_snapshots = self.state_sync.get_snapshots(
566565
[
567566
s.previous_version.snapshot_id(s.name)
@@ -570,29 +569,25 @@ def _run_audits_for_metadata_snapshots(
570569
]
571570
).values()
572571

573-
# Step 3: Compare the audit metadata hashes to determine if there was a change in the audits field
574-
to_be_audited_snapshots = {}
572+
# Check if any of the snapshots have modifications to the audits field by comparing the hashes
573+
audit_snapshots = {}
575574
for snapshot, previous_snapshot in zip(metadata_snapshots, previous_snapshots):
576-
new_audits, new_audits_hash = snapshot.model.audit_metadata_hash()
577-
_, previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
575+
new_audits_hash = snapshot.model.audit_metadata_hash()
576+
previous_audit_hash = previous_snapshot.model.audit_metadata_hash()
578577

579-
if previous_audit_hash != new_audits_hash and new_audits:
580-
snapshot_start = min(i[0] for i in snapshot.intervals)
581-
snapshot_end = max(i[1] for i in snapshot.intervals)
582-
to_be_audited_snapshots[snapshot.snapshot_id] = (snapshot_start, snapshot_end)
578+
if snapshot.model.audits and previous_audit_hash != new_audits_hash:
579+
audit_snapshots[snapshot.snapshot_id] = snapshot
583580

584-
if not to_be_audited_snapshots:
581+
if not audit_snapshots:
585582
return
586583

587-
# Step 4: If there are any snapshots to be audited, we'll reuse the scheduler's
588-
# internals to audit them by utilizing the restatement logic
589-
scheduler = self.create_scheduler(snapshots.values())
584+
# If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
585+
scheduler = self.create_scheduler(audit_snapshots.values())
590586
completion_status = scheduler.audit(
591587
plan.environment,
592588
plan.start,
593589
plan.end,
594590
execution_time=plan.execution_time,
595-
restatements=to_be_audited_snapshots,
596591
end_bounded=plan.end_bounded,
597592
interval_end_per_model=plan.interval_end_per_model,
598593
)

0 commit comments

Comments (0)