Skip to content

Commit 1075a58

Browse files
committed
PR Feedback 1
1 parent 4329f8d commit 1075a58

9 files changed

Lines changed: 99 additions & 103 deletions

File tree

sqlmesh/core/context.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117
from sqlmesh.core.user import User
118118
from sqlmesh.utils import UniqueKeyDict, Verbosity
119119
from sqlmesh.utils.dag import DAG
120-
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime
120+
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime, now_timestamp
121121
from sqlmesh.utils.errors import (
122122
CircuitBreakerError,
123123
ConfigError,
@@ -2334,10 +2334,14 @@ def _context_diff(
23342334
)
23352335

23362336
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2337+
current_ts = now_timestamp()
2338+
23372339
# Clean up expired environments by removing their views and schemas
23382340
self._cleanup_environments()
23392341

2340-
_, cleanup_targets = self.state_sync.get_expired_snapshots(ignore_ttl=ignore_ttl)
2342+
_, cleanup_targets = self.state_sync.get_expired_snapshots(
2343+
ignore_ttl=ignore_ttl, current_ts=current_ts
2344+
)
23412345

23422346
# Remove the expired snapshots tables
23432347
self.snapshot_evaluator.cleanup(
@@ -2346,21 +2350,25 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23462350
)
23472351

23482352
# Delete the expired snapshot records from the state sync
2349-
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2353+
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)
23502354

23512355
self.state_sync.compact_intervals()
23522356

2353-
def _cleanup_environments(self) -> None:
2354-
cleanup_targets = self.state_sync.get_expired_environments()
2357+
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
2358+
current_ts = current_ts or now_timestamp()
2359+
2360+
expired_environments = self.state_sync.get_expired_environments(current_ts=current_ts)
23552361

23562362
cleanup_expired_views(
23572363
default_adapter=self.engine_adapter,
23582364
engine_adapters=self.engine_adapters,
2359-
environments=cleanup_targets.expired_environments,
2365+
environments=expired_environments,
23602366
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
23612367
console=self.console,
23622368
)
2363-
self.state_sync.delete_environments(cleanup_targets)
2369+
self.state_sync.delete_environments(
2370+
environments=expired_environments, current_ts=current_ts
2371+
)
23642372

23652373
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23662374
connection_name = connection_name.capitalize()

sqlmesh/core/environment.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import re
55
import typing as t
66

7-
from sqlglot import exp
87
from pydantic import Field
98

109
from sqlmesh.core import constants as c
@@ -229,11 +228,6 @@ class EnvironmentStatements(PydanticModel):
229228
jinja_macros: t.Optional[JinjaMacroRegistry] = None
230229

231230

