Skip to content

Commit 5fb2c81

Browse files
authored
Chore: Move removal of partial intervals out of the interval state and into the facade (#3932)
1 parent ece068b commit 5fb2c81

2 files changed

Lines changed: 46 additions & 50 deletions

File tree

sqlmesh/core/state_sync/db/facade.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
from sqlmesh.core.state_sync.db.snapshot import SnapshotState
5353
from sqlmesh.core.state_sync.db.version import VersionState
5454
from sqlmesh.core.state_sync.db.migrator import StateMigrator
55-
from sqlmesh.utils.date import TimeLike, to_timestamp
55+
from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str
5656
from sqlmesh.utils.errors import ConflictingPlanError, SQLMeshError
5757

5858
logger = logging.getLogger(__name__)
@@ -358,7 +358,24 @@ def add_interval(
358358

359359
@transactional()
360360
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
361-
self.interval_state.add_snapshots_intervals(snapshots_intervals)
361+
intervals_to_insert = []
362+
for snapshot_intervals in snapshots_intervals:
363+
snapshot_intervals = snapshot_intervals.copy(
364+
update={
365+
"intervals": _remove_partial_intervals(
366+
snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
367+
),
368+
"dev_intervals": _remove_partial_intervals(
369+
snapshot_intervals.dev_intervals,
370+
snapshot_intervals.snapshot_id,
371+
is_dev=True,
372+
),
373+
}
374+
)
375+
if not snapshot_intervals.is_empty():
376+
intervals_to_insert.append(snapshot_intervals)
377+
if intervals_to_insert:
378+
self.interval_state.add_snapshots_intervals(intervals_to_insert)
362379

363380
@transactional()
364381
def remove_intervals(
@@ -478,3 +495,27 @@ def _ensure_no_gaps(
478495
def _transaction(self) -> t.Iterator[None]:
479496
with self.engine_adapter.transaction():
480497
yield
498+
499+
500+
def _remove_partial_intervals(
501+
intervals: t.List[Interval], snapshot_id: t.Optional[SnapshotId], *, is_dev: bool
502+
) -> t.List[Interval]:
503+
results = []
504+
for start_ts, end_ts in intervals:
505+
if start_ts < end_ts:
506+
logger.info(
507+
"Adding %s (%s, %s) for snapshot %s",
508+
"dev interval" if is_dev else "interval",
509+
time_like_to_str(start_ts),
510+
time_like_to_str(end_ts),
511+
snapshot_id,
512+
)
513+
results.append((start_ts, end_ts))
514+
else:
515+
logger.info(
516+
"Skipping partial interval (%s, %s) for snapshot %s",
517+
start_ts,
518+
end_ts,
519+
snapshot_id,
520+
)
521+
return results

sqlmesh/core/state_sync/db/interval.py

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121
SnapshotNameVersion,
2222
SnapshotInfoLike,
2323
Snapshot,
24-
SnapshotId,
2524
)
2625
from sqlmesh.core.snapshot.definition import Interval
2726
from sqlmesh.utils.migration import index_text_type
2827
from sqlmesh.utils import random_id
29-
from sqlmesh.utils.date import now_timestamp, time_like_to_str
28+
from sqlmesh.utils.date import now_timestamp
3029

3130

3231
logger = logging.getLogger(__name__)
@@ -62,52 +61,8 @@ def __init__(
6261
}
6362

6463
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
65-
def remove_partial_intervals(
66-
intervals: t.List[Interval], snapshot_id: t.Optional[SnapshotId], *, is_dev: bool
67-
) -> t.List[Interval]:
68-
results = []
69-
for start_ts, end_ts in intervals:
70-
if start_ts < end_ts:
71-
logger.info(
72-
"Adding %s (%s, %s) for snapshot %s",
73-
"dev interval" if is_dev else "interval",
74-
time_like_to_str(start_ts),
75-
time_like_to_str(end_ts),
76-
snapshot_id,
77-
)
78-
results.append((start_ts, end_ts))
79-
else:
80-
logger.info(
81-
"Skipping partial interval (%s, %s) for snapshot %s",
82-
start_ts,
83-
end_ts,
84-
snapshot_id,
85-
)
86-
return results
87-
88-
intervals_to_insert = []
89-
for snapshot_intervals in snapshots_intervals:
90-
snapshot_intervals = snapshot_intervals.copy(
91-
update={
92-
"intervals": remove_partial_intervals(
93-
snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
94-
),
95-
"dev_intervals": remove_partial_intervals(
96-
snapshot_intervals.dev_intervals,
97-
snapshot_intervals.snapshot_id,
98-
is_dev=True,
99-
),
100-
}
101-
)
102-
if (
103-
snapshot_intervals.intervals
104-
or snapshot_intervals.dev_intervals
105-
or snapshot_intervals.pending_restatement_intervals
106-
):
107-
intervals_to_insert.append(snapshot_intervals)
108-
109-
if intervals_to_insert:
110-
self._push_snapshot_intervals(intervals_to_insert)
64+
if snapshots_intervals:
65+
self._push_snapshot_intervals(snapshots_intervals)
11166

11267
def remove_intervals(
11368
self,

0 commit comments

Comments
 (0)