Skip to content

Commit 6a2ada2

Browse files
Feat: Add support for concurrent table diff across all impacted models
1 parent fd19aa0 commit 6a2ada2

3 files changed

Lines changed: 325 additions & 136 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -897,14 +897,21 @@ def create_external_models(obj: Context, **kwargs: t.Any) -> None:
897897
def table_diff(
    obj: Context, source_to_target: str, model: t.Optional[str], **kwargs: t.Any
) -> None:
    """Show the diff between SOURCE and TARGET for one model, or for all impacted models.

    Args:
        obj: Context object the diff is delegated to.
        source_to_target: A ``SOURCE:TARGET`` string; it is split on ``:``.
        model: Model to diff. When falsy (``None`` or empty), the diff is run
            across all impacted models instead of a single one.
        **kwargs: Remaining options, forwarded unchanged to the context call.

    Raises:
        ValueError: If ``source_to_target`` does not contain exactly one ``:``.
    """
    parts = source_to_target.split(":")
    if len(parts) != 2:
        # Bare tuple-unpacking would raise an opaque "not enough / too many
        # values to unpack" error; keep the exception type, but name the input.
        raise ValueError(
            f"Expected SOURCE:TARGET separated by a single ':', got '{source_to_target}'."
        )
    source, target = parts

    if model:
        obj.table_diff(
            source=source,
            target=target,
            model_or_snapshot=model,
            **kwargs,
        )
    else:
        obj.table_diff_impacted_models(
            source=source,
            target=target,
            **kwargs,
        )
908915

909916

910917
@cli.command("rewrite")

sqlmesh/core/console.py

