Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions sqlmesh/core/state_sync/db/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,9 @@ def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary
Returns:
The list of environment summaries to remove.
"""

environment_summaries = self.get_environments_summary()
return [
env_summary
for env_summary in environment_summaries
if env_summary.expiration_ts is not None and env_summary.expiration_ts <= current_ts
]
return self._fetch_environment_summaries(
where=self._create_expiration_filter_expr(current_ts)
)

def delete_expired_environments(
self, current_ts: t.Optional[int] = None
Expand Down Expand Up @@ -225,13 +221,7 @@ def get_environments_summary(self) -> t.List[EnvironmentSummary]:
Returns:
A list of all environment summaries.
"""
return [
self._environment_summmary_from_row(row)
for row in fetchall(
self.engine_adapter,
self._environments_query(required_fields=list(EnvironmentSummary.all_fields())),
)
]
return self._fetch_environment_summaries()

def get_environment(
self, environment: str, lock_for_update: bool = False
Expand Down Expand Up @@ -327,6 +317,20 @@ def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expression:
expression=exp.Literal.number(current_ts),
)

def _fetch_environment_summaries(
self, where: t.Optional[str | exp.Expression] = None
) -> t.List[EnvironmentSummary]:
return [
self._environment_summmary_from_row(row)
for row in fetchall(
self.engine_adapter,
self._environments_query(
where=where,
required_fields=list(EnvironmentSummary.all_fields()),
),
)
]


def _environment_to_df(environment: Environment) -> pd.DataFrame:
import pandas as pd
Expand Down
4 changes: 0 additions & 4 deletions sqlmesh/core/state_sync/db/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@
T = t.TypeVar("T")


if t.TYPE_CHECKING:
pass


class EngineAdapterStateSync(StateSync):
"""Manages state of nodes and snapshot with an existing engine adapter.

Expand Down