diff --git a/sqlmesh/core/state_sync/db/environment.py b/sqlmesh/core/state_sync/db/environment.py index b06d6160cc..b7e8128a93 100644 --- a/sqlmesh/core/state_sync/db/environment.py +++ b/sqlmesh/core/state_sync/db/environment.py @@ -172,13 +172,9 @@ def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary Returns: The list of environment summaries to remove. """ - - environment_summaries = self.get_environments_summary() - return [ - env_summary - for env_summary in environment_summaries - if env_summary.expiration_ts is not None and env_summary.expiration_ts <= current_ts - ] + return self._fetch_environment_summaries( + where=self._create_expiration_filter_expr(current_ts) + ) def delete_expired_environments( self, current_ts: t.Optional[int] = None @@ -225,13 +221,7 @@ def get_environments_summary(self) -> t.List[EnvironmentSummary]: Returns: A list of all environment summaries. """ - return [ - self._environment_summmary_from_row(row) - for row in fetchall( - self.engine_adapter, - self._environments_query(required_fields=list(EnvironmentSummary.all_fields())), - ) - ] + return self._fetch_environment_summaries() def get_environment( self, environment: str, lock_for_update: bool = False @@ -327,6 +317,20 @@ def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expression: expression=exp.Literal.number(current_ts), ) + def _fetch_environment_summaries( + self, where: t.Optional[str | exp.Expression] = None + ) -> t.List[EnvironmentSummary]: + return [ + self._environment_summmary_from_row(row) + for row in fetchall( + self.engine_adapter, + self._environments_query( + where=where, + required_fields=list(EnvironmentSummary.all_fields()), + ), + ) + ] + def _environment_to_df(environment: Environment) -> pd.DataFrame: import pandas as pd diff --git a/sqlmesh/core/state_sync/db/facade.py b/sqlmesh/core/state_sync/db/facade.py index 2a27c5fd92..c0d44893c4 100644 --- a/sqlmesh/core/state_sync/db/facade.py +++ b/sqlmesh/core/state_sync/db/facade.py @@ -69,10 +69,6 @@ T = t.TypeVar("T") -if t.TYPE_CHECKING: - pass - - class EngineAdapterStateSync(StateSync): """Manages state of nodes and snapshot with an existing engine adapter.