Skip to content

Commit 5409a22

Browse files
committed
Overhaul CLI evaluation progress bar
1 parent 633d8c2 commit 5409a22

8 files changed

Lines changed: 263 additions & 63 deletions

File tree

sqlmesh/core/config/root.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class Config(BaseConfig):
147147
}
148148

149149
_connection_config_validator = connection_config_validator
150-
_scheduler_config_validator = scheduler_config_validator
150+
_scheduler_config_validator = scheduler_config_validator # type: ignore
151151
_variables_validator = variables_validator
152152

153153
@field_validator("gateways", mode="before")

sqlmesh/core/console.py

Lines changed: 215 additions & 30 deletions
Large diffs are not rendered by default.

sqlmesh/core/scheduler.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ def evaluate(
165165
execution_time: TimeLike,
166166
deployability_index: DeployabilityIndex,
167167
batch_index: int,
168+
environment_naming_info: EnvironmentNamingInfo,
169+
default_catalog: t.Optional[str],
168170
**kwargs: t.Any,
169171
) -> None:
170172
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -205,6 +207,7 @@ def evaluate(
205207
wap_id=wap_id,
206208
**kwargs,
207209
)
210+
get_console().store_evaluation_audit_results(snapshot, audit_results)
208211

209212
audit_errors_to_raise: t.List[AuditError] = []
210213
for audit_result in (result for result in audit_results if result.count):
@@ -224,8 +227,13 @@ def evaluate(
224227
if audit_result.blocking:
225228
audit_errors_to_raise.append(error)
226229
else:
230+
display_name = snapshot.display_name(
231+
environment_naming_info,
232+
default_catalog,
233+
self.snapshot_evaluator.adapter.dialect,
234+
)
227235
get_console().log_warning(
228-
f"\n{error}.",
236+
f"\n{display_name}: {error}.",
229237
long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
230238
)
231239

@@ -424,7 +432,7 @@ def run_merged_intervals(
424432
batched_intervals = self.batch_intervals(merged_intervals)
425433

426434
self.console.start_evaluation_progress(
427-
{snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()},
435+
batched_intervals,
428436
environment_naming_info,
429437
self.default_catalog,
430438
)
@@ -473,6 +481,8 @@ def evaluate_node(node: SchedulingUnit) -> None:
473481
execution_time=execution_time,
474482
deployability_index=deployability_index,
475483
batch_index=batch_idx,
484+
environment_naming_info=environment_naming_info,
485+
default_catalog=self.default_catalog,
476486
)
477487
evaluation_duration_ms = now_timestamp() - execution_start_ts
478488
finally:

tests/cli/test_cli.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -132,18 +132,18 @@ def assert_model_versions_created(result) -> None:
132132
assert "Model versions created successfully" in result.output
133133

134134

135-
def assert_model_batches_executed(result) -> None:
136-
assert "Model batches executed successfully" in result.output
135+
def assert_model_batches_evaluated(result) -> None:
136+
assert "Model batches evaluated successfully" in result.output
137137

138138

139-
def assert_target_env_updated(result) -> None:
140-
assert "Target environment updated successfully" in result.output
139+
def assert_env_views_updated(result) -> None:
140+
assert "Environment views updated successfully" in result.output
141141

142142

143143
def assert_backfill_success(result) -> None:
144144
assert_model_versions_created(result)
145-
assert_model_batches_executed(result)
146-
assert_target_env_updated(result)
145+
assert_model_batches_evaluated(result)
146+
assert_env_views_updated(result)
147147

148148

149149
def assert_plan_success(result, new_env="prod", from_env="prod") -> None:
@@ -220,9 +220,9 @@ def test_plan_restate_model(runner, tmp_path):
220220
assert result.exit_code == 0
221221
assert_duckdb_test(result)
222222
assert "No changes to plan: project files match the `prod` environment" in result.output
223-
assert "sqlmesh_example.full_model evaluated in" in result.output
224-
assert_model_batches_executed(result)
225-
assert_target_env_updated(result)
223+
assert "sqlmesh_example.full_model [full refresh" in result.output
224+
assert_model_batches_evaluated(result)
225+
assert_env_views_updated(result)
226226

227227

228228
@pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"])
@@ -260,7 +260,7 @@ def test_plan_auto_apply(runner, tmp_path):
260260

261261
# confirm verbose output not present
262262
assert "sqlmesh_example.seed_model created" not in result.output
263-
assert "sqlmesh_example.seed_model promoted" not in result.output
263+
assert "sqlmesh_example.seed_model updated" not in result.output
264264

265265

266266
def test_plan_verbose(runner, tmp_path):
@@ -272,7 +272,7 @@ def test_plan_verbose(runner, tmp_path):
272272
)
273273
assert_plan_success(result)
274274
assert "sqlmesh_example.seed_model created" in result.output
275-
assert "sqlmesh_example.seed_model promoted" in result.output
275+
assert "sqlmesh_example.seed_model updated" in result.output
276276

277277

278278
def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path):
@@ -374,7 +374,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path):
374374
)
375375
assert result.exit_code == 0
376376
assert_new_env(result, "dev2", "dev", initialize=False)
377-
assert_target_env_updated(result)
377+
assert_env_views_updated(result)
378378
assert_virtual_update(result)
379379

380380

@@ -511,7 +511,7 @@ def test_plan_dev_no_changes(runner, tmp_path):
511511
)
512512
assert result.exit_code == 0
513513
assert_new_env(result, "dev", initialize=False)
514-
assert_target_env_updated(result)
514+
assert_env_views_updated(result)
515515
assert_virtual_update(result)
516516

