Skip to content

Commit 1473bc6

Browse files
authored
feat: add the ability to check intervals (#4187)
1 parent effeba3 commit 1473bc6

9 files changed

Lines changed: 244 additions & 5 deletions

File tree

docs/guides/signals.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,19 @@ from sqlmesh import signal, DatetimeRanges, ExecutionContext
131131
def one_week_ago(batch: DatetimeRanges, context: ExecutionContext) -> t.Union[bool, DatetimeRanges]:
132132
return len(context.engine_adapter.fetchdf("SELECT 1")) > 1
133133
```
134+
135+
### Testing Signals
136+
Signals only evaluate on `run` or with `check_intervals`.
137+
138+
To test signals with the [check_intervals](../reference/cli.md#check_intervals) command:
139+
140+
1. Deploy your changes to an environment with `sqlmesh plan my_dev`.
141+
2. Run `sqlmesh check_intervals my_dev`.
142+
143+
* To check a subset of models use the --select-model flag.
144+
* To turn off signals and just check missing intervals, use the --no-signals flag.
145+
146+
3. To iterate, make changes to the signal, and redeploy with step 1.
147+
148+
!!! note
149+
`check_intervals` only works on remote models in an environment. Local signal changes are never run.

docs/reference/cli.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,24 @@ Options:
6565
--help Show this message and exit.
6666
```
6767

68+
## check_intervals
69+
70+
```
71+
Usage: sqlmesh check_intervals [OPTIONS] [ENVIRONMENT]
72+
73+
Show missing intervals in an environment, respecting signals.
74+
75+
Options:
76+
--no-signals Disable signal checks and only show missing intervals.
77+
--select-model TEXT Select specific models to show missing intervals for.
78+
-s, --start TEXT The start datetime of the interval for which this
79+
command will be applied.
80+
-e, --end TEXT The end datetime of the interval for which this command
81+
will be applied.
82+
--help Show this message and exit.
83+
```
84+
85+
6886
## clean
6987

7088
```
@@ -584,4 +602,4 @@ Options:
584602
--model TEXT A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.
585603
--help Show this message and exit.
586604
587-
```
605+
```

docs/reference/notebook.md

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,27 @@ options:
438438
Execution time.
439439
```
440440

441+
#### check_intervals
442+
```
443+
%check_intervals [--no-signals] [--select-model [SELECT_MODEL ...]]
444+
[--start START] [--end END]
445+
[environment]
446+
447+
Show missing intervals in an environment, respecting signals.
448+
449+
positional arguments:
450+
environment The environment to check intervals for.
451+
452+
options:
453+
--no-signals Disable signal checks and only show missing intervals.
454+
--select-model <[SELECT_MODEL ...]>
455+
Select specific model changes that should be included
456+
in the plan.
457+
--start START, -s START
458+
Start date of intervals to check for.
459+
--end END, -e END End date of intervals to check for.
460+
```
461+
441462
#### rollback
442463
```
443464
%rollback
@@ -499,10 +520,10 @@ options:
499520

500521
#### lint
501522
```
502-
%lint [--models ...]
523+
%lint [--models ...]
503524
504525
Run the linter on the target models(s)
505526
506527
positional arguments:
507528
--models A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.
508-
```
529+
```

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ nav:
3636
- guides/observer.md
3737
- Advanced usage:
3838
- guides/customizing_sqlmesh.md
39+
- guides/signals.md
3940
- Concepts:
4041
- concepts/overview.md
4142
- Development:

sqlmesh/cli/main.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,42 @@ def audit(
693693
obj.audit(models=models, start=start, end=end, execution_time=execution_time)
694694

695695

696+
@cli.command("check_intervals")
697+
@click.option(
698+
"--no-signals",
699+
is_flag=True,
700+
help="Disable signal checks and only show missing intervals.",
701+
default=False,
702+
)
703+
@click.argument("environment", required=False)
704+
@click.option(
705+
"--select-model",
706+
type=str,
707+
multiple=True,
708+
help="Select specific models to show missing intervals for.",
709+
)
710+
@opt.start_time
711+
@opt.end_time
712+
@click.pass_context
713+
@error_handler
714+
@cli_analytics
715+
def check_intervals(
716+
ctx: click.Context,
717+
environment: t.Optional[str],
718+
no_signals: bool,
719+
select_model: t.List[str],
720+
start: TimeLike,
721+
end: TimeLike,
722+
) -> None:
723+
"""Show missing intervals in an environment, respecting signals."""
724+
context = ctx.obj
725+
context.console.show_intervals(
726+
context.check_intervals(
727+
environment, no_signals=no_signals, select_models=select_model, start=start, end=end
728+
)
729+
)
730+
731+
696732
@cli.command("fetchdf")
697733
@click.argument("sql")
698734
@click.pass_context

sqlmesh/core/console.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ class EnvironmentsConsole(abc.ABC):
191191
def print_environments(self, environments_summary: t.Dict[str, int]) -> None:
192192
"""Prints all environment names along with expiry datetime."""
193193

194+
@abc.abstractmethod
195+
def show_intervals(self, snapshot_intervals: t.Dict[Snapshot, SnapshotIntervals]) -> None:
196+
"""Show ready intervals"""
197+
194198

195199
class DifferenceConsole(abc.ABC):
196200
"""Console for displaying environment differences"""
@@ -658,6 +662,9 @@ def show_row_diff(
658662
def print_environments(self, environments_summary: t.Dict[str, int]) -> None:
659663
pass
660664

665+
def show_intervals(self, snapshot_intervals: t.Dict[Snapshot, SnapshotIntervals]) -> None:
666+
pass
667+
661668
def show_linter_violations(
662669
self, violations: t.List[RuleViolation], model: Model, is_error: bool = False
663670
) -> None:
@@ -2091,6 +2098,24 @@ def print_environments(self, environments_summary: t.Dict[str, int]) -> None:
20912098
output_str = "\n".join([str(len(output)), *output])
20922099
self.log_status_update(f"Number of SQLMesh environments are: {output_str}")
20932100

2101+
def show_intervals(self, snapshot_intervals: t.Dict[Snapshot, SnapshotIntervals]) -> None:
2102+
complete = Tree(f"[b]Complete Intervals[/b]")
2103+
incomplete = Tree(f"[b]Missing Intervals[/b]")
2104+
2105+
for snapshot, intervals in sorted(snapshot_intervals.items(), key=lambda s: s[0].node.name):
2106+
if intervals.intervals:
2107+
incomplete.add(
2108+
f"{snapshot.node.name}: [{intervals.format_intervals(snapshot.node.interval_unit)}]"
2109+
)
2110+
else:
2111+
complete.add(snapshot.node.name)
2112+
2113+
if complete.children:
2114+
self._print(complete)
2115+
2116+
if incomplete.children:
2117+
self._print(incomplete)
2118+
20942119
def print_connection_config(self, config: ConnectionConfig, title: str = "Connection") -> None:
20952120
tree = Tree(f"[b]{title}:[/b]")
20962121
tree.add(f"Type: [bold cyan]{config.type_}[/bold cyan]")

sqlmesh/core/context.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787
NotificationTarget,
8888
NotificationTargetManager,
8989
)
90-
from sqlmesh.core.plan import Plan, PlanBuilder
90+
from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals
9191
from sqlmesh.core.reference import ReferenceGraph
9292
from sqlmesh.core.scheduler import Scheduler, CompletionStatus
9393
from sqlmesh.core.schema_loader import create_external_models_file
@@ -97,6 +97,7 @@
9797
Snapshot,
9898
SnapshotEvaluator,
9999
SnapshotFingerprint,
100+
missing_intervals,
100101
to_table_mapping,
101102
)
102103
from sqlmesh.core.snapshot.definition import get_next_model_interval_start
@@ -469,11 +470,12 @@ def execution_context(
469470
self,
470471
deployability_index: t.Optional[DeployabilityIndex] = None,
471472
engine_adapter: t.Optional[EngineAdapter] = None,
473+
snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
472474
) -> ExecutionContext:
473475
"""Returns an execution context."""
474476
return ExecutionContext(
475477
engine_adapter=engine_adapter or self.engine_adapter,
476-
snapshots=self.snapshots,
478+
snapshots=snapshots or self.snapshots,
477479
deployability_index=deployability_index,
478480
default_dialect=self.default_dialect,
479481
default_catalog=self.default_catalog,
@@ -1893,6 +1895,61 @@ def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
18931895
dialect=dialect or self.default_dialect,
18941896
)
18951897

1898+
@python_api_analytics
1899+
def check_intervals(
1900+
self,
1901+
environment: t.Optional[str],
1902+
no_signals: bool,
1903+
select_models: t.Collection[str],
1904+
start: t.Optional[TimeLike] = None,
1905+
end: t.Optional[TimeLike] = None,
1906+
) -> t.Dict[Snapshot, SnapshotIntervals]:
1907+
"""Check intervals for a given environment.
1908+
1909+
Args:
1910+
environment: The environment or prod if None.
1911+
select_models: A list of model selection strings to show intervals for.
1912+
start: The start of the intervals to check.
1913+
end: The end of the intervals to check.
1914+
"""
1915+
1916+
environment = environment or c.PROD
1917+
env = self.state_reader.get_environment(environment)
1918+
if not env:
1919+
raise SQLMeshError(f"Environment '{environment}' was not found.")
1920+
1921+
snapshots = {k.name: v for k, v in self.state_sync.get_snapshots(env.snapshots).items()}
1922+
1923+
missing = {
1924+
k.name: v
1925+
for k, v in missing_intervals(
1926+
snapshots.values(), start=start, end=end, execution_time=end
1927+
).items()
1928+
}
1929+
1930+
if select_models:
1931+
selected: t.Collection[str] = self._select_models_for_run(
1932+
select_models, True, snapshots.values()
1933+
)
1934+
else:
1935+
selected = snapshots.keys()
1936+
1937+
results = {}
1938+
execution_context = self.execution_context(snapshots=snapshots)
1939+
1940+
for fqn in selected:
1941+
snapshot = snapshots[fqn]
1942+
intervals = missing.get(fqn) or []
1943+
1944+
results[snapshot] = SnapshotIntervals(
1945+
snapshot.snapshot_id,
1946+
intervals
1947+
if no_signals
1948+
else snapshot.check_ready_intervals(intervals, execution_context),
1949+
)
1950+
1951+
return results
1952+
18961953
@python_api_analytics
18971954
def migrate(self) -> None:
18981955
"""Migrates SQLMesh to the current running version.

sqlmesh/magics.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,38 @@ def audit(self, context: Context, line: str) -> None:
10071007
models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
10081008
)
10091009

1010+
@magic_arguments()
1011+
@argument("environment", nargs="?", type=str, help="The environment to check intervals for.")
1012+
@argument(
1013+
"--no-signals",
1014+
action="store_true",
1015+
help="Disable signal checks and only show missing intervals.",
1016+
default=False,
1017+
)
1018+
@argument(
1019+
"--select-model",
1020+
type=str,
1021+
nargs="*",
1022+
help="Select specific model changes that should be included in the plan.",
1023+
)
1024+
@argument("--start", "-s", type=str, help="Start date of intervals to check for.")
1025+
@argument("--end", "-e", type=str, help="End date of intervals to check for.")
1026+
@line_magic
1027+
@pass_sqlmesh_context
1028+
def check_intervals(self, context: Context, line: str) -> None:
1029+
"""Show missing intervals in an environment, respecting signals."""
1030+
args = parse_argstring(self.check_intervals, line)
1031+
1032+
context.console.show_intervals(
1033+
context.check_intervals(
1034+
environment=args.environment,
1035+
no_signals=args.no_signals,
1036+
select_models=args.select_model,
1037+
start=args.start,
1038+
end=args.end,
1039+
)
1040+
)
1041+
10101042
@magic_arguments()
10111043
@argument(
10121044
"--skip-connection",

tests/core/test_context.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1993,3 +1993,36 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, capsys, caplog):
19931993
"""SELECT COUNT(*) FROM (SELECT ("date_id") AS "date_id" FROM (SELECT * FROM "sqlmesh__sqlmesh_audit"."sqlmesh_audit__date_example__4100277424" AS "sqlmesh_audit__date_example__4100277424" WHERE "date_id" BETWEEN CAST('2025-02-01' AS DATE) AND CAST('2025-02-01' AS DATE)) AS "_q_0" WHERE TRUE GROUP BY ("date_id") HAVING COUNT(*) > 1) AS "audit\""""
19941994
in caplog.text
19951995
)
1996+
1997+
1998+
def test_check_intervals(sushi_context, mocker):
1999+
with pytest.raises(
2000+
SQLMeshError,
2001+
match="Environment 'dev' was not found",
2002+
):
2003+
sushi_context.check_intervals(environment="dev", no_signals=False, select_models=[])
2004+
2005+
spy = mocker.spy(sqlmesh.core.snapshot.definition, "_check_ready_intervals")
2006+
intervals = sushi_context.check_intervals(environment=None, no_signals=False, select_models=[])
2007+
2008+
min_intervals = 19
2009+
assert spy.call_count == 1
2010+
assert len(intervals) >= min_intervals
2011+
2012+
for i in intervals.values():
2013+
assert not i.intervals
2014+
2015+
spy.reset_mock()
2016+
intervals = sushi_context.check_intervals(environment=None, no_signals=True, select_models=[])
2017+
assert spy.call_count == 0
2018+
assert len(intervals) >= min_intervals
2019+
2020+
intervals = sushi_context.check_intervals(
2021+
environment=None, no_signals=False, select_models=["*waiter_as_customer*"]
2022+
)
2023+
assert len(intervals) == 1
2024+
2025+
intervals = sushi_context.check_intervals(
2026+
environment=None, no_signals=False, select_models=["*waiter_as_customer*"], end="next week"
2027+
)
2028+
assert tuple(intervals.values())[0].intervals

0 commit comments

Comments
 (0)