Skip to content

Commit 06a87d4

Browse files
committed
Overhaul CLI evaluation progress bar
1 parent bb9826a commit 06a87d4

8 files changed

Lines changed: 263 additions & 63 deletions

File tree

sqlmesh/core/config/root.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class Config(BaseConfig):
147147
}
148148

149149
_connection_config_validator = connection_config_validator
150-
_scheduler_config_validator = scheduler_config_validator
150+
_scheduler_config_validator = scheduler_config_validator # type: ignore
151151
_variables_validator = variables_validator
152152

153153
@field_validator("gateways", mode="before")

sqlmesh/core/console.py

Lines changed: 215 additions & 30 deletions
Large diffs are not rendered by default.

sqlmesh/core/scheduler.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ def evaluate(
165165
execution_time: TimeLike,
166166
deployability_index: DeployabilityIndex,
167167
batch_index: int,
168+
environment_naming_info: EnvironmentNamingInfo,
169+
default_catalog: t.Optional[str],
168170
**kwargs: t.Any,
169171
) -> None:
170172
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -205,6 +207,7 @@ def evaluate(
205207
wap_id=wap_id,
206208
**kwargs,
207209
)
210+
get_console().store_evaluation_audit_results(snapshot, audit_results)
208211

209212
audit_errors_to_raise: t.List[AuditError] = []
210213
for audit_result in (result for result in audit_results if result.count):
@@ -224,8 +227,13 @@ def evaluate(
224227
if audit_result.blocking:
225228
audit_errors_to_raise.append(error)
226229
else:
230+
display_name = snapshot.display_name(
231+
environment_naming_info,
232+
default_catalog,
233+
self.snapshot_evaluator.adapter.dialect,
234+
)
227235
get_console().log_warning(
228-
f"\n{error}.",
236+
f"\n{display_name}: {error}.",
229237
long_message=f"{error}. Audit query:\n{error.query.sql(error.adapter_dialect)}",
230238
)
231239

@@ -424,7 +432,7 @@ def run_merged_intervals(
424432
batched_intervals = self.batch_intervals(merged_intervals)
425433

426434
self.console.start_evaluation_progress(
427-
{snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()},
435+
batched_intervals,
428436
environment_naming_info,
429437
self.default_catalog,
430438
)
@@ -473,6 +481,8 @@ def evaluate_node(node: SchedulingUnit) -> None:
473481
execution_time=execution_time,
474482
deployability_index=deployability_index,
475483
batch_index=batch_idx,
484+
environment_naming_info=environment_naming_info,
485+
default_catalog=self.default_catalog,
476486
)
477487
evaluation_duration_ms = now_timestamp() - execution_start_ts
478488
finally:

tests/cli/test_cli.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -132,18 +132,18 @@ def assert_model_versions_created(result) -> None:
132132
assert "Model versions created successfully" in result.output
133133

134134

135-
def assert_model_batches_executed(result) -> None:
136-
assert "Model batches executed successfully" in result.output
135+
def assert_model_batches_evaluated(result) -> None:
136+
assert "Model batches evaluated successfully" in result.output
137137

138138

139-
def assert_target_env_updated(result) -> None:
140-
assert "Target environment updated successfully" in result.output
139+
def assert_env_views_updated(result) -> None:
140+
assert "Environment views updated successfully" in result.output
141141

142142

143143
def assert_backfill_success(result) -> None:
144144
assert_model_versions_created(result)
145-
assert_model_batches_executed(result)
146-
assert_target_env_updated(result)
145+
assert_model_batches_evaluated(result)
146+
assert_env_views_updated(result)
147147

148148

149149
def assert_plan_success(result, new_env="prod", from_env="prod") -> None:
@@ -242,9 +242,9 @@ def test_plan_restate_model(runner, tmp_path):
242242
assert result.exit_code == 0
243243
assert_duckdb_test(result)
244244
assert "No changes to plan: project files match the `prod` environment" in result.output
245-
assert "sqlmesh_example.full_model evaluated in" in result.output
246-
assert_model_batches_executed(result)
247-
assert_target_env_updated(result)
245+
assert "sqlmesh_example.full_model [full refresh" in result.output
246+
assert_model_batches_evaluated(result)
247+
assert_env_views_updated(result)
248248

249249

250250
@pytest.mark.parametrize("flag", ["--skip-backfill", "--dry-run"])
@@ -282,7 +282,7 @@ def test_plan_auto_apply(runner, tmp_path):
282282

283283
# confirm verbose output not present
284284
assert "sqlmesh_example.seed_model created" not in result.output
285-
assert "sqlmesh_example.seed_model promoted" not in result.output
285+
assert "sqlmesh_example.seed_model updated" not in result.output
286286

287287

288288
def test_plan_verbose(runner, tmp_path):
@@ -294,7 +294,7 @@ def test_plan_verbose(runner, tmp_path):
294294
)
295295
assert_plan_success(result)
296296
assert "sqlmesh_example.seed_model created" in result.output
297-
assert "sqlmesh_example.seed_model promoted" in result.output
297+
assert "sqlmesh_example.seed_model updated" in result.output
298298

