Skip to content

Commit fe672c6

Browse files
Feat: Add support for concurrent table diff across all impacted models
1 parent 4041b42 commit fe672c6

3 files changed

Lines changed: 325 additions & 136 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -896,14 +896,21 @@ def create_external_models(obj: Context, **kwargs: t.Any) -> None:
896896
def table_diff(
897897
obj: Context, source_to_target: str, model: t.Optional[str], **kwargs: t.Any
898898
) -> None:
899-
"""Show the diff between two tables."""
899+
"""Show the diff between two tables or all impacted when no model is specified."""
900900
source, target = source_to_target.split(":")
901-
obj.table_diff(
902-
source=source,
903-
target=target,
904-
model_or_snapshot=model,
905-
**kwargs,
906-
)
901+
if model:
902+
obj.table_diff(
903+
source=source,
904+
target=target,
905+
model_or_snapshot=model,
906+
**kwargs,
907+
)
908+
else:
909+
obj.table_diff_impacted_models(
910+
source=source,
911+
target=target,
912+
**kwargs,
913+
)
907914

908915

909916
@cli.command("rewrite")

sqlmesh/core/console.py

Lines changed: 149 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def show_intervals(self, snapshot_intervals: t.Dict[Snapshot, SnapshotIntervals]
197197

198198

199199
class DifferenceConsole(abc.ABC):
200-
"""Console for displaying environment differences"""
200+
"""Console for displaying differences"""
201201

202202
@abc.abstractmethod
203203
def show_environment_difference_summary(
@@ -217,6 +217,20 @@ def show_model_difference_summary(
217217
) -> None:
218218
"""Displays a summary of differences for the given models."""
219219

220+
@abc.abstractmethod
221+
def show_table_diff_summary(self, table_diff: TableDiff) -> None:
222+
"""Display information about the tables being diffed and how they are being joined"""
223+
224+
@abc.abstractmethod
225+
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
226+
"""Show table schema diff."""
227+
228+
@abc.abstractmethod
229+
def show_row_diff(
230+
self, row_diff: RowDiff, show_sample: bool = True, skip_grain_check: bool = False
231+
) -> None:
232+
"""Show table summary diff."""
233+
220234

221235
class BaseConsole(abc.ABC):
222236
@abc.abstractmethod
@@ -424,20 +438,6 @@ def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
424438
def loading_stop(self, id: uuid.UUID) -> None:
425439
"""Stop loading for the given id."""
426440

427-
@abc.abstractmethod
428-
def show_table_diff_summary(self, table_diff: TableDiff) -> None:
429-
"""Display information about the tables being diffed and how they are being joined"""
430-
431-
@abc.abstractmethod
432-
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
433-
"""Show table schema diff."""
434-
435-
@abc.abstractmethod
436-
def show_row_diff(
437-
self, row_diff: RowDiff, show_sample: bool = True, skip_grain_check: bool = False
438-
) -> None:
439-
"""Show table summary diff."""
440-
441441

442442
class NoopConsole(Console):
443443
def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
@@ -697,6 +697,7 @@ class TerminalConsole(Console):
697697
"""A rich based implementation of the console."""
698698

699699
TABLE_DIFF_SOURCE_BLUE = "#0248ff"
700+
TABLE_DIFF_TARGET_GREEN = "green"
700701

701702
def __init__(
702703
self,
@@ -1544,39 +1545,58 @@ def _show_summary_tree_for(
15441545
)
15451546
tree.add(self._limit_model_names(removed_tree, self.verbosity))
15461547
if modified_snapshot_ids:
1547-
direct = Tree("[bold][direct]Directly Modified:")
1548-
indirect = Tree("[bold][indirect]Indirectly Modified:")
1549-
metadata = Tree("[bold][metadata]Metadata Updated:")
1550-
for s_id in modified_snapshot_ids:
1551-
name = s_id.name
1552-
display_name = context_diff.snapshots[s_id].display_name(
1553-
environment_naming_info,
1554-
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
1555-
dialect=self.dialect,
1556-
)
1557-
if context_diff.directly_modified(name):
1558-
direct.add(
1559-
f"[direct]{display_name}"
1560-
if no_diff
1561-
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1562-
)
1563-
elif context_diff.indirectly_modified(name):
1564-
indirect.add(f"[indirect]{display_name}")
1565-
elif context_diff.metadata_updated(name):
1566-
metadata.add(
1567-
f"[metadata]{display_name}"
1568-
if no_diff
1569-
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1570-
)
1548+
tree = self._add_modified_models(
1549+
context_diff,
1550+
modified_snapshot_ids,
1551+
tree,
1552+
environment_naming_info,
1553+
default_catalog,
1554+
no_diff,
1555+
)
15711556

1572-
if direct.children:
1573-
tree.add(direct)
1574-
if indirect.children:
1575-
tree.add(self._limit_model_names(indirect, self.verbosity))
1576-
if metadata.children:
1577-
tree.add(metadata)
15781557
self._print(tree)
15791558

1559+
def _add_modified_models(
1560+
self,
1561+
context_diff: ContextDiff,
1562+
modified_snapshot_ids: t.Set[SnapshotId],
1563+
tree: Tree,
1564+
environment_naming_info: EnvironmentNamingInfo,
1565+
default_catalog: t.Optional[str] = None,
1566+
no_diff: bool = True,
1567+
) -> Tree:
1568+
direct = Tree("[bold][direct]Directly Modified:")
1569+
indirect = Tree("[bold][indirect]Indirectly Modified:")
1570+
metadata = Tree("[bold][metadata]Metadata Updated:")
1571+
for s_id in modified_snapshot_ids:
1572+
name = s_id.name
1573+
display_name = context_diff.snapshots[s_id].display_name(
1574+
environment_naming_info,
1575+
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
1576+
dialect=self.dialect,
1577+
)
1578+
if context_diff.directly_modified(name):
1579+
direct.add(
1580+
f"[direct]{display_name}"
1581+
if no_diff
1582+
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1583+
)
1584+
elif context_diff.indirectly_modified(name):
1585+
indirect.add(f"[indirect]{display_name}")
1586+
elif context_diff.metadata_updated(name):
1587+
metadata.add(
1588+
f"[metadata]{display_name}"
1589+
if no_diff
1590+
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1591+
)
1592+
if direct.children:
1593+
tree.add(direct)
1594+
if indirect.children:
1595+
tree.add(self._limit_model_names(indirect, self.verbosity))
1596+
if metadata.children:
1597+
tree.add(metadata)
1598+
return tree
1599+
15801600
def _show_options_after_categorization(
15811601
self,
15821602
plan_builder: PlanBuilder,
@@ -1897,7 +1917,9 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
18971917
)
18981918
envs.add(source)
18991919

1900-
target = Tree(f"Target: [green]{table_diff.target_alias}[/green]")
1920+
target = Tree(
1921+
f"Target: [{self.TABLE_DIFF_TARGET_GREEN}]{table_diff.target_alias}[/{self.TABLE_DIFF_TARGET_GREEN}]"
1922+
)
19011923
envs.add(target)
19021924

19031925
tree.add(envs)
@@ -1907,7 +1929,9 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
19071929
tables.add(
19081930
f"Source: [{self.TABLE_DIFF_SOURCE_BLUE}]{table_diff.source}[/{self.TABLE_DIFF_SOURCE_BLUE}]"
19091931
)
1910-
tables.add(f"Target: [green]{table_diff.target}[/green]")
1932+
tables.add(
1933+
f"Target: [{self.TABLE_DIFF_TARGET_GREEN}]{table_diff.target}[/{self.TABLE_DIFF_TARGET_GREEN}]"
1934+
)
19111935

19121936
tree.add(tables)
19131937

@@ -1928,7 +1952,7 @@ def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
19281952
if schema_diff.target_alias:
19291953
target_name = schema_diff.target_alias.upper()
19301954

1931-
first_line = f"\n[b]Schema Diff Between '[{self.TABLE_DIFF_SOURCE_BLUE}]{source_name}[/{self.TABLE_DIFF_SOURCE_BLUE}]' and '[green]{target_name}[/green]'"
1955+
first_line = f"\n[b]Schema Diff Between '[{self.TABLE_DIFF_SOURCE_BLUE}]{source_name}[/{self.TABLE_DIFF_SOURCE_BLUE}]' and '[{self.TABLE_DIFF_TARGET_GREEN}]{target_name}[/{self.TABLE_DIFF_TARGET_GREEN}]'"
19321956
if schema_diff.model_name:
19331957
first_line = (
19341958
first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
@@ -2032,7 +2056,7 @@ def show_row_diff(
20322056

20332057
column_styles = {
20342058
source_name: self.TABLE_DIFF_SOURCE_BLUE,
2035-
target_name: "green",
2059+
target_name: self.TABLE_DIFF_TARGET_GREEN,
20362060
}
20372061

20382062
for column, [source_column, target_column] in columns.items():
@@ -2637,23 +2661,11 @@ def show_model_difference_summary(
26372661
no_diff: Hide the actual SQL differences.
26382662
"""
26392663
added_snapshots = {context_diff.snapshots[s_id] for s_id in context_diff.added}
2640-
added_snapshot_models = {s for s in added_snapshots if s.is_model}
2641-
if added_snapshot_models:
2664+
if added_snapshots:
26422665
self._print("\n**Added Models:**")
2643-
added_models = sorted(added_snapshot_models)
2644-
list_length = len(added_models)
2645-
if (
2646-
self.verbosity < Verbosity.VERY_VERBOSE
2647-
and list_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2648-
):
2649-
self._print(added_models[0])
2650-
self._print(f"- `.... {list_length - 2} more ....`\n")
2651-
self._print(added_models[-1])
2652-
else:
2653-
for snapshot in added_models:
2654-
self._print(
2655-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2656-
)
2666+
self._print_models_with_threshold(
2667+
environment_naming_info, {s for s in added_snapshots if s.is_model}, default_catalog
2668+
)
26572669

26582670
added_snapshot_audits = {s for s in added_snapshots if s.is_audit}
26592671
if added_snapshot_audits:
@@ -2664,23 +2676,13 @@ def show_model_difference_summary(
26642676
)
26652677

26662678
removed_snapshot_table_infos = set(context_diff.removed_snapshots.values())
2667-
removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model}
2668-
if removed_model_snapshot_table_infos:
2679+
if removed_snapshot_table_infos:
26692680
self._print("\n**Removed Models:**")
2670-
removed_models = sorted(removed_model_snapshot_table_infos)
2671-
list_length = len(removed_models)
2672-
if (
2673-
self.verbosity < Verbosity.VERY_VERBOSE
2674-
and list_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2675-
):
2676-
self._print(removed_models[0])
2677-
self._print(f"- `.... {list_length - 2} more ....`\n")
2678-
self._print(removed_models[-1])
2679-
else:
2680-
for snapshot_table_info in removed_models:
2681-
self._print(
2682-
f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2683-
)
2681+
self._print_models_with_threshold(
2682+
environment_naming_info,
2683+
{s for s in removed_snapshot_table_infos if s.is_model},
2684+
default_catalog,
2685+
)
26842686

26852687
removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit}
26862688
if removed_audit_snapshot_table_infos:
@@ -2694,48 +2696,72 @@ def show_model_difference_summary(
26942696
current_snapshot for current_snapshot, _ in context_diff.modified_snapshots.values()
26952697
}
26962698
if modified_snapshots:
2697-
directly_modified = []
2698-
indirectly_modified = []
2699-
metadata_modified = []
2700-
for snapshot in modified_snapshots:
2701-
if context_diff.directly_modified(snapshot.name):
2702-
directly_modified.append(snapshot)
2703-
elif context_diff.indirectly_modified(snapshot.name):
2704-
indirectly_modified.append(snapshot)
2705-
elif context_diff.metadata_updated(snapshot.name):
2706-
metadata_modified.append(snapshot)
2707-
if directly_modified:
2708-
self._print("\n**Directly Modified:**")
2709-
for snapshot in sorted(directly_modified):
2710-
self._print(
2711-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2712-
)
2713-
if not no_diff:
2714-
self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
2715-
if indirectly_modified:
2716-
self._print("\n**Indirectly Modified:**")
2717-
indirectly_modified = sorted(indirectly_modified)
2718-
modified_length = len(indirectly_modified)
2719-
if (
2720-
self.verbosity < Verbosity.VERY_VERBOSE
2721-
and modified_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2722-
):
2723-
self._print(
2724-
f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`\n"
2725-
f"- `.... {modified_length - 2} more ....`\n"
2726-
f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2727-
)
2728-
else:
2729-
for snapshot in indirectly_modified:
2730-
self._print(
2731-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2732-
)
2733-
if metadata_modified:
2734-
self._print("\n**Metadata Updated:**")
2735-
for snapshot in sorted(metadata_modified):
2736-
self._print(
2737-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2738-
)
2699+
self._print_modified_models(
2700+
context_diff, modified_snapshots, environment_naming_info, default_catalog, no_diff
2701+
)
2702+
2703+
def _print_models_with_threshold(
2704+
self,
2705+
environment_naming_info: EnvironmentNamingInfo,
2706+
snapshot_table_infos: t.Set[SnapshotInfoLike],
2707+
default_catalog: t.Optional[str] = None,
2708+
) -> None:
2709+
models = sorted(snapshot_table_infos)
2710+
list_length = len(models)
2711+
if (
2712+
self.verbosity < Verbosity.VERY_VERBOSE
2713+
and list_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2714+
):
2715+
self._print(
2716+
f"- `{models[0].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2717+
)
2718+
self._print(f"- `.... {list_length - 2} more ....`\n")
2719+
self._print(
2720+
f"- `{models[-1].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2721+
)
2722+
else:
2723+
for snapshot_table_info in models:
2724+
self._print(
2725+
f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2726+
)
2727+
2728+
def _print_modified_models(
2729+
self,
2730+
context_diff: ContextDiff,
2731+
modified_snapshots: t.Set[Snapshot],
2732+
environment_naming_info: EnvironmentNamingInfo,
2733+
default_catalog: t.Optional[str] = None,
2734+
no_diff: bool = True,
2735+
) -> None:
2736+
directly_modified = []
2737+
indirectly_modified = []
2738+
metadata_modified = []
2739+
for snapshot in modified_snapshots:
2740+
if context_diff.directly_modified(snapshot.name):
2741+
directly_modified.append(snapshot)
2742+
elif context_diff.indirectly_modified(snapshot.name):
2743+
indirectly_modified.append(snapshot)
2744+
elif context_diff.metadata_updated(snapshot.name):
2745+
metadata_modified.append(snapshot)
2746+
if directly_modified:
2747+
self._print("\n**Directly Modified:**")
2748+
for snapshot in sorted(directly_modified):
2749+
self._print(
2750+
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2751+
)
2752+
if not no_diff:
2753+
self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
2754+
if indirectly_modified:
2755+
self._print("\n**Indirectly Modified:**")
2756+
self._print_models_with_threshold(
2757+
environment_naming_info, set(indirectly_modified), default_catalog
2758+
)
2759+
if metadata_modified:
2760+
self._print("\n**Metadata Updated:**")
2761+
for snapshot in sorted(metadata_modified):
2762+
self._print(
2763+
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2764+
)
27392765

27402766
def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
27412767
"""Displays the models with missing dates."""

0 commit comments

Comments
 (0)