232-
class EnvironmentCleanupTask(PydanticModel):
233-
expired_environments: t.List[Environment]
234-
filter_expr: exp.LTE
235-
236-
237231
def execute_environment_statements(
238232
adapter: EngineAdapter,
239233
environment_statements: t.List[EnvironmentStatements],

sqlmesh/core/state_sync/base.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,7 @@
88
from sqlglot import __version__ as SQLGLOT_VERSION
99

1010
from sqlmesh import migrations
11-
from sqlmesh.core.environment import (
12-
Environment,
13-
EnvironmentNamingInfo,
14-
EnvironmentStatements,
15-
EnvironmentCleanupTask,
16-
)
11+
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
1712
from sqlmesh.core.snapshot import (
1813
Snapshot,
1914
SnapshotId,
@@ -308,7 +303,7 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
308303

309304
@abc.abstractmethod
310305
def get_expired_snapshots(
311-
self, ignore_ttl: bool = False
306+
self, current_ts: int, ignore_ttl: bool = False
312307
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
313308
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
314309
@@ -322,8 +317,7 @@ def get_expired_snapshots(
322317

323318
@abc.abstractmethod
324319
def delete_expired_snapshots(
325-
self,
326-
ignore_ttl: bool = False,
320+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
327321
) -> t.List[SnapshotTableCleanupTask]:
328322
"""Removes expired snapshots.
329323
@@ -405,7 +399,7 @@ def finalize(self, environment: Environment) -> None:
405399
"""
406400

407401
@abc.abstractmethod
408-
def get_expired_environments(self) -> EnvironmentCleanupTask:
402+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
409403
"""Returns the expired environments.
410404
411405
Expired environments are environments that have exceeded their time-to-live value.
@@ -414,15 +408,18 @@ def get_expired_environments(self) -> EnvironmentCleanupTask:
414408
"""
415409

416410
@abc.abstractmethod
417-
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
418-
"""Removes the environments specified by the arguments..
411+
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
412+
"""Removes the environments specified by the arguments.
419413
420-
Returns:
421-
The list of removed environments.
414+
Args:
415+
environments: The environments to remove.
416+
current_ts: The current timestamp.
422417
"""
423418

424419
@abc.abstractmethod
425-
def delete_expired_environments(self) -> t.List[Environment]:
420+
def delete_expired_environments(
421+
self, current_ts: t.Optional[int] = None
422+
) -> t.List[Environment]:
426423
"""Removes expired environments.
427424
428425
Expired environments are environments that have exceeded their time-to-live value.

sqlmesh/core/state_sync/cache.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
111111
self.state_sync.delete_snapshots(snapshot_ids)
112112

113113
def delete_expired_snapshots(
114-
self, ignore_ttl: bool = False
114+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
115115
) -> t.List[SnapshotTableCleanupTask]:
116+
current_ts = current_ts or now_timestamp()
116117
self.snapshot_cache.clear()
117-
return self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
118+
return self.state_sync.delete_expired_snapshots(
119+
current_ts=current_ts, ignore_ttl=ignore_ttl
120+
)
118121

119122
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
120123
for snapshot_intervals in snapshots_intervals:

sqlmesh/core/state_sync/db/environment.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
fetchall,
1313
fetchone,
1414
)
15-
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentCleanupTask
15+
from sqlmesh.core.environment import Environment, EnvironmentStatements
1616
from sqlmesh.utils.migration import index_text_type, blob_text_type
1717
from sqlmesh.utils.date import now_timestamp, time_like_to_str
1818
from sqlmesh.utils.errors import SQLMeshError
@@ -162,62 +162,57 @@ def finalize(self, environment: Environment) -> None:
162162
where=environment_filter,
163163
)
164164

165-
def get_expired_environments(self) -> EnvironmentCleanupTask:
165+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
166166
"""Returns the expired environments.
167167
168168
Expired environments are environments that have exceeded their time-to-live value.
169169
Returns:
170170
The list of expired environments.
171171
"""
172-
now_ts = now_timestamp()
173-
filter_expr = exp.LTE(
174-
this=exp.column("expiration_ts"),
175-
expression=exp.Literal.number(now_ts),
176-
)
177-
178172
rows = fetchall(
179173
self.engine_adapter,
180174
self._environments_query(
181-
where=filter_expr,
175+
where=self._create_filter_expr(current_ts),
182176
lock_for_update=True,
183177
),
184178
)
185179
expired_environments = [self._environment_from_row(r) for r in rows]
186180

187-
return EnvironmentCleanupTask(
188-
expired_environments=expired_environments, filter_expr=filter_expr
189-
)
181+
return expired_environments
190182

191-
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
183+
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
192184
"""Deletes environments whose expiration_ts is at or before current_ts, and the statements of the given environments.
193185
194186
Returns:
195187
None.
196188
"""
197189
self.engine_adapter.delete_from(
198190
self.environments_table,
199-
where=cleanup_targets.filter_expr,
191+
where=self._create_filter_expr(current_ts),
200192
)
201193

202194
# Delete the expired environments' corresponding environment statements
203195
if expired_environments := [
204196
exp.EQ(this=exp.column("environment_name"), expression=exp.Literal.string(env.name))
205-
for env in cleanup_targets.expired_environments
197+
for env in environments
206198
]:
207199
self.engine_adapter.delete_from(
208200
self.environment_statements_table,
209201
where=exp.or_(*expired_environments),
210202
)
211203

212-
def delete_expired_environments(self) -> t.List[Environment]:
204+
def delete_expired_environments(
205+
self, current_ts: t.Optional[int] = None
206+
) -> t.List[Environment]:
213207
"""Deletes expired environments.
214208
215209
Returns:
216210
A list of deleted environments.
217211
"""
218-
cleanup_targets = self.get_expired_environments()
219-
self.delete_environments(cleanup_targets)
220-
return cleanup_targets.expired_environments
212+
current_ts = current_ts or now_timestamp()
213+
expired_environments = self.get_expired_environments(current_ts=current_ts)
214+
self.delete_environments(expired_environments, current_ts=current_ts)
215+
return expired_environments
221216

222217
def get_environments(self) -> t.List[Environment]:
223218
"""Fetches all environments.
@@ -321,6 +316,12 @@ def _environments_query(
321316
return query.lock(copy=False)
322317
return query
323318

319+
def _create_filter_expr(self, current_ts: int) -> exp.Expression:
320+
return exp.LTE(
321+
this=exp.column("expiration_ts"),
322+
expression=exp.Literal.number(current_ts),
323+
)
324+
324325

325326
def _environment_to_df(environment: Environment) -> pd.DataFrame:
326327
return pd.DataFrame(

sqlmesh/core/state_sync/db/facade.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
from sqlmesh.core.console import Console, get_console
2828
from sqlmesh.core.engine_adapter import EngineAdapter
29-
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentCleanupTask
29+
from sqlmesh.core.environment import Environment, EnvironmentStatements
3030
from sqlmesh.core.snapshot import (
3131
Snapshot,
3232
SnapshotId,
@@ -60,7 +60,7 @@
6060
from sqlmesh.core.state_sync.db.snapshot import SnapshotState
6161
from sqlmesh.core.state_sync.db.version import VersionState
6262
from sqlmesh.core.state_sync.db.migrator import StateMigrator
63-
from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str
63+
from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str, now_timestamp
6464
from sqlmesh.utils.errors import ConflictingPlanError, SQLMeshError
6565

6666
logger = logging.getLogger(__name__)
@@ -274,33 +274,39 @@ def invalidate_environment(self, name: str) -> None:
274274
self.environment_state.invalidate_environment(name)
275275

276276
def get_expired_snapshots(
277-
self, ignore_ttl: bool = False
277+
self, current_ts: int, ignore_ttl: bool = False
278278
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
279279
return self.snapshot_state.get_expired_snapshots(
280-
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
280+
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
281281
)
282282

283-
def get_expired_environments(self) -> EnvironmentCleanupTask:
284-
return self.environment_state.get_expired_environments()
283+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
284+
return self.environment_state.get_expired_environments(current_ts=current_ts)
285285

286286
@transactional()
287287
def delete_expired_snapshots(
288-
self, ignore_ttl: bool = False
288+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
289289
) -> t.List[SnapshotTableCleanupTask]:
290-
expired_snapshot_ids, cleanup_targets = self.get_expired_snapshots(ignore_ttl=ignore_ttl)
290+
current_ts = current_ts or now_timestamp()
291+
expired_snapshot_ids, cleanup_targets = self.get_expired_snapshots(
292+
ignore_ttl=ignore_ttl, current_ts=current_ts
293+
)
291294

292-
self.snapshot_state.delete_expired_snapshots(expired_snapshot_ids=expired_snapshot_ids)
295+
self.snapshot_state.delete_snapshots(expired_snapshot_ids)
293296
self.interval_state.cleanup_intervals(cleanup_targets, expired_snapshot_ids)
294297

295298
return cleanup_targets
296299

297300
@transactional()
298-
def delete_expired_environments(self) -> t.List[Environment]:
299-
return self.environment_state.delete_expired_environments()
301+
def delete_expired_environments(
302+
self, current_ts: t.Optional[int] = None
303+
) -> t.List[Environment]:
304+
current_ts = current_ts or now_timestamp()
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts)
300306

301307
@transactional()
302-
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
303-
return self.environment_state.delete_environments(cleanup_targets=cleanup_targets)
308+
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
309+
self.environment_state.delete_environments(environments=environments, current_ts=current_ts)
304310

305311
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306312
self.snapshot_state.delete_snapshots(snapshot_ids)

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ def unpause_snapshots(
194194
self._update_snapshots(unrestorable_snapshots, unrestorable=True)
195195

196196
def get_expired_snapshots(
197-
self, environments: t.Iterable[Environment], ignore_ttl: bool = False
197+
self,
198+
environments: t.Iterable[Environment],
199+
current_ts: int,
200+
ignore_ttl: bool = False,
198201
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
199202
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
200203
@@ -205,8 +208,6 @@ def get_expired_snapshots(
205208
The set of expired snapshot ids.
206209
The list of table cleanup tasks.
207210
"""
208-
current_ts = now_timestamp(minute_floor=False)
209-
210211
expired_query = exp.select("name", "identifier", "version").from_(self.snapshots_table)
211212

212213
if not ignore_ttl:
@@ -272,14 +273,6 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
272273

273274
return expired_snapshot_ids, cleanup_targets
274275

275-
def delete_expired_snapshots(self, expired_snapshot_ids: t.Set[SnapshotId]) -> None:
276-
"""Deletes expired snapshots.
277-
278-
Args:
279-
expired_snapshot_ids: The list of expired snapshot ids.
280-
"""
281-
self.delete_snapshots(expired_snapshot_ids)
282-
283276
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
284277
"""Deletes snapshots.
285278

tests/core/state_sync/test_state_sync.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,9 @@ def test_delete_expired_snapshots_promoted(
12551255
env.snapshots_ = []
12561256
state_sync.promote(env)
12571257

1258-
now_timestamp_mock = mocker.patch("sqlmesh.core.state_sync.db.snapshot.now_timestamp")
1258+
# now_timestamp_mock = mocker.patch("sqlmesh.core.state_sync.db.snapshot.now_timestamp")
1259+
now_timestamp_mock = mocker.patch("sqlmesh.core.state_sync.db.facade.now_timestamp")
1260+
12591261
now_timestamp_mock.return_value = now_timestamp() + 11000
12601262

12611263
assert state_sync.delete_expired_snapshots() == [

0 commit comments

Comments
 (0)