299299

300300
def test_plan_very_verbose(runner, tmp_path, copy_to_temp_path):
@@ -396,7 +396,7 @@ def test_plan_dev_create_from_virtual(runner, tmp_path):
396396
)
397397
assert result.exit_code == 0
398398
assert_new_env(result, "dev2", "dev", initialize=False)
399-
assert_target_env_updated(result)
399+
assert_env_views_updated(result)
400400
assert_virtual_update(result)
401401

402402

@@ -533,7 +533,7 @@ def test_plan_dev_no_changes(runner, tmp_path):
533533
)
534534
assert result.exit_code == 0
535535
assert_new_env(result, "dev", initialize=False)
536-
assert_target_env_updated(result)
536+
assert_env_views_updated(result)
537537
assert_virtual_update(result)
538538

539539

@@ -552,8 +552,8 @@ def test_plan_nonbreaking(runner, tmp_path):
552552
assert "+ 'a' AS new_col" in result.output
553553
assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output
554554
assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output
555-
assert "sqlmesh_example.incremental_model evaluated in" in result.output
556-
assert "sqlmesh_example.full_model evaluated in" not in result.output
555+
assert "sqlmesh_example.incremental_model [insert" in result.output
556+
assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output
557557
assert_backfill_success(result)
558558

559559

@@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path):
610610
assert result.exit_code == 0
611611
assert "+ item_id + 1 AS item_id," in result.output
612612
assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output
613-
assert "sqlmesh_example.full_model evaluated in" in result.output
614-
assert "sqlmesh_example.incremental_model evaluated in" not in result.output
613+
assert "sqlmesh_example.full_model [full refresh" in result.output
614+
assert "sqlmesh_example.incremental_model [insert" not in result.output
615615
assert_backfill_success(result)
616616

617617

@@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path):
649649
assert "+ item_id + 1 AS item_id," not in result.output
650650
assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output
651651
# only incremental_model backfilled
652-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
653-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
652+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
653+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
654654
assert_backfill_success(result)
655655

656656

@@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path):
688688
"Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output
689689
)
690690
# only incremental_model backfilled
691-
assert "sqlmesh_example__dev.incremental_model evaluated in" in result.output
692-
assert "sqlmesh_example__dev.full_model evaluated in" not in result.output
691+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
692+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
693693
assert_backfill_success(result)
694694

695695

@@ -718,7 +718,7 @@ def test_run_dev(runner, tmp_path, flag):
718718
# Confirm backfill occurs when we run non-backfilled dev env
719719
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run", "dev"])
720720
assert result.exit_code == 0
721-
assert_model_batches_executed(result)
721+
assert_model_batches_evaluated(result)
722722

723723

724724
@time_machine.travel(FREEZE_TIME)
@@ -750,7 +750,7 @@ def test_run_cron_elapsed(runner, tmp_path):
750750
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "run"])
751751

