Skip to content

Commit c9429ae

Browse files
Feat!: Add support for multiple virtual layers controlled by gateway (#4101)
Co-authored-by: Iaroslav Zeigerman <zeigerman.ia@gmail.com>
1 parent 5883eb6 commit c9429ae

32 files changed

Lines changed: 575 additions & 88 deletions

docs/reference/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Configuration options for SQLMesh environment creation and promotion.
3535
| `physical_schema_override` | (Deprecated) Use `physical_schema_mapping` instead. A mapping from model schema names to names of schemas in which physical tables for the corresponding models will be placed. | dict[string, string] | N |
3636
| `physical_schema_mapping` | A mapping from regular expressions to names of schemas in which physical tables for the corresponding models [will be placed](../guides/configuration.md#physical-table-schemas). (Default physical schema name: `sqlmesh__[model schema]`) | dict[string, string] | N |
3737
| `environment_suffix_target` | Whether SQLMesh views should append their environment name to the `schema` or `table` - [additional details](../guides/configuration.md#view-schema-override). (Default: `schema`) | string | N |
38+
| `gateway_managed_virtual_layer` | Whether the views of the virtual layer are created by the gateway specified on each model instead of by the default gateway - [additional details](../guides/configuration.md#view-schema-override). (Default: `False`) | boolean | N |
3839
| `environment_catalog_mapping` | A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment. | dict[string, string] | N |
3940
| `log_limit` | The default number of logs to keep (Default: `20`) | int | N |
4041

sqlmesh/core/config/root.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class Config(BaseConfig):
7272
model_defaults: Default values for model definitions.
7373
physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
7474
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
75+
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
7576
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
7677
default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
7778
log_limit: The default number of logs to keep.
@@ -110,6 +111,7 @@ class Config(BaseConfig):
110111
environment_suffix_target: EnvironmentSuffixTarget = Field(
111112
default=EnvironmentSuffixTarget.default
112113
)
114+
gateway_managed_virtual_layer: bool = False
113115
environment_catalog_mapping: t.Dict[re.Pattern, str] = {}
114116
default_target_environment: str = c.PROD
115117
log_limit: int = c.DEFAULT_LOG_LIMIT

sqlmesh/core/context.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ def __init__(
364364
self._environment_statements: t.List[EnvironmentStatements] = []
365365
self._excluded_requirements: t.Set[str] = set()
366366
self._default_catalog: t.Optional[str] = None
367+
self._default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None
367368
self._linters: t.Dict[str, Linter] = {}
368369
self._loaded: bool = False
369370

@@ -380,6 +381,7 @@ def __init__(
380381
self.pinned_environments = Environment.sanitize_names(self.config.pinned_environments)
381382
self.auto_categorize_changes = self.config.plan.auto_categorize_changes
382383
self.selected_gateway = gateway or self.config.default_gateway_name
384+
self.gateway_managed_virtual_layer = self.config.gateway_managed_virtual_layer
383385

384386
gw_model_defaults = self.config.gateways[self.selected_gateway].model_defaults
385387
if gw_model_defaults:
@@ -2212,16 +2214,6 @@ def _model_tables(self) -> t.Dict[str, str]:
22122214
for fqn, snapshot in self.snapshots.items()
22132215
}
22142216

2215-
@property
2216-
def _snapshot_gateways(self) -> t.Dict[str, str]:
2217-
"""Mapping of snapshot name to the gateway if specified in the model."""
2218-
2219-
return {
2220-
fqn: snapshot.model.gateway
2221-
for fqn, snapshot in self.snapshots.items()
2222-
if snapshot.is_model and snapshot.model.gateway
2223-
}
2224-
22252217
@cached_property
22262218
def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
22272219
"""Returns all the engine adapters for the gateways defined in the configuration."""
@@ -2232,6 +2224,17 @@ def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
22322224
self._engine_adapters[gateway_name] = adapter
22332225
return self._engine_adapters
22342226

2227+
@cached_property
2228+
def default_catalog_per_gateway(self) -> t.Dict[str, str]:
2229+
"""Returns the default catalogs for each engine adapter."""
2230+
if self._default_catalog_per_gateway is None:
2231+
self._default_catalog_per_gateway = {
2232+
name: adapter.default_catalog
2233+
for name, adapter in self.engine_adapters.items()
2234+
if adapter.default_catalog
2235+
}
2236+
return self._default_catalog_per_gateway
2237+
22352238
def _get_engine_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
22362239
if gateway:
22372240
if adapter := self.engine_adapters.get(gateway):
@@ -2292,22 +2295,33 @@ def _context_diff(
22922295
ensure_finalized_snapshots=ensure_finalized_snapshots,
22932296
diff_rendered=diff_rendered,
22942297
environment_statements=self._environment_statements,
2298+
gateway_managed_virtual_layer=self.gateway_managed_virtual_layer,
22952299
)
22962300

22972301
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2302+
# Clean up expired environments by removing their views and schemas
22982303
self._cleanup_environments()
2299-
expired_snapshots = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2304+
2305+
# Identify and delete expired snapshots
2306+
cleanup_targets = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2307+
2308+
# Remove the tables of the expired snapshots
23002309
self.snapshot_evaluator.cleanup(
2301-
expired_snapshots,
2302-
self._snapshot_gateways,
2310+
target_snapshots=cleanup_targets,
23032311
on_complete=self.console.update_cleanup_progress,
23042312
)
23052313

23062314
self.state_sync.compact_intervals()
23072315

23082316
def _cleanup_environments(self) -> None:
23092317
expired_environments = self.state_sync.delete_expired_environments()
2310-
cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
2318+
2319+
cleanup_expired_views(
2320+
default_adapter=self.engine_adapter,
2321+
engine_adapters=self.engine_adapters,
2322+
environments=expired_environments,
2323+
console=self.console,
2324+
)
23112325

23122326
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23132327
connection_name = connection_name.capitalize()

sqlmesh/core/context_diff.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class ContextDiff(PydanticModel):
5353
"""Whether the currently stored environment record is in unfinalized state."""
5454
normalize_environment_name: bool
5555
"""Whether the environment name should be normalized."""
56+
previous_gateway_managed_virtual_layer: bool
57+
"""Whether the previous environment's virtual layer's views were created by the model specified gateways."""
58+
gateway_managed_virtual_layer: bool
59+
"""Whether the virtual layer's views will be created by the model specified gateways."""
5660
create_from: str
5761
"""The name of the environment the target environment will be created from if new."""
5862
create_from_env_exists: bool
@@ -96,6 +100,7 @@ def create(
96100
excluded_requirements: t.Optional[t.Set[str]] = None,
97101
diff_rendered: bool = False,
98102
environment_statements: t.Optional[t.List[EnvironmentStatements]] = [],
103+
gateway_managed_virtual_layer: bool = False,
99104
) -> ContextDiff:
100105
"""Create a ContextDiff object.
101106
@@ -226,6 +231,8 @@ def create(
226231
diff_rendered=diff_rendered,
227232
previous_environment_statements=previous_environment_statements,
228233
environment_statements=environment_statements,
234+
previous_gateway_managed_virtual_layer=env.gateway_managed if env else False,
235+
gateway_managed_virtual_layer=gateway_managed_virtual_layer,
229236
)
230237

231238
@classmethod
@@ -263,6 +270,8 @@ def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextD
263270
previous_requirements=env.requirements,
264271
requirements=env.requirements,
265272
previous_environment_statements=[],
273+
previous_gateway_managed_virtual_layer=env.gateway_managed,
274+
gateway_managed_virtual_layer=env.gateway_managed,
266275
)
267276

268277
@property
@@ -273,6 +282,7 @@ def has_changes(self) -> bool:
273282
or self.is_unfinalized_environment
274283
or self.has_requirement_changes
275284
or self.has_environment_statements_changes
285+
or self.previous_gateway_managed_virtual_layer != self.gateway_managed_virtual_layer
276286
)
277287

278288
@property

sqlmesh/core/environment.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from sqlmesh.utils.date import TimeLike, now_timestamp
1717
from sqlmesh.utils.jinja import JinjaMacroRegistry
1818
from sqlmesh.utils.metaprogramming import Executable
19-
from sqlmesh.utils.pydantic import PydanticModel, field_validator
19+
from sqlmesh.utils.pydantic import PydanticModel, field_validator, ValidationInfo
2020

2121
T = t.TypeVar("T", bound="EnvironmentNamingInfo")
2222
PydanticType = t.TypeVar("PydanticType", bound="PydanticModel")
@@ -32,22 +32,27 @@ class EnvironmentNamingInfo(PydanticModel):
3232
catalog_name_override: The name of the catalog to use for this environment if an override was provided
3333
normalize_name: Indicates whether the environment's name will be normalized. For example, if it's
3434
`dev`, then it will become `DEV` when targeting Snowflake.
35+
gateway_managed: Determines whether the virtual layer's views are created by the model-specific
36+
gateways; otherwise, the default gateway is used. Default: False.
3537
"""
3638

3739
name: str = c.PROD
3840
suffix_target: EnvironmentSuffixTarget = Field(default=EnvironmentSuffixTarget.SCHEMA)
3941
catalog_name_override: t.Optional[str] = None
4042
normalize_name: bool = True
43+
gateway_managed: bool = False
4144

4245
@field_validator("name", mode="before")
4346
@classmethod
4447
def _sanitize_name(cls, v: str) -> str:
4548
return word_characters_only(v).lower()
4649

47-
@field_validator("normalize_name", mode="before")
50+
@field_validator("normalize_name", "gateway_managed", mode="before")
4851
@classmethod
49-
def _validate_normalize_name(cls, v: t.Any) -> bool:
50-
return True if v is None else bool(v)
52+
def _validate_boolean_field(cls, v: t.Any, info: ValidationInfo) -> bool:
53+
if v is None:
54+
return info.field_name == "normalize_name"
55+
return bool(v)
5156

5257
@t.overload
5358
@classmethod
@@ -194,6 +199,7 @@ def naming_info(self) -> EnvironmentNamingInfo:
194199
suffix_target=self.suffix_target,
195200
catalog_name_override=self.catalog_name_override,
196201
normalize_name=self.normalize_name,
202+
gateway_managed=self.gateway_managed,
197203
)
198204

199205
@property

sqlmesh/core/loader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ def _load() -> t.List[Model]:
468468
default_catalog=self.context.default_catalog,
469469
infer_names=self.config.model_naming.infer_names,
470470
signal_definitions=signals,
471+
default_catalog_per_gateway=self.context.default_catalog_per_gateway,
471472
)
472473
except Exception as ex:
473474
raise ConfigError(f"Failed to load model definition at '{path}'.\n{ex}")
@@ -525,6 +526,7 @@ def _load_python_models(
525526
default_catalog=self.context.default_catalog,
526527
infer_names=self.config.model_naming.infer_names,
527528
audit_definitions=audits,
529+
default_catalog_per_gateway=self.context.default_catalog_per_gateway,
528530
):
529531
if model.enabled:
530532
models[model.fqn] = model

sqlmesh/core/model/decorator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def models(
9393
path: Path,
9494
module_path: Path,
9595
dialect: t.Optional[str] = None,
96+
default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
9697
**loader_kwargs: t.Any,
9798
) -> t.List[Model]:
9899
return create_models_from_blueprints(
@@ -103,6 +104,7 @@ def models(
103104
path=path,
104105
module_path=module_path,
105106
dialect=dialect,
107+
default_catalog_per_gateway=default_catalog_per_gateway,
106108
**loader_kwargs,
107109
)
108110

sqlmesh/core/model/definition.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1886,6 +1886,7 @@ def create_models_from_blueprints(
18861886
path: Path = Path(),
18871887
module_path: Path = Path(),
18881888
dialect: DialectType = None,
1889+
default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
18891890
**loader_kwargs: t.Any,
18901891
) -> t.List[Model]:
18911892
model_blueprints: t.List[Model] = []
@@ -1907,6 +1908,13 @@ def create_models_from_blueprints(
19071908
else:
19081909
gateway_name = None
19091910

1911+
if (
1912+
default_catalog_per_gateway
1913+
and gateway_name
1914+
and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
1915+
):
1916+
loader_kwargs["default_catalog"] = catalog
1917+
19101918
model_blueprints.append(
19111919
loader(
19121920
path=path,
@@ -1927,6 +1935,7 @@ def load_sql_based_models(
19271935
path: Path = Path(),
19281936
module_path: Path = Path(),
19291937
dialect: DialectType = None,
1938+
default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
19301939
**loader_kwargs: t.Any,
19311940
) -> t.List[Model]:
19321941
gateway: t.Optional[exp.Expression] = None
@@ -1964,6 +1973,7 @@ def load_sql_based_models(
19641973
path=path,
19651974
module_path=module_path,
19661975
dialect=dialect,
1976+
default_catalog_per_gateway=default_catalog_per_gateway,
19671977
**loader_kwargs,
19681978
)
19691979

sqlmesh/core/plan/builder.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def __init__(
151151
name=self._context_diff.environment,
152152
suffix_target=environment_suffix_target,
153153
normalize_name=self._context_diff.normalize_environment_name,
154+
gateway_managed=self._context_diff.gateway_managed_virtual_layer,
154155
)
155156

156157
self._latest_plan: t.Optional[Plan] = None

sqlmesh/core/snapshot/definition.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
478478
base_table_name_override: t.Optional[str] = None
479479
custom_materialization: t.Optional[str] = None
480480
dev_table_suffix: str
481+
model_gateway: t.Optional[str] = None
481482

482483
def __lt__(self, other: SnapshotTableInfo) -> bool:
483484
return self.name < other.name
@@ -1179,6 +1180,7 @@ def table_info(self) -> SnapshotTableInfo:
11791180
node_type=self.node_type,
11801181
custom_materialization=custom_materialization,
11811182
dev_table_suffix=self.dev_table_suffix,
1183+
model_gateway=self.model_gateway,
11821184
)
11831185

11841186
@property

0 commit comments

Comments
 (0)