Skip to content

Commit 5eb89c1

Browse files
Refactor to pass as argument to create _engine_adapter
1 parent 9c38fd2 commit 5eb89c1

3 files changed

Lines changed: 15 additions & 8 deletions

File tree

sqlmesh/core/config/connection.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,12 @@ def connection_validator(self) -> t.Callable[[], None]:
109109
"""A function that validates the connection configuration"""
110110
return self.create_engine_adapter().ping
111111

112-
def create_engine_adapter(self, register_comments_override: bool = False) -> EngineAdapter:
112+
def create_engine_adapter(
113+
self, register_comments_override: bool = False, concurrent_tasks: t.Optional[int] = None
114+
) -> EngineAdapter:
113115
"""Returns a new instance of the Engine Adapter."""
116+
if concurrent_tasks:
117+
self.concurrent_tasks = concurrent_tasks
114118
return self._engine_adapter(
115119
self._connection_factory_with_kwargs,
116120
multithreaded=self.concurrent_tasks > 1,
@@ -288,7 +292,9 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
288292

289293
return init
290294

291-
def create_engine_adapter(self, register_comments_override: bool = False) -> EngineAdapter:
295+
def create_engine_adapter(
296+
self, register_comments_override: bool = False, concurrent_tasks: t.Optional[int] = None
297+
) -> EngineAdapter:
292298
"""Checks if another engine adapter has already been created that shares a catalog that points to the same data
293299
file. If so, it uses that same adapter instead of creating a new one. As a result, any additional configuration
294300
associated with the new adapter will be ignored."""
@@ -319,7 +325,9 @@ def create_engine_adapter(self, register_comments_override: bool = False) -> Eng
319325
logger.info(f"Creating new DuckDB adapter for data files: {masked_files}")
320326
else:
321327
logger.info("Creating new DuckDB adapter for in-memory database")
322-
adapter = super().create_engine_adapter(register_comments_override)
328+
adapter = super().create_engine_adapter(
329+
register_comments_override, concurrent_tasks=concurrent_tasks
330+
)
323331
for data_file in data_files:
324332
key = data_file if isinstance(data_file, str) else data_file.path
325333
BaseDuckDBConnectionConfig._data_file_to_adapter[key] = adapter

sqlmesh/core/config/scheduler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ def create_state_sync(self, context: GenericContext) -> StateSync:
9090
+ f"multi threaded mode with {warehouse_connection.concurrent_tasks} concurrent tasks."
9191
+ " This can cause SQLMesh to hang. Overriding the duckdb state connection config to use multi threaded mode."
9292
)
93-
# this triggers multithreaded mode and has to happen before the engine adapter is created below
94-
state_connection.concurrent_tasks = warehouse_connection.concurrent_tasks
9593

96-
engine_adapter = state_connection.create_engine_adapter()
94+
engine_adapter = state_connection.create_engine_adapter(
95+
concurrent_tasks=warehouse_connection.concurrent_tasks
96+
)
9797
if state_connection.is_forbidden_for_state_sync:
9898
raise ConfigError(
9999
f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."

sqlmesh/core/context.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2197,8 +2197,7 @@ def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
21972197
for gateway_name in self.config.gateways:
21982198
if gateway_name != self.selected_gateway:
21992199
connection = self.config.get_connection(gateway_name)
2200-
connection.concurrent_tasks = self.concurrent_tasks
2201-
adapter = connection.create_engine_adapter()
2200+
adapter = connection.create_engine_adapter(concurrent_tasks=self.concurrent_tasks)
22022201
self._engine_adapters[gateway_name] = adapter
22032202
return self._engine_adapters
22042203

0 commit comments

Comments
 (0)