752752
assert result.exit_code == 0
753-
assert_model_batches_executed(result)
753+
assert_model_batches_evaluated(result)
754754

755755

756756
def test_clean(runner, tmp_path):

tests/core/test_context.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,9 @@ def test_override_builtin_audit_blocking_mode():
470470
plan = context.plan(auto_apply=True, no_prompts=True)
471471
new_snapshot = next(iter(plan.context_diff.new_snapshots.values()))
472472

473-
assert mock_logger.call_args_list[0][0][0] == "\n'not_null' audit error: 1 row failed."
473+
assert (
474+
mock_logger.call_args_list[0][0][0] == "\ndb.x: 'not_null' audit error: 1 row failed."
475+
)
474476

475477
# Even though there are two builtin audits referenced in the above definition, we only
476478
# store the one that overrides `blocking` in the snapshot; the other one isn't needed
@@ -1401,7 +1403,7 @@ def test_plan_runs_audits_on_dev_previews(sushi_context: Context, capsys, caplog
14011403
log = caplog.text
14021404
assert "'not_null' audit error:" in log
14031405
assert "'at_least_one_non_blocking' audit error:" in log
1404-
assert "Target environment updated successfully" in stdout
1406+
assert "Environment views updated successfully" in stdout
14051407

14061408

14071409
def test_environment_statements(tmp_path: pathlib.Path):

tests/core/test_scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,8 @@ def _evaluate():
528528
to_datetime("2022-01-03"),
529529
DeployabilityIndex.all_deployable(),
530530
0,
531+
EnvironmentNamingInfo(),
532+
None,
531533
)
532534

533535
evaluator_audit_mock.return_value = [

tests/integrations/jupyter/test_magics.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ def test_plan(
291291

292292
# TODO: Should this be going to stdout? This is printing the status updates for when each batch finishes for
293293
# the models and how long it took
294-
assert len(output.stdout.strip().split("\n")) == 24
294+
assert len(output.stdout.strip().split("\n")) == 46
295295
assert not output.stderr
296296
assert len(output.outputs) == 4
297297
text_output = convert_all_html_output_to_text(output)
@@ -326,7 +326,7 @@ def test_run_dag(
326326
assert not output.stderr
327327
assert len(output.outputs) == 2
328328
assert convert_all_html_output_to_text(output) == [
329-
"Model batches executed successfully",
329+
"Model batches evaluated successfully",
330330
"Run finished for environment 'prod'",
331331
]
332332
assert get_all_html_output(output) == [
@@ -337,7 +337,7 @@ def test_run_dag(
337337
h(
338338
"span",
339339
{"style": SUCCESS_STYLE},
340-
"Model batches executed successfully",
340+
"Model batches evaluated successfully",
341341
autoescape=False,
342342
),
343343
autoescape=False,

web/server/console.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
15+
from sqlmesh.core.snapshot.definition import Intervals
1516
from sqlmesh.core.test import ModelTest
1617
from sqlmesh.utils.date import now_timestamp
1718
from web.server import models
@@ -91,7 +92,7 @@ def stop_restate_progress(self, success: bool) -> None:
9192

9293
def start_evaluation_progress(
9394
self,
94-
batches: t.Dict[Snapshot, int],
95+
batched_intervals: t.Dict[Snapshot, Intervals],
9596
environment_naming_info: EnvironmentNamingInfo,
9697
default_catalog: t.Optional[str],
9798
) -> None:
@@ -104,7 +105,7 @@ def start_evaluation_progress(
104105
name=snapshot.name,
105106
view_name=snapshot.display_name(environment_naming_info, default_catalog),
106107
)
107-
for snapshot, total_tasks in batches.items()
108+
for snapshot, total_tasks in batched_intervals.items()
108109
}
109110
self.plan_apply_stage_tracker.add_stage(
110111
models.PlanStage.backfill,

0 commit comments

Comments
 (0)