517517

@@ -530,8 +530,8 @@ def test_plan_nonbreaking(runner, tmp_path):
530530
assert "+ 'a' AS new_col" in result.output
531531
assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output
532532
assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output
533-
assert "sqlmesh_example.incremental_model evaluated in" in result.output
534-
assert "sqlmesh_example.full_model evaluated in" not in result.output
533+
assert "sqlmesh_example.incremental_model [insert" in result.output
534+
assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output
535535
assert_backfill_success(result)
536536

537537

@@ -588,8 +588,8 @@ def test_plan_breaking(runner, tmp_path):
588588
assert result.exit_code == 0
589589
assert "+ item_id + 1 AS item_id," in result.output
590590
assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output
591-
assert "sqlmesh_example.full_model evaluated in" in result.output
592-
assert "sqlmesh_example.incremental_model evaluated in" not in result.output
591+
assert "sqlmesh_example.full_model [full refresh" in result.output
592+
assert "sqlmesh_example.incremental_model [insert" not in result.output
593593
assert_backfill_success(result)
594594

595595

@@ -627,8 +627,8 @@ def test_plan_dev_select(runner, tmp_path):
627627
assert "+ item_id + 1 AS item_id," not in result.output
628628
assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output
629629
# only incremental_model backfilled
630-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
631-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
630+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
631+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
632632
assert_backfill_success(result)
633633

634634

@@ -666,8 +666,8 @@ def test_plan_dev_backfill(runner, tmp_path):
666666
"Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output
667667
)
668668
# only incremental_model backfilled
669-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
670-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
669+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
670+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
671671
assert_backfill_success(result)
672672

673673

@@ -696,7 +696,7 @@ def test_run_dev(runner, tmp_path, flag):
696696
# Confirm backfill occurs when we run non-backfilled dev env
697697
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"])
698698
assert result.exit_code == 0
699-
assert_model_batches_executed(result)
699+
assert_model_batches_evaluated(result)
700700

701701

702702
@time_machine.travel(FREEZE_TIME)
@@ -728,7 +728,7 @@ def test_run_cron_elapsed(runner, tmp_path):
728728
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"])
729729

730730
assert result.exit_code == 0
731-
assert_model_batches_executed(result)
731+
assert_model_batches_evaluated(result)
732732

733733

734734
def test_clean(runner, tmp_path):

tests/core/test_context.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode():
470470
plan = context.plan(auto_apply=True, no_prompts=True)
471471
new_snapshot = next(iter(plan.context_diff.new_snapshots.values()))
472472

473-
assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed."
473+
assert (
474+
mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed."
475+
)
474476

475477
# Even though there are two builtin audits referenced in the above definition, we only
476478
# store the one that overrides `blocking` in the snapshot; the other one isn't needed
@@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog
14011403
log = caplog.text
14021404
assert "'not_null' audit error:" in log
14031405
assert "'at_least_one_non_blocking' audit error:" in log
1404-
assert "Target environment updated successfully" in stdout
1406+
assert "Environment views updated successfully" in stdout
14051407

14061408

14071409
def test_environment_statements(tmp_path: pathlib.Path):

tests/core/test_scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,8 @@ def _evaluate():
528528
to_datetime("2022-01-03"),
529529
DeployabilityIndex.all_deployable(),
530530
0,
531+
EnvironmentNamingInfo(),
532+
None,
531533
)
532534

533535
evaluator_audit_mock.return_value = [

tests/integrations/jupyter/test_magics.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ def test_plan(
291291

292292
# TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for
293293
# the models and how long it took
294-
assert len(output.stdout.strip().split("\n")) == 24
294+
assert len(output.stdout.strip().split("\n")) == 46
295295
assert not output.stderr
296296
assert len(output.outputs) == 4
297297
text_output = convert_all_html_output_to_text(output)
@@ -326,7 +326,7 @@ def test_run_dag(
326326
assert not output.stderr
327327
assert len(output.outputs) == 2
328328
assert convert_all_html_output_to_text(output) == [
329-
"Model batches executed successfully",
329+
"Model batches evaluated successfully",
330330
"Run finished for environment 'prod'",
331331
]
332332
assert get_all_html_output(output) == [
@@ -337,7 +337,7 @@ def test_run_dag(
337337
h(
338338
"span",
339339
{"style": SUCCESS_STYLE},
340-
"Model batches executed successfully",
340+
"Model batches evaluated successfully",
341341
autoescape=False,
342342
),
343343
autoescape=False,

web/server/console.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
15+
from sqlmesh.core.snapshot.definition import Intervals
1516
from sqlmesh.core.test import ModelTest
1617
from sqlmesh.utils.date import now_timestamp
1718
from web.server import models
@@ -91,7 +92,7 @@ def stop_restate_progress(self, success: bool) -> None:
9192

9293
def start_evaluation_progress(
9394
self,
94-
batches: t.Dict[Snapshot, int],
95+
batched_intervals: t.Dict[Snapshot, Intervals],
9596
environment_naming_info: EnvironmentNamingInfo,
9697
default_catalog: t.Optional[str],
9798
) -> None:
@@ -104,7 +105,7 @@ def start_evaluation_progress(
104105
name=snapshot.name,
105106
view_name=snapshot.display_name(environment_naming_info, default_catalog),
106107
)
107-
for snapshot, total_tasks in batches.items()
108+
for snapshot, total_tasks in batched_intervals.items()
108109
}
109110
self.plan_apply_stage_tracker.add_stage(
110111
models.PlanStage.backfill,

0 commit comments

Comments
 (0)