Lines changed: 149 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def show_intervals(self, snapshot_intervals: t.Dict[Snapshot, SnapshotIntervals]
197197

198198

199199
class DifferenceConsole(abc.ABC):
200-
"""Console for displaying environment differences"""
200+
"""Console for displaying differences"""
201201

202202
@abc.abstractmethod
203203
def show_environment_difference_summary(
@@ -217,6 +217,20 @@ def show_model_difference_summary(
217217
) -> None:
218218
"""Displays a summary of differences for the given models."""
219219

220+
@abc.abstractmethod
221+
def show_table_diff_summary(self, table_diff: TableDiff) -> None:
222+
"""Display information about the tables being diffed and how they are being joined"""
223+
224+
@abc.abstractmethod
225+
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
226+
"""Show table schema diff."""
227+
228+
@abc.abstractmethod
229+
def show_row_diff(
230+
self, row_diff: RowDiff, show_sample: bool = True, skip_grain_check: bool = False
231+
) -> None:
232+
"""Show table summary diff."""
233+
220234

221235
class BaseConsole(abc.ABC):
222236
@abc.abstractmethod
@@ -424,20 +438,6 @@ def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
424438
def loading_stop(self, id: uuid.UUID) -> None:
425439
"""Stop loading for the given id."""
426440

427-
@abc.abstractmethod
428-
def show_table_diff_summary(self, table_diff: TableDiff) -> None:
429-
"""Display information about the tables being diffed and how they are being joined"""
430-
431-
@abc.abstractmethod
432-
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
433-
"""Show table schema diff."""
434-
435-
@abc.abstractmethod
436-
def show_row_diff(
437-
self, row_diff: RowDiff, show_sample: bool = True, skip_grain_check: bool = False
438-
) -> None:
439-
"""Show table summary diff."""
440-
441441

442442
class NoopConsole(Console):
443443
def start_plan_evaluation(self, plan: EvaluatablePlan) -> None:
@@ -697,6 +697,7 @@ class TerminalConsole(Console):
697697
"""A rich based implementation of the console."""
698698

699699
TABLE_DIFF_SOURCE_BLUE = "#0248ff"
700+
TABLE_DIFF_TARGET_GREEN = "green"
700701

701702
def __init__(
702703
self,
@@ -1544,39 +1545,58 @@ def _show_summary_tree_for(
15441545
)
15451546
tree.add(self._limit_model_names(removed_tree, self.verbosity))
15461547
if modified_snapshot_ids:
1547-
direct = Tree("[bold][direct]Directly Modified:")
1548-
indirect = Tree("[bold][indirect]Indirectly Modified:")
1549-
metadata = Tree("[bold][metadata]Metadata Updated:")
1550-
for s_id in modified_snapshot_ids:
1551-
name = s_id.name
1552-
display_name = context_diff.snapshots[s_id].display_name(
1553-
environment_naming_info,
1554-
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
1555-
dialect=self.dialect,
1556-
)
1557-
if context_diff.directly_modified(name):
1558-
direct.add(
1559-
f"[direct]{display_name}"
1560-
if no_diff
1561-
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1562-
)
1563-
elif context_diff.indirectly_modified(name):
1564-
indirect.add(f"[indirect]{display_name}")
1565-
elif context_diff.metadata_updated(name):
1566-
metadata.add(
1567-
f"[metadata]{display_name}"
1568-
if no_diff
1569-
else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
1570-
)
1548+
tree = self._add_modified_models(
1549+
context_diff,
1550+
modified_snapshot_ids,
1551+
tree,
1552+
environment_naming_info,
1553+
default_catalog,
1554+
no_diff,
1555+
)
15711556

1572-
if direct.children:
1573-
tree.add(direct)
1574-
if indirect.children:
1575-
tree.add(self._limit_model_names(indirect, self.verbosity))
1576-
if metadata.children:
1577-
tree.add(metadata)
15781557
self._print(tree)
15791558

1559+
def _add_modified_models(
    self,
    context_diff: ContextDiff,
    modified_snapshot_ids: t.Set[SnapshotId],
    tree: Tree,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    no_diff: bool = True,
) -> Tree:
    """Attach the modified-model branches to ``tree`` and return it.

    Each snapshot id is placed under "Directly Modified", "Indirectly Modified"
    or "Metadata Updated" according to the first matching ``context_diff``
    predicate. Branches that end up empty are not attached.

    Args:
        context_diff: Source of the snapshots, the modification predicates and
            the text diffs.
        modified_snapshot_ids: Ids of the snapshots to classify.
        tree: Tree that receives the non-empty branches.
        environment_naming_info: Passed through to ``display_name``.
        default_catalog: Passed to ``display_name`` unless verbosity is
            ``VERY_VERBOSE`` or higher, in which case ``None`` is passed.
        no_diff: When true, entries are plain names; otherwise direct and
            metadata entries are ``Syntax`` objects carrying the text diff.

    Returns:
        The same ``tree`` object that was passed in.
    """
    direct_branch = Tree("[bold][direct]Directly Modified:")
    indirect_branch = Tree("[bold][indirect]Indirectly Modified:")
    metadata_branch = Tree("[bold][metadata]Metadata Updated:")

    def entry(style: str, label: str, model_name: str) -> t.Any:
        # Plain styled name when diffs are hidden, otherwise name + SQL diff.
        if no_diff:
            return f"[{style}]{label}"
        return Syntax(f"{label}\n{context_diff.text_diff(model_name)}", "sql")

    for snapshot_id in modified_snapshot_ids:
        model_name = snapshot_id.name
        label = context_diff.snapshots[snapshot_id].display_name(
            environment_naming_info,
            default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
            dialect=self.dialect,
        )
        if context_diff.directly_modified(model_name):
            direct_branch.add(entry("direct", label, model_name))
        elif context_diff.indirectly_modified(model_name):
            indirect_branch.add(f"[indirect]{label}")
        elif context_diff.metadata_updated(model_name):
            metadata_branch.add(entry("metadata", label, model_name))

    if direct_branch.children:
        tree.add(direct_branch)
    if indirect_branch.children:
        # Only the indirect branch goes through the model-name limiter.
        tree.add(self._limit_model_names(indirect_branch, self.verbosity))
    if metadata_branch.children:
        tree.add(metadata_branch)
    return tree
1599+
15801600
def _show_options_after_categorization(
15811601
self,
15821602
plan_builder: PlanBuilder,
@@ -1897,7 +1917,9 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
18971917
)
18981918
envs.add(source)
18991919

1900-
target = Tree(f"Target: [green]{table_diff.target_alias}[/green]")
1920+
target = Tree(
1921+
f"Target: [{self.TABLE_DIFF_TARGET_GREEN}]{table_diff.target_alias}[/{self.TABLE_DIFF_TARGET_GREEN}]"
1922+
)
19011923
envs.add(target)
19021924

19031925
tree.add(envs)
@@ -1907,7 +1929,9 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
19071929
tables.add(
19081930
f"Source: [{self.TABLE_DIFF_SOURCE_BLUE}]{table_diff.source}[/{self.TABLE_DIFF_SOURCE_BLUE}]"
19091931
)
1910-
tables.add(f"Target: [green]{table_diff.target}[/green]")
1932+
tables.add(
1933+
f"Target: [{self.TABLE_DIFF_TARGET_GREEN}]{table_diff.target}[/{self.TABLE_DIFF_TARGET_GREEN}]"
1934+
)
19111935

19121936
tree.add(tables)
19131937

@@ -1928,7 +1952,7 @@ def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
19281952
if schema_diff.target_alias:
19291953
target_name = schema_diff.target_alias.upper()
19301954

1931-
first_line = f"\n[b]Schema Diff Between '[{self.TABLE_DIFF_SOURCE_BLUE}]{source_name}[/{self.TABLE_DIFF_SOURCE_BLUE}]' and '[green]{target_name}[/green]'"
1955+
first_line = f"\n[b]Schema Diff Between '[{self.TABLE_DIFF_SOURCE_BLUE}]{source_name}[/{self.TABLE_DIFF_SOURCE_BLUE}]' and '[{self.TABLE_DIFF_TARGET_GREEN}]{target_name}[/{self.TABLE_DIFF_TARGET_GREEN}]'"
19321956
if schema_diff.model_name:
19331957
first_line = (
19341958
first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
@@ -2032,7 +2056,7 @@ def show_row_diff(
20322056

20332057
column_styles = {
20342058
source_name: self.TABLE_DIFF_SOURCE_BLUE,
2035-
target_name: "green",
2059+
target_name: self.TABLE_DIFF_TARGET_GREEN,
20362060
}
20372061

20382062
for column, [source_column, target_column] in columns.items():
@@ -2639,23 +2663,11 @@ def show_model_difference_summary(
26392663
no_diff: Hide the actual SQL differences.
26402664
"""
26412665
added_snapshots = {context_diff.snapshots[s_id] for s_id in context_diff.added}
2642-
added_snapshot_models = {s for s in added_snapshots if s.is_model}
2643-
if added_snapshot_models:
2666+
if added_snapshots:
26442667
self._print("\n**Added Models:**")
2645-
added_models = sorted(added_snapshot_models)
2646-
list_length = len(added_models)
2647-
if (
2648-
self.verbosity < Verbosity.VERY_VERBOSE
2649-
and list_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2650-
):
2651-
self._print(added_models[0])
2652-
self._print(f"- `.... {list_length - 2} more ....`\n")
2653-
self._print(added_models[-1])
2654-
else:
2655-
for snapshot in added_models:
2656-
self._print(
2657-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2658-
)
2668+
self._print_models_with_threshold(
2669+
environment_naming_info, {s for s in added_snapshots if s.is_model}, default_catalog
2670+
)
26592671

26602672
added_snapshot_audits = {s for s in added_snapshots if s.is_audit}
26612673
if added_snapshot_audits:
@@ -2666,23 +2678,13 @@ def show_model_difference_summary(
26662678
)
26672679

26682680
removed_snapshot_table_infos = set(context_diff.removed_snapshots.values())
2669-
removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model}
2670-
if removed_model_snapshot_table_infos:
2681+
if removed_snapshot_table_infos:
26712682
self._print("\n**Removed Models:**")
2672-
removed_models = sorted(removed_model_snapshot_table_infos)
2673-
list_length = len(removed_models)
2674-
if (
2675-
self.verbosity < Verbosity.VERY_VERBOSE
2676-
and list_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2677-
):
2678-
self._print(removed_models[0])
2679-
self._print(f"- `.... {list_length - 2} more ....`\n")
2680-
self._print(removed_models[-1])
2681-
else:
2682-
for snapshot_table_info in removed_models:
2683-
self._print(
2684-
f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2685-
)
2683+
self._print_models_with_threshold(
2684+
environment_naming_info,
2685+
{s for s in removed_snapshot_table_infos if s.is_model},
2686+
default_catalog,
2687+
)
26862688

26872689
removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit}
26882690
if removed_audit_snapshot_table_infos:
@@ -2696,48 +2698,72 @@ def show_model_difference_summary(
26962698
current_snapshot for current_snapshot, _ in context_diff.modified_snapshots.values()
26972699
}
26982700
if modified_snapshots:
2699-
directly_modified = []
2700-
indirectly_modified = []
2701-
metadata_modified = []
2702-
for snapshot in modified_snapshots:
2703-
if context_diff.directly_modified(snapshot.name):
2704-
directly_modified.append(snapshot)
2705-
elif context_diff.indirectly_modified(snapshot.name):
2706-
indirectly_modified.append(snapshot)
2707-
elif context_diff.metadata_updated(snapshot.name):
2708-
metadata_modified.append(snapshot)
2709-
if directly_modified:
2710-
self._print("\n**Directly Modified:**")
2711-
for snapshot in sorted(directly_modified):
2712-
self._print(
2713-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2714-
)
2715-
if not no_diff:
2716-
self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
2717-
if indirectly_modified:
2718-
self._print("\n**Indirectly Modified:**")
2719-
indirectly_modified = sorted(indirectly_modified)
2720-
modified_length = len(indirectly_modified)
2721-
if (
2722-
self.verbosity < Verbosity.VERY_VERBOSE
2723-
and modified_length > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
2724-
):
2725-
self._print(
2726-
f"- `{indirectly_modified[0].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`\n"
2727-
f"- `.... {modified_length - 2} more ....`\n"
2728-
f"- `{indirectly_modified[-1].display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2729-
)
2730-
else:
2731-
for snapshot in indirectly_modified:
2732-
self._print(
2733-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2734-
)
2735-
if metadata_modified:
2736-
self._print("\n**Metadata Updated:**")
2737-
for snapshot in sorted(metadata_modified):
2738-
self._print(
2739-
f"- `{snapshot.display_name(environment_naming_info, default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None, dialect=self.dialect)}`"
2740-
)
2701+
self._print_modified_models(
2702+
context_diff, modified_snapshots, environment_naming_info, default_catalog, no_diff
2703+
)
2704+
2705+
def _print_models_with_threshold(
    self,
    environment_naming_info: EnvironmentNamingInfo,
    snapshot_table_infos: t.Set[SnapshotInfoLike],
    default_catalog: t.Optional[str] = None,
) -> None:
    """Print the given models in sorted order, abbreviating long lists.

    When verbosity is below ``VERY_VERBOSE`` and there are more entries than
    ``INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD``, only the first and last entries
    are printed, with a ``.... N more ....`` line between them. Otherwise every
    entry gets its own bullet line.

    Args:
        environment_naming_info: Passed through to ``display_name``.
        snapshot_table_infos: Entries to print; they are sorted before printing.
        default_catalog: Passed to ``display_name`` unless verbosity is
            ``VERY_VERBOSE`` or higher, in which case ``None`` is passed.
    """

    def bullet(info: t.Any) -> str:
        shown = info.display_name(
            environment_naming_info,
            default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
            dialect=self.dialect,
        )
        return f"- `{shown}`"

    ordered = sorted(snapshot_table_infos)
    total = len(ordered)
    abbreviate = (
        self.verbosity < Verbosity.VERY_VERBOSE
        and total > self.INDIRECTLY_MODIFIED_DISPLAY_THRESHOLD
    )
    if abbreviate:
        self._print(bullet(ordered[0]))
        self._print(f"- `.... {total - 2} more ....`\n")
        self._print(bullet(ordered[-1]))
        return

    for info in ordered:
        self._print(bullet(info))
2729+
2730+
def _print_modified_models(
    self,
    context_diff: ContextDiff,
    modified_snapshots: t.Set[Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str] = None,
    no_diff: bool = True,
) -> None:
    """Print modified snapshots grouped as direct, indirect and metadata-only.

    Each snapshot goes into the first group whose ``context_diff`` predicate
    matches its name; a group is printed only when it is non-empty.

    Args:
        context_diff: Source of the modification predicates and text diffs.
        modified_snapshots: Snapshots to classify and print.
        environment_naming_info: Passed through to ``display_name``.
        default_catalog: Passed to ``display_name`` unless verbosity is
            ``VERY_VERBOSE`` or higher, in which case ``None`` is passed.
        no_diff: When false, a fenced ``diff`` block follows each directly
            modified entry.
    """
    direct: t.List[t.Any] = []
    indirect: t.List[t.Any] = []
    metadata_only: t.List[t.Any] = []

    for candidate in modified_snapshots:
        model_name = candidate.name
        if context_diff.directly_modified(model_name):
            direct.append(candidate)
        elif context_diff.indirectly_modified(model_name):
            indirect.append(candidate)
        elif context_diff.metadata_updated(model_name):
            metadata_only.append(candidate)

    def bullet(snapshot: t.Any) -> str:
        shown = snapshot.display_name(
            environment_naming_info,
            default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
            dialect=self.dialect,
        )
        return f"- `{shown}`"

    if direct:
        self._print("\n**Directly Modified:**")
        for snapshot in sorted(direct):
            self._print(bullet(snapshot))
            if not no_diff:
                self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")

    if indirect:
        self._print("\n**Indirectly Modified:**")
        # The indirect list is the only one that goes through the threshold printer.
        self._print_models_with_threshold(
            environment_naming_info, set(indirect), default_catalog
        )

    if metadata_only:
        self._print("\n**Metadata Updated:**")
        for snapshot in sorted(metadata_only):
            self._print(bullet(snapshot))
27412767

27422768
def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
27432769
"""Displays the models with missing dates."""

0 commit comments

Comments
 (0)