Skip to content

Commit 863b4d4

Browse files
refactor; add comments
1 parent 28a02a5 commit 863b4d4

16 files changed

Lines changed: 121 additions & 10 deletions

File tree

sqlmesh/core/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2226,7 +2226,7 @@ def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
22262226

22272227
@cached_property
22282228
def default_catalog_per_gateway(self) -> t.Dict[str, str]:
2229-
"""Returns the catalogs for each engine adapter in a multi virtual layer setup when the catalog isn't shared."""
2229+
"""Returns the default catalogs for each engine adapter."""
22302230
if self._default_catalog_per_gateway is None:
22312231
self._default_catalog_per_gateway = {
22322232
name: adapter.default_catalog

sqlmesh/core/context_diff.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ def create_no_diff(cls, environment: str, state_reader: StateReader) -> ContextD
270270
previous_requirements=env.requirements,
271271
requirements=env.requirements,
272272
previous_environment_statements=[],
273+
previous_gateway_managed_virtual_layer=env.gateway_managed,
274+
gateway_managed_virtual_layer=env.gateway_managed,
273275
)
274276

275277
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ def promote(
239239
)
240240
tables_by_gateway[gateway].append(table)
241241

242+
# A schema can be shared across multiple engines, so we need to group by gateway
242243
for gateway, tables in tables_by_gateway.items():
243244
self._create_schemas(tables=tables, gateway=gateway)
244245

@@ -337,6 +338,7 @@ def _get_data_objects(
337338

338339
with self.concurrent_context():
339340
existing_objects: t.Set[str] = set()
341+
# A schema can be shared across multiple engines, so we need to group tables by both gateway and schema
340342
for gateway, tables_by_schema in tables_by_gateway_and_schema.items():
341343
objs_for_gateway = {
342344
obj

tests/core/test_context.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,21 +327,15 @@ def test_evaluate_limit():
327327
def test_gateway_specific_adapters(copy_to_temp_path, mocker):
328328
path = copy_to_temp_path("examples/sushi")
329329
ctx = Context(paths=path, config="isolated_systems_config", gateway="prod")
330-
assert len(ctx._engine_adapters) == 1
330+
assert len(ctx._engine_adapters) == 3
331331
assert ctx.engine_adapter == ctx._engine_adapters["prod"]
332-
333-
with pytest.raises(SQLMeshError):
334-
assert ctx._get_engine_adapter("non_existing")
335-
336-
# This will create the requested engine adapter
337332
assert ctx._get_engine_adapter("dev") == ctx._engine_adapters["dev"]
338333

339334
ctx = Context(paths=path, config="isolated_systems_config")
340-
assert len(ctx._engine_adapters) == 1
335+
assert len(ctx._engine_adapters) == 3
341336
assert ctx.engine_adapter == ctx._engine_adapters["dev"]
342337

343338
ctx = Context(paths=path, config="isolated_systems_config")
344-
345339
assert len(ctx.engine_adapters) == 3
346340
assert ctx.engine_adapter == ctx._get_engine_adapter()
347341
assert ctx._get_engine_adapter("test") == ctx._engine_adapters["test"]

tests/core/test_integration.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import pytest
1212
from pathlib import Path
1313
import os
14+
from sqlmesh.utils.concurrency import NodeExecutionFailedError
1415
import time_machine
1516
from pytest_mock.plugin import MockerFixture
1617
from sqlglot import exp
@@ -4497,7 +4498,7 @@ def test_multi(mocker):
44974498

44984499
@use_terminal_console
44994500
def test_multi_virtual_layer(mocker):
4500-
context = Context(paths=["examples/multi_virtual_layer"])
4501+
context = Context(paths=["tests/fixtures/multi_virtual_layer"])
45014502

45024503
local_db = "db.duckdb"
45034504
if os.path.exists(local_db):
@@ -4569,6 +4570,20 @@ def test_multi_virtual_layer(mocker):
45694570
== " item_id global_one macro_one extra\n0 gateway_2 88 1 c"
45704571
)
45714572

4573+
# Changing the flag should show a diff
4574+
context.gateway_managed_virtual_layer = False
4575+
plan = context.plan_builder().build()
4576+
assert not plan.requires_backfill
4577+
assert (
4578+
plan.context_diff.previous_gateway_managed_virtual_layer
4579+
!= plan.context_diff.gateway_managed_virtual_layer
4580+
)
4581+
assert plan.context_diff.has_changes
4582+
4583+
# This should error since the default_gateway won't have access to create the view on a non-shared catalog
4584+
with pytest.raises(NodeExecutionFailedError, match=r"Execution failed for node SnapshotId*"):
4585+
context.apply(plan)
4586+
45724587
if os.path.exists(local_db):
45734588
os.remove(local_db)
45744589

0 commit comments

Comments
 (0)