Skip to content

Commit 8fd1b9e

Browse files
committed
add file systems config for duckdb
1 parent c6dc2e7 commit 8fd1b9e

2 files changed

Lines changed: 15 additions & 18 deletions

File tree

sqlmesh/core/config/connection.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ class BaseDuckDBConnectionConfig(ConnectionConfig):
267267
extensions: A list of autoloadable extensions to load.
268268
connector_config: A dictionary of configuration to pass into the duckdb connector.
269269
secrets: A list of dictionaries used to generate DuckDB secrets for authenticating with external services (e.g. S3).
270+
file_systems: A list of dictionaries used to register `fsspec` filesystems with the DuckDB cursor. Each dictionary must provide a `protocol` key and a `storage_options` key.
270271
concurrent_tasks: The maximum number of tasks that can use this connection concurrently.
271272
register_comments: Whether or not to register model comments with the SQL engine.
272273
pre_ping: Whether or not to pre-ping the connection before starting a new transaction to ensure it is still alive.
@@ -278,6 +279,7 @@ class BaseDuckDBConnectionConfig(ConnectionConfig):
278279
extensions: t.List[t.Union[str, t.Dict[str, t.Any]]] = []
279280
connector_config: t.Dict[str, t.Any] = {}
280281
secrets: t.List[t.Dict[str, t.Any]] = []
282+
file_systems: t.List[t.Dict[str, t.Any]] = []
281283

282284
concurrent_tasks: int = 1
283285
register_comments: bool = True
@@ -372,6 +374,15 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
372374
except Exception as e:
373375
raise ConfigError(f"Failed to create secret: {e}")
374376

377+
if self.file_systems:
378+
from fsspec import filesystem # type: ignore
379+
380+
for file_system in self.file_systems:
381+
protocol = file_system.pop("protocol")
382+
storage_options = file_system.pop("storage_options")
383+
fs = filesystem(protocol, **storage_options)
384+
cursor.register_filesystem(fs)
385+
375386
for i, (alias, path_options) in enumerate(
376387
(getattr(self, "catalogs", None) or {}).items()
377388
):
@@ -383,24 +394,6 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
383394
try:
384395
if isinstance(path_options, DuckDBAttachOptions):
385396
query = path_options.to_sql(alias)
386-
387-
if path_options.data_path.split(":")[0] == "abfs":
388-
389-
if path_options.azure_account_name is None or path_options.azure_account_host is None:
390-
raise ValueError("azure_account_name and azure_account_host must be set when using abfs protocol")
391-
392-
393-
storage_options = {
394-
"account_name": path_options.azure_account_name,
395-
"account_host": path_options.azure_account_host,
396-
"anon":False,
397-
}
398-
from fsspec import filesystem
399-
400-
fs = filesystem("abfs", **storage_options)
401-
cursor.register_filesystem(fs)
402-
cursor.commit()
403-
404397
else:
405398
query = f"ATTACH IF NOT EXISTS '{path_options}'"
406399
if not path_options.startswith("md:"):

sqlmesh/dbt/target.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ class DuckDbConfig(TargetConfig):
138138
extensions: A list of autoloadable extensions to load.
139139
settings: A dictionary of settings to pass into the duckdb connector.
140140
secrets: A list of secrets to pass to the secret manager in the duckdb connector.
141+
file_systems: A list of dictionaries describing the `fsspec` filesystems to register with the duckdb connection.
141142
"""
142143

143144
type: t.Literal["duckdb"] = "duckdb"
@@ -147,6 +148,7 @@ class DuckDbConfig(TargetConfig):
147148
extensions: t.Optional[t.List[str]] = None
148149
settings: t.Optional[t.Dict[str, t.Any]] = None
149150
secrets: t.Optional[t.List[t.Dict[str, t.Any]]] = None
151+
file_systems: t.Optional[t.List[t.Dict[str, t.Any]]] = None
150152

151153
@model_validator(mode="before")
152154
def validate_authentication(cls, data: t.Any) -> t.Any:
@@ -182,6 +184,8 @@ def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
182184
kwargs["connector_config"] = self.settings
183185
if self.secrets is not None:
184186
kwargs["secrets"] = self.secrets
187+
if self.file_systems is not None:
188+
kwargs["file_systems"] = self.file_systems
185189
return DuckDBConnectionConfig(
186190
database=self.path,
187191
concurrent_tasks=1,

0 commit comments

Comments
 (0)