Skip to content

Commit 0b69311

Browse files
committed
check connected catalog
1 parent 6e1e980 commit 0b69311

2 files changed

Lines changed: 113 additions & 40 deletions

File tree

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ def _target_catalog(self) -> t.Optional[str]:
5252
def _target_catalog(self, value: t.Optional[str]) -> None:
5353
self._connection_pool.set_attribute("target_catalog", value)
5454

55+
@property
56+
def _connected_catalog(self) -> t.Optional[str]:
57+
"""Catalog the currently-open thread-local connection is actually using."""
58+
return self._connection_pool.get_attribute("connected_catalog")
59+
60+
@_connected_catalog.setter
61+
def _connected_catalog(self, value: t.Optional[str]) -> None:
62+
self._connection_pool.set_attribute("connected_catalog", value)
63+
5564
def _normalize_catalog(
5665
self, catalog_name: t.Optional[str]
5766
) -> t.Optional[str]:
@@ -112,17 +121,21 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None:
112121
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
113122
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
114123
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
115-
current_catalog = self.get_current_catalog()
116124

117125
logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
118126
self.api_client.delete_warehouse(warehouse_name)
119127

120-
if warehouse_name == current_catalog:
121-
# Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist
122-
# In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog"
123-
# So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads
124-
# that use an either use an existing connection pointing to this warehouse or trigger a new connection
125-
# will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data
128+
# Close all connections if any thread may be using the dropped warehouse.
129+
# We must check both the logical target and the physical connection catalog
130+
# (falling back to the configured default when either is neutral) because
131+
# Fabric validates the DATABASE= connection argument and raises
132+
# 'Authentication Failed' when it points at a non-existent warehouse.
133+
default_db = self._extra_config.get("database")
134+
in_use = {
135+
self.get_current_catalog() or default_db,
136+
self._normalize_catalog(self._connected_catalog) or default_db,
137+
}
138+
if warehouse_name in in_use:
126139
self.close()
127140

128141
def get_current_catalog(self) -> t.Optional[str]:
@@ -149,39 +162,58 @@ def set_current_catalog(self, catalog_name: t.Optional[str]) -> None:
149162
See:
150163
https://learn.microsoft.com/en-us/fabric/data-warehouse/sql-query-editor#limitations
151164
"""
152-
current_catalog = self.get_current_catalog()
153165
target_catalog = self._normalize_catalog(catalog_name)
154166

155-
# If already using the requested catalog, do nothing
156-
if current_catalog == target_catalog:
157-
logger.debug("Already using the requested Fabric catalog state, no action needed")
167+
# No-op: the logical catalog state already matches.
168+
if self.get_current_catalog() == target_catalog:
169+
logger.debug(
170+
"Already using requested Fabric catalog state, no action needed"
171+
)
158172
return
159173

160-
logger.info(
161-
"Switching from catalog '%s' to '%s'",
162-
self._catalog_state_label(current_catalog),
163-
self._catalog_state_label(target_catalog),
174+
# Decide whether the open connection needs to be replaced.
175+
#
176+
# The set_catalog decorator restores the previous catalog (often None)
177+
# after every catalog-scoped call. For Fabric, a connection close +
178+
# reopen is expensive because each new connection goes through ODBC and
179+
# the Fabric gateway. We therefore apply lazy connection management:
180+
#
181+
# * When restoring to neutral (target=None): just update _target_catalog.
182+
# The existing connection stays alive and will be reused or replaced
183+
# on the next real switch, avoiding a pointless bounce through the
184+
# default catalog.
185+
#
186+
# * When switching to a non-neutral catalog: only close/reopen if the
187+
# open connection is already on a different catalog. If a previous
188+
# restore-to-neutral left the connection on the right catalog, we
189+
# skip the close entirely.
190+
connected_catalog = self._normalize_catalog(self._connected_catalog)
191+
needs_reconnect = (
192+
target_catalog is not None and connected_catalog != target_catalog
164193
)
165194

166-
# commit the transaction before closing the connection to help prevent errors like:
167-
# > Snapshot isolation transaction failed in database because the object accessed by the statement has been modified by a
168-
# > DDL statement in another concurrent transaction since the start of this transaction
169-
# on subsequent queries in the new connection
170-
self._connection_pool.commit()
195+
if needs_reconnect:
196+
logger.info(
197+
"Switching connection from catalog '%s' to '%s'",
198+
self._catalog_state_label(connected_catalog),
199+
self._catalog_state_label(target_catalog),
200+
)
201+
# Commit before closing to avoid snapshot-isolation errors on
202+
# subsequent queries in the new connection.
203+
self._connection_pool.commit()
204+
# note: close() on the pool (not self.close()) to only affect this
205+
# thread's connection rather than all threads.
206+
self._connection_pool.close()
207+
self._connected_catalog = target_catalog
208+
else:
209+
logger.debug(
210+
"Updating catalog target to '%s' (connection remains on '%s')",
211+
self._catalog_state_label(target_catalog),
212+
self._catalog_state_label(connected_catalog),
213+
)
171214

172-
# note: we call close() on the connection pool instead of self.close() because self.close() calls close_all()
173-
# on the connection pool but we just want to close the connection for this thread
174-
self._connection_pool.close()
175215
self._target_catalog = target_catalog
176216

177-
catalog_after_switch = self.get_current_catalog()
178-
179-
if catalog_after_switch != target_catalog:
180-
# We need to raise an error if the catalog switch failed to prevent the operation that needed the catalog switch from being run against the wrong catalog
181-
raise SQLMeshError(
182-
f"Unable to switch catalog to {target_catalog}, catalog ended up as {catalog_after_switch}"
183-
)
184-
185217
def alter_table(
186218
self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]]
187219
) -> None:

tests/core/engine_adapter/test_fabric.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,71 @@ def test_set_current_catalog_to_default_clears_explicit_target(
7979
adapter.cursor.execute.assert_not_called()
8080

8181

82-
def test_catalog_scoped_call_restores_to_neutral_state(
82+
def test_catalog_scoped_call_restores_to_neutral_without_close(
8383
make_mocked_engine_adapter: t.Callable,
8484
mocker: MockerFixture,
8585
):
86+
"""Decorator's restore-to-neutral must not close the existing connection."""
8687
adapter = make_mocked_engine_adapter(
8788
FabricEngineAdapter,
8889
default_catalog="core",
8990
database="core",
9091
)
91-
set_current_catalog_spy = mocker.patch.object(
92-
adapter,
93-
"set_current_catalog",
94-
wraps=adapter.set_current_catalog,
95-
)
92+
close_spy = mocker.spy(adapter._connection_pool, "close")
9693
adapter.cursor.fetchone.return_value = (1,)
9794

9895
adapter.table_exists("planning.db.table")
9996

100-
assert [
101-
call.args[0] for call in set_current_catalog_spy.call_args_list
102-
] == ["planning", None]
97+
# Decorator calls set_current_catalog("planning") then set_current_catalog(None).
98+
# Only the first call (None→planning) should trigger a connection close.
99+
assert close_spy.call_count == 1
100+
assert adapter._connected_catalog == "planning"
103101
assert adapter.get_current_catalog() is None
104102

105103

104+
def test_repeated_same_catalog_reuses_connection(
105+
make_mocked_engine_adapter: t.Callable,
106+
mocker: MockerFixture,
107+
):
108+
"""Two consecutive operations on the same catalog share one connection."""
109+
adapter = make_mocked_engine_adapter(
110+
FabricEngineAdapter,
111+
default_catalog="core",
112+
database="core",
113+
)
114+
close_spy = mocker.spy(adapter._connection_pool, "close")
115+
adapter.cursor.fetchone.return_value = (1,)
116+
117+
adapter.table_exists("planning.db.table")
118+
adapter.table_exists("planning.db.table")
119+
120+
# Only the very first switch (None→planning) should close.
121+
# The restore to neutral keeps the connection alive and the second
122+
# planning operation reuses it without another close.
123+
assert close_spy.call_count == 1
124+
assert adapter._connected_catalog == "planning"
125+
126+
127+
def test_switching_between_catalogs_closes_each_time(
128+
make_mocked_engine_adapter: t.Callable,
129+
mocker: MockerFixture,
130+
):
131+
"""Switching to a different catalog always triggers a connection close."""
132+
adapter = make_mocked_engine_adapter(
133+
FabricEngineAdapter,
134+
default_catalog="core",
135+
database="core",
136+
)
137+
close_spy = mocker.spy(adapter._connection_pool, "close")
138+
adapter.cursor.fetchone.return_value = (1,)
139+
140+
adapter.table_exists("safran.db.table") # None→safran: 1 close
141+
adapter.table_exists("planning.db.table") # safran→planning: 2nd close
142+
143+
assert close_spy.call_count == 2
144+
assert adapter._connected_catalog == "planning"
145+
146+
106147
def test_columns(adapter: FabricEngineAdapter):
107148
adapter.cursor.fetchall.return_value = [
108149
("decimal_ps", "decimal", None, 5, 4),

0 commit comments

Comments
 (0)