|
12 | 12 | fetchall, |
13 | 13 | fetchone, |
14 | 14 | ) |
15 | | -from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentCleanupTask |
| 15 | +from sqlmesh.core.environment import Environment, EnvironmentStatements |
16 | 16 | from sqlmesh.utils.migration import index_text_type, blob_text_type |
17 | 17 | from sqlmesh.utils.date import now_timestamp, time_like_to_str |
18 | 18 | from sqlmesh.utils.errors import SQLMeshError |
@@ -162,62 +162,57 @@ def finalize(self, environment: Environment) -> None: |
162 | 162 | where=environment_filter, |
163 | 163 | ) |
164 | 164 |
|
165 | | - def get_expired_environments(self) -> EnvironmentCleanupTask: |
| 165 | + def get_expired_environments(self, current_ts: int) -> t.List[Environment]: |
166 | 166 | """Returns the expired environments. |
167 | 167 |
|
168 | 168 | Expired environments are environments that have exceeded their time-to-live value. |
169 | 169 | Returns: |
170 | 170 | The list of environments to remove, the filter to remove environments. |
171 | 171 | """ |
172 | | - now_ts = now_timestamp() |
173 | | - filter_expr = exp.LTE( |
174 | | - this=exp.column("expiration_ts"), |
175 | | - expression=exp.Literal.number(now_ts), |
176 | | - ) |
177 | | - |
178 | 172 | rows = fetchall( |
179 | 173 | self.engine_adapter, |
180 | 174 | self._environments_query( |
181 | | - where=filter_expr, |
| 175 | + where=self._create_filter_expr(current_ts), |
182 | 176 | lock_for_update=True, |
183 | 177 | ), |
184 | 178 | ) |
185 | 179 | expired_environments = [self._environment_from_row(r) for r in rows] |
186 | 180 |
|
187 | | - return EnvironmentCleanupTask( |
188 | | - expired_environments=expired_environments, filter_expr=filter_expr |
189 | | - ) |
| 181 | + return expired_environments |
190 | 182 |
|
191 | | - def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None: |
| 183 | + def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None: |
192 | 184 | """Deletes expired environments. |
193 | 185 |
|
194 | 186 | Returns: |
195 | 187 | A list of deleted environments. |
196 | 188 | """ |
197 | 189 | self.engine_adapter.delete_from( |
198 | 190 | self.environments_table, |
199 | | - where=cleanup_targets.filter_expr, |
| 191 | + where=self._create_filter_expr(current_ts), |
200 | 192 | ) |
201 | 193 |
|
202 | 194 | # Delete the expired environments' corresponding environment statements |
203 | 195 | if expired_environments := [ |
204 | 196 | exp.EQ(this=exp.column("environment_name"), expression=exp.Literal.string(env.name)) |
205 | | - for env in cleanup_targets.expired_environments |
| 197 | + for env in environments |
206 | 198 | ]: |
207 | 199 | self.engine_adapter.delete_from( |
208 | 200 | self.environment_statements_table, |
209 | 201 | where=exp.or_(*expired_environments), |
210 | 202 | ) |
211 | 203 |
|
212 | | - def delete_expired_environments(self) -> t.List[Environment]: |
| 204 | + def delete_expired_environments( |
| 205 | + self, current_ts: t.Optional[int] = None |
| 206 | + ) -> t.List[Environment]: |
213 | 207 | """Deletes expired environments. |
214 | 208 |
|
215 | 209 | Returns: |
216 | 210 | A list of deleted environments. |
217 | 211 | """ |
218 | | - cleanup_targets = self.get_expired_environments() |
219 | | - self.delete_environments(cleanup_targets) |
220 | | - return cleanup_targets.expired_environments |
| 212 | + current_ts = current_ts or now_timestamp() |
| 213 | + expired_environments = self.get_expired_environments(current_ts=current_ts) |
| 214 | + self.delete_environments(expired_environments, current_ts=current_ts) |
| 215 | + return expired_environments |
221 | 216 |
|
222 | 217 | def get_environments(self) -> t.List[Environment]: |
223 | 218 | """Fetches all environments. |
@@ -321,6 +316,12 @@ def _environments_query( |
321 | 316 | return query.lock(copy=False) |
322 | 317 | return query |
323 | 318 |
|
| 319 | + def _create_filter_expr(self, current_ts: int) -> exp.Expression: |
| 320 | + return exp.LTE( |
| 321 | + this=exp.column("expiration_ts"), |
| 322 | + expression=exp.Literal.number(current_ts), |
| 323 | + ) |
| 324 | + |
324 | 325 |
|
325 | 326 | def _environment_to_df(environment: Environment) -> pd.DataFrame: |
326 | 327 | return pd.DataFrame( |
|
0 commit comments