|
59 | 59 | from sqlmesh.core.state_sync.db.environment import EnvironmentState |
60 | 60 | from sqlmesh.core.state_sync.db.snapshot import SnapshotState |
61 | 61 | from sqlmesh.core.state_sync.db.version import VersionState |
62 | | -from sqlmesh.core.state_sync.db.migrator import StateMigrator |
| 62 | +from sqlmesh.core.state_sync.db.migrator import StateMigrator, _backup_table_name |
63 | 63 | from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str, now_timestamp |
64 | 64 | from sqlmesh.utils.errors import ConflictingPlanError, SQLMeshError |
65 | 65 |
|
@@ -270,8 +270,8 @@ def unpause_snapshots( |
270 | 270 | ) -> None: |
271 | 271 | self.snapshot_state.unpause_snapshots(snapshots, unpaused_dt, self.interval_state) |
272 | 272 |
|
273 | | - def invalidate_environment(self, name: str) -> None: |
274 | | - self.environment_state.invalidate_environment(name) |
| 273 | + def invalidate_environment(self, name: str, protect_prod: bool = True) -> None: |
| 274 | + self.environment_state.invalidate_environment(name, protect_prod) |
275 | 275 |
|
276 | 276 | def get_expired_snapshots( |
277 | 277 | self, current_ts: int, ignore_ttl: bool = False |
@@ -313,18 +313,26 @@ def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[Sna |
313 | 313 | def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]: |
314 | 314 | return self.snapshot_state.nodes_exist(names, exclude_external) |
315 | 315 |
|
316 | | - def reset(self, default_catalog: t.Optional[str]) -> None: |
317 | | - """Resets the state store to the state when it was first initialized.""" |
| 316 | + def remove_state(self, including_backup: bool = False) -> None: |
| 317 | + """Removes the state store objects.""" |
318 | 318 | for table in ( |
319 | 319 | self.snapshot_state.snapshots_table, |
320 | 320 | self.snapshot_state.auto_restatements_table, |
321 | 321 | self.environment_state.environments_table, |
| 322 | + self.environment_state.environment_statements_table, |
322 | 323 | self.interval_state.intervals_table, |
323 | 324 | self.plan_dags_table, |
324 | 325 | self.version_state.versions_table, |
325 | 326 | ): |
326 | 327 | self.engine_adapter.drop_table(table) |
| 328 | + if including_backup: |
| 329 | + self.engine_adapter.drop_table(_backup_table_name(table)) |
| 330 | + |
327 | 331 | self.snapshot_state.clear_cache() |
| 332 | + |
| 333 | + def reset(self, default_catalog: t.Optional[str]) -> None: |
| 334 | + """Resets the state store to the state when it was first initialized.""" |
| 335 | + self.remove_state() |
328 | 336 | self.migrate(default_catalog) |
329 | 337 |
|
330 | 338 | @transactional() |
|
0 commit comments