Skip to content

Commit d746f4d

Browse files
treysp and sungchun12 authored
Feat: improve CLI evaluation progress bar (#4000)
Co-authored-by: Sung Won Chung <sungwonchung3@gmail.com>
1 parent 1ac7032 commit d746f4d

8 files changed

Lines changed: 326 additions & 120 deletions

File tree

sqlmesh/core/config/root.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -147,7 +147,7 @@ class Config(BaseConfig):
147147
}
148148

149149
_connection_config_validator = connection_config_validator
150-
_scheduler_config_validator = scheduler_config_validator
150+
_scheduler_config_validator = scheduler_config_validator # type: ignore
151151
_variables_validator = variables_validator
152152

153153
@field_validator("gateways", mode="before")

sqlmesh/core/console.py

Lines changed: 238 additions & 73 deletions
Large diffs are not rendered by default.

sqlmesh/core/scheduler.py

Lines changed: 31 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
from sqlmesh.core.console import Console, get_console
88
from sqlmesh.core.environment import EnvironmentNamingInfo, execute_environment_statements
99
from sqlmesh.core.macros import RuntimeStage
10+
from sqlmesh.core.model.definition import AuditResult
1011
from sqlmesh.core.node import IntervalUnit
1112
from sqlmesh.core.notification_target import (
1213
NotificationEvent,
@@ -166,7 +167,7 @@ def evaluate(
166167
deployability_index: DeployabilityIndex,
167168
batch_index: int,
168169
**kwargs: t.Any,
169-
) -> None:
170+
) -> t.Tuple[t.List[AuditResult], t.List[AuditError]]:
170171
"""Evaluate a snapshot and add the processed interval to the state sync.
171172
172173
Args:
@@ -178,6 +179,9 @@ def evaluate(
178179
batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
179180
auto_restatement_enabled: Whether to enable auto restatements.
180181
kwargs: Additional kwargs to pass to the renderer.
182+
183+
Returns:
184+
Tuple of list of all audit results from the evaluation and list of non-blocking audit errors to warn.
181185
"""
182186
validate_date_range(start, end)
183187

@@ -207,6 +211,7 @@ def evaluate(
207211
)
208212

209213
audit_errors_to_raise: t.List[AuditError] = []
214+
audit_errors_to_warn: t.List[AuditError] = []
210215
for audit_result in (result for result in audit_results if result.count):
211216
error = AuditError(
212217
audit_name=audit_result.audit.name,
@@ -224,15 +229,13 @@ def evaluate(
224229
if audit_result.blocking:
225230
audit_errors_to_raise.append(error)
226231
else:
227-
get_console().log_warning(
228-
f"\n{error}.",
229-
long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
230-
)
232+
audit_errors_to_warn.append(error)
231233

232234
if audit_errors_to_raise:
233235
raise NodeAuditsErrors(audit_errors_to_raise)
234236

235237
self.state_sync.add_interval(snapshot, start, end, is_dev=not is_deployable)
238+
return audit_results, audit_errors_to_warn
236239

237240
def run(
238241
self,
@@ -463,21 +466,42 @@ def evaluate_node(node: SchedulingUnit) -> None:
463466
execution_start_ts = now_timestamp()
464467
evaluation_duration_ms: t.Optional[int] = None
465468

469+
audit_results: t.List[AuditResult] = []
470+
audit_errors_to_warn: t.List[AuditError] = []
466471
try:
467472
assert execution_time # mypy
468473
assert deployability_index # mypy
469-
self.evaluate(
474+
audit_results, audit_errors_to_warn = self.evaluate(
470475
snapshot=snapshot,
471476
start=start,
472477
end=end,
473478
execution_time=execution_time,
474479
deployability_index=deployability_index,
475480
batch_index=batch_idx,
476481
)
482+
483+
for audit_error in audit_errors_to_warn:
484+
display_name = snapshot.display_name(
485+
environment_naming_info,
486+
self.default_catalog,
487+
self.snapshot_evaluator.adapter.dialect,
488+
)
489+
self.console.log_warning(
490+
f"\n{display_name}: {audit_error}.",
491+
f"{audit_error}. Audit query:\n{audit_error.query.sql(audit_error.adapter_dialect)}",
492+
)
493+
477494
evaluation_duration_ms = now_timestamp() - execution_start_ts
478495
finally:
496+
num_audits = len(audit_results)
497+
num_audits_failed = sum(1 for result in audit_results if result.count)
479498
self.console.update_snapshot_evaluation_progress(
480-
snapshot, batch_idx, evaluation_duration_ms
499+
snapshot,
500+
batched_intervals[snapshot][batch_idx],
501+
batch_idx,
502+
evaluation_duration_ms,
503+
num_audits - num_audits_failed,
504+
num_audits_failed,
481505
)
482506

483507
try:

sqlmesh/core/snapshot/definition.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1559,6 +1559,15 @@ def display_name(
15591559
Returns the model name as a qualified view name.
15601560
This is just used for presenting information back to the user and `qualified_view_name` should be used
15611561
when wanting a view name in all other cases.
1562+
1563+
Args:
1564+
snapshot_info_like: The snapshot info object to get the display name for
1565+
environment_naming_info: Environment naming info to use for display name formatting
1566+
default_catalog: Optional default catalog name to use. If None, the default catalog will always be included in the display name.
1567+
dialect: Optional dialect type to use for name formatting
1568+
1569+
Returns:
1570+
The formatted display name as a string
15621571
"""
15631572
if snapshot_info_like.is_audit:
15641573
return snapshot_info_like.name

tests/cli/test_cli.py

Lines changed: 27 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -128,22 +128,22 @@ def assert_new_env(result, new_env="prod", from_env="prod", initialize=True) ->
128128
) in result.output
129129

130130

131-
def assert_model_versions_created(result) -> None:
132-
assert "Model versions created successfully" in result.output
131+
def assert_physical_layer_updated(result) -> None:
132+
assert "Physical layer updated" in result.output
133133

134134

135135
def assert_model_batches_executed(result) -> None:
136-
assert "Model batches executed successfully" in result.output
136+
assert "Model batches executed" in result.output
137137

138138

139-
def assert_target_env_updated(result) -> None:
140-
assert "Target environment updated successfully" in result.output
139+
def assert_virtual_layer_updated(result) -> None:
140+
assert "Virtual layer updated" in result.output
141141

142142

143143
def assert_backfill_success(result) -> None:
144-
assert_model_versions_created(result)
144+
assert_physical_layer_updated(result)
145145
assert_model_batches_executed(result)
146-
assert_target_env_updated(result)
146+
assert_virtual_layer_updated(result)
147147

148148

149149
def assert_plan_success(result, new_env="prod", from_env="prod") -> None:
@@ -154,7 +154,7 @@ def assert_plan_success(result, new_env="prod", from_env="prod") -> None:
154154

155155

156156
def assert_virtual_update(result) -> None:
157-
assert "Virtual Update executed successfully" in result.output
157+
assert "Virtual Update executed" in result.output
158158

159159

160160
def test_version(runner, tmp_path):
@@ -242,9 +242,9 @@ def test_plan_restate_model(runner, tmp_path):
242242
assert result.exit_code == 0
243243
assert_duckdb_test(result)
244244
assert "No changes to plan: project files match the `prod` environment" in result.output
245-
assert "sqlmesh_example.full_model evaluated in" in result.output
245+
assert "sqlmesh_example.full_model [full refresh" in result.output
246246
assert_model_batches_executed(result)
247-
assert_target_env_updated(result)
247+
assert_virtual_layer_updated(result)
248248

249249

250250
@pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"])
@@ -268,7 +268,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag):
268268
)
269269
assert result.exit_code == 0
270270
assert_virtual_update(result)
271-
assert "Model batches executed successfully" not in result.output
271+
assert "Model batches executed" not in result.output
272272

273273

274274
def test_plan_auto_apply(runner, tmp_path):
@@ -282,7 +282,7 @@ def test_plan_auto_apply(runner, tmp_path):
282282

283283
# confirm verbose output not present
284284
assert "sqlmesh_example.seed_model created" not in result.output
285-
assert "sqlmesh_example.seed_model promoted" not in result.output
285+
assert "sqlmesh_example.seed_model updated" not in result.output
286286

287287

288288
def test_plan_verbose(runner, tmp_path):
@@ -293,8 +293,8 @@ def test_plan_verbose(runner, tmp_path):
293293
cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--verbose"], input="y\n"
294294
)
295295
assert_plan_success(result)
296-
assert "sqlmesh_example.seed_model created" in result.output
297-
assert "sqlmesh_example.seed_model promoted" in result.output
296+
assert "sqlmesh_example.seed_model created" in result.output
297+
assert "sqlmesh_example.seed_model promoted" in result.output
298298

299299

300300
def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path):
@@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path):
396396
)
397397
assert result.exit_code == 0
398398
assert_new_env(result, "dev2", "dev", initialize=False)
399-
assert_target_env_updated(result)
399+
assert_virtual_layer_updated(result)
400400
assert_virtual_update(result)
401401

402402

@@ -495,9 +495,9 @@ def test_plan_dev_no_prompts(runner, tmp_path):
495495
cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "dev", "--no-prompts"]
496496
)
497497
assert "Apply - Backfill Tables [y/n]: " in result.output
498-
assert "Model versions created successfully" not in result.output
499-
assert "Model batches executed successfully" not in result.output
500-
assert "The target environment has been updated successfully" not in result.output
498+
assert "Physical layer updated" not in result.output
499+
assert "Model batches executed" not in result.output
500+
assert "The target environment has been updated" not in result.output
501501

502502

503503
def test_plan_dev_auto_apply(runner, tmp_path):
@@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path):
533533
)
534534
assert result.exit_code == 0
535535
assert_new_env(result, "dev", initialize=False)
536-
assert_target_env_updated(result)
536+
assert_virtual_layer_updated(result)
537537
assert_virtual_update(result)
538538

539539

@@ -552,8 +552,8 @@ def test_plan_nonbreaking(runner, tmp_path):
552552
assert "+ 'a' AS new_col" in result.output
553553
assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output
554554
assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output
555-
assert "sqlmesh_example.incremental_model evaluated in" in result.output
556-
assert "sqlmesh_example.full_model evaluated in" not in result.output
555+
assert "sqlmesh_example.incremental_model [insert" in result.output
556+
assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output
557557
assert_backfill_success(result)
558558

559559

@@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path):
610610
assert result.exit_code == 0
611611
assert "+ item_id + 1 AS item_id," in result.output
612612
assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output
613-
assert "sqlmesh_example.full_model evaluated in" in result.output
614-
assert "sqlmesh_example.incremental_model evaluated in" not in result.output
613+
assert "sqlmesh_example.full_model [full refresh" in result.output
614+
assert "sqlmesh_example.incremental_model [insert" not in result.output
615615
assert_backfill_success(result)
616616

617617

@@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path):
649649
assert "+ item_id + 1 AS item_id," not in result.output
650650
assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output
651651
# only incremental_model backfilled
652-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
653-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
652+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
653+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
654654
assert_backfill_success(result)
655655

656656

@@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path):
688688
"Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output
689689
)
690690
# only incremental_model backfilled
691-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
692-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
691+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
692+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
693693
assert_backfill_success(result)
694694

695695

tests/core/test_context.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode():
470470
plan = context.plan(auto_apply=True, no_prompts=True)
471471
new_snapshot = next(iter(plan.context_diff.new_snapshots.values()))
472472

473-
assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed."
473+
assert (
474+
mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed."
475+
)
474476

475477
# Even though there are two builtin audits referenced in the above definition, we only
476478
# store the one that overrides `blocking` in the snapshot; the other one isn't needed
@@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog
14011403
log = caplog.text
14021404
assert "'not_null' audit error:" in log
14031405
assert "'at_least_one_non_blocking' audit error:" in log
1404-
assert "Target environment updated successfully" in stdout
1406+
assert "Virtual layer updated" in stdout
14051407

14061408

14071409
def test_environment_statements(tmp_path: pathlib.Path):

tests/integrations/jupyter/test_magics.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -291,22 +291,22 @@ def test_plan(
291291

292292
# TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for
293293
# the models and how long it took
294-
assert len(output.stdout.strip().split("\n")) == 24
294+
assert len(output.stdout.strip().split("\n")) == 46
295295
assert not output.stderr
296296
assert len(output.outputs) == 4
297297
text_output = convert_all_html_output_to_text(output)
298298
# TODO: Is this what we expect?
299299
# This has minor differences between CI/CD and local.
300300
assert "[2K" in text_output[0]
301301
assert text_output[1].startswith(
302-
"Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%"
302+
"Updating virtual layer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0%"
303303
)
304304
# TODO: Is this what we expect?
305305
assert text_output[2] == ""
306-
assert text_output[3] == "Target environment updated successfully"
306+
assert text_output[3] == "✔ Virtual layer updated"
307307
assert convert_all_html_output_to_tags(output) == [
308308
["pre", "span"],
309-
["pre"] + ["span"] * 4,
309+
["pre"] + ["span"] * 5,
310310
["pre"],
311311
["pre", "span"],
312312
]
@@ -326,7 +326,7 @@ def test_run_dag(
326326
assert not output.stderr
327327
assert len(output.outputs) == 2
328328
assert convert_all_html_output_to_text(output) == [
329-
"Model batches executed successfully",
329+
"Model batches executed",
330330
"Run finished for environment 'prod'",
331331
]
332332
assert get_all_html_output(output) == [
@@ -337,7 +337,7 @@ def test_run_dag(
337337
h(
338338
"span",
339339
{"style": SUCCESS_STYLE},
340-
"Model batches executed successfully",
340+
"Model batches executed",
341341
autoescape=False,
342342
),
343343
autoescape=False,

web/server/console.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77

88
from fastapi.encoders import jsonable_encoder
99
from sse_starlette.sse import ServerSentEvent
10-
10+
from sqlmesh.core.snapshot.definition import Interval
1111
from sqlmesh.core.console import TerminalConsole
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
@@ -91,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None:
9191

9292
def start_evaluation_progress(
9393
self,
94-
batches: t.Dict[Snapshot, int],
94+
batch_sizes: t.Dict[Snapshot, int],
9595
environment_naming_info: EnvironmentNamingInfo,
9696
default_catalog: t.Optional[str],
9797
) -> None:
@@ -104,7 +104,7 @@ def start_evaluation_progress(
104104
name=snapshot.name,
105105
view_name=snapshot.display_name(environment_naming_info, default_catalog),
106106
)
107-
for snapshot, total_tasks in batches.items()
107+
for snapshot, total_tasks in batch_sizes.items()
108108
}
109109
self.plan_apply_stage_tracker.add_stage(
110110
models.PlanStage.backfill,
@@ -123,7 +123,13 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
123123
self.log_event_plan_apply()
124124

125125
def update_snapshot_evaluation_progress(
126-
self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
126+
self,
127+
snapshot: Snapshot,
128+
interval: Interval,
129+
batch_idx: int,
130+
duration_ms: t.Optional[int],
131+
num_audits_passed: int,
132+
num_audits_failed: int,
127133
) -> None:
128134
if self.plan_apply_stage_tracker and self.plan_apply_stage_tracker.backfill:
129135
task = self.plan_apply_stage_tracker.backfill.tasks[snapshot.name]

0 commit comments

Comments
 (0)