Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions docs/integrations/engines/azuresql.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,35 @@

[Azure SQL](https://azure.microsoft.com/en-us/products/azure-sql) is "a family of managed, secure, and intelligent products that use the SQL Server database engine in the Azure cloud."

The Azure SQL adapter only supports authentication with a username and password. It does not support authentication with Microsoft Entra or Azure Active Directory.

## Local/Built-in Scheduler
**Engine Adapter Type**: `azuresql`

### Installation
#### User / Password Authentication:
```
pip install "sqlmesh[azuresql]"
```
#### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[azuresql-odbc]"
```

### Connection options

| Option | Description | Type | Required |
| ----------------- | ---------------------------------------------------------------- | :----------: | :------: |
| `type` | Engine type name - must be `azuresql` | string | Y |
| `host` | The hostname of the Azure SQL server | string | Y |
| `user` | The username to use for authentication with the Azure SQL server | string | N |
| `password` | The password to use for authentication with the Azure SQL server | string | N |
| `user` | The username / client ID to use for authentication with the Azure SQL server | string | N |
| `password` | The password / client secret to use for authentication with the Azure SQL server | string | N |
| `port` | The port number of the Azure SQL server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
| `driver` | The driver to use for the connection. Default: pymssql | string | N |
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
12 changes: 10 additions & 2 deletions docs/integrations/engines/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
**Engine Adapter Type**: `mssql`

### Installation
#### User / Password Authentication:
```
pip install "sqlmesh[mssql]"
```
#### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[mssql-odbc]"
```

### Connection options

| Option | Description | Type | Required |
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
| `type` | Engine type name - must be `mssql` | string | Y |
| `host` | The hostname of the MSSQL server | string | Y |
| `user` | The username to use for authentication with the MSSQL server | string | N |
| `password` | The password to use for authentication with the MSSQL server | string | N |
| `user` | The username / client id to use for authentication with the MSSQL server | string | N |
| `password` | The password / client secret to use for authentication with the MSSQL server | string | N |
| `port` | The port number of the MSSQL server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
Expand All @@ -24,3 +29,6 @@ pip install "sqlmesh[mssql]"
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
| `driver` | The driver to use for the connection. Default: pymssql | string | N |
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ classifiers = [
[project.optional-dependencies]
athena = ["PyAthena[Pandas]"]
azuresql = ["pymssql"]
azuresql-odbc = ["pyodbc"]
bigquery = [
"google-cloud-bigquery[pandas]",
"google-cloud-bigquery-storage"
Expand Down Expand Up @@ -104,6 +105,7 @@ gcppostgres = ["cloud-sql-python-connector[pg8000]>=1.8.0"]
github = ["PyGithub~=2.5.0"]
llm = ["langchain", "openai"]
mssql = ["pymssql"]
mssql-odbc = ["pyodbc"]
mysql = ["pymysql"]
mwaa = ["boto3"]
postgres = ["psycopg2"]
Expand Down Expand Up @@ -203,6 +205,7 @@ module = [
"databricks_cli.*",
"mysql.*",
"pymssql.*",
"pyodbc.*",
"psycopg2.*",
"langchain.*",
"pytest_lazyfixture.*",
Expand Down
127 changes: 120 additions & 7 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@


def _get_engine_import_validator(
import_name: str, engine_type: str, extra_name: t.Optional[str] = None
import_name: str, engine_type: str, extra_name: t.Optional[str] = None, decorate: bool = True
) -> t.Callable:
extra_name = extra_name or engine_type

@model_validator(mode="before")
def validate(cls: t.Any, data: t.Any) -> t.Any:
check_import = (
str_to_bool(str(data.pop("check_import", True))) if isinstance(data, dict) else True
Expand All @@ -83,7 +82,7 @@ def validate(cls: t.Any, data: t.Any) -> t.Any:

return data

return validate
return model_validator(mode="before")(validate) if decorate else validate


class ConnectionConfig(abc.ABC, BaseConfig):
Expand Down Expand Up @@ -1422,17 +1421,50 @@ class MSSQLConnectionConfig(ConnectionConfig):
autocommit: t.Optional[bool] = False
tds_version: t.Optional[str] = None

# Driver options
driver: t.Literal["pymssql", "pyodbc"] = "pymssql"
# PyODBC specific options
driver_name: t.Optional[str] = None # e.g. "ODBC Driver 18 for SQL Server"
trust_server_certificate: t.Optional[bool] = None
encrypt: t.Optional[bool] = None
# Dictionary of arbitrary ODBC connection properties
# See: https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute
odbc_properties: t.Optional[t.Dict[str, t.Any]] = None

concurrent_tasks: int = 4
register_comments: bool = True
pre_ping: bool = True

type_: t.Literal["mssql"] = Field(alias="type", default="mssql")

_engine_import_validator = _get_engine_import_validator("pymssql", "mssql")
@model_validator(mode="before")
@classmethod
def _mssql_engine_import_validator(cls, data: t.Any) -> t.Any:
if not isinstance(data, dict):
return data

driver = data.get("driver", "pymssql")

# Define the mapping of driver to import module and extra name
driver_configs = {"pymssql": ("pymssql", "mssql"), "pyodbc": ("pyodbc", "mssql-odbc")}

if driver not in driver_configs:
raise ValueError(f"Unsupported driver: {driver}")

import_module, extra_name = driver_configs[driver]

# Use _get_engine_import_validator with decorate=False to get the raw validation function
# This avoids the __wrapped__ issue in Python 3.9
validator_func = _get_engine_import_validator(
import_module, driver, extra_name, decorate=False
)

# Call the raw validation function directly
return validator_func(cls, data)

@property
def _connection_kwargs_keys(self) -> t.Set[str]:
return {
base_keys = {
"host",
"user",
"password",
Expand All @@ -1447,15 +1479,96 @@ def _connection_kwargs_keys(self) -> t.Set[str]:
"tds_version",
}

if self.driver == "pyodbc":
base_keys.update(
{
"driver_name",
"trust_server_certificate",
"encrypt",
"odbc_properties",
}
)
# Remove pymssql-specific parameters
base_keys.discard("tds_version")
base_keys.discard("conn_properties")

return base_keys

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
return engine_adapter.MSSQLEngineAdapter

@property
def _connection_factory(self) -> t.Callable:
import pymssql
if self.driver == "pymssql":
import pymssql

return pymssql.connect

import pyodbc

def connect(**kwargs: t.Any) -> t.Callable:
# Extract parameters for connection string
host = kwargs.pop("host")
port = kwargs.pop("port", 1433)
database = kwargs.pop("database", "")
user = kwargs.pop("user", None)
password = kwargs.pop("password", None)
driver_name = kwargs.pop("driver_name", "ODBC Driver 18 for SQL Server")
trust_server_certificate = kwargs.pop("trust_server_certificate", False)
encrypt = kwargs.pop("encrypt", True)
login_timeout = kwargs.pop("login_timeout", 60)

# Build connection string
conn_str_parts = [
f"DRIVER={{{driver_name}}}",
f"SERVER={host},{port}",
]

if database:
conn_str_parts.append(f"DATABASE={database}")

# Add security options
conn_str_parts.append(f"Encrypt={'YES' if encrypt else 'NO'}")
if trust_server_certificate:
conn_str_parts.append("TrustServerCertificate=YES")

conn_str_parts.append(f"Connection Timeout={login_timeout}")

# Standard SQL Server authentication
if user:
conn_str_parts.append(f"UID={user}")
if password:
conn_str_parts.append(f"PWD={password}")

# Add any additional ODBC properties from the odbc_properties dictionary
if self.odbc_properties:
for key, value in self.odbc_properties.items():
# Skip properties that we've already set above
if key.lower() in (
"driver",
"server",
"database",
"uid",
"pwd",
"encrypt",
"trustservercertificate",
"connection timeout",
):
continue

return pymssql.connect
# Handle boolean values properly
if isinstance(value, bool):
conn_str_parts.append(f"{key}={'YES' if value else 'NO'}")
else:
conn_str_parts.append(f"{key}={value}")

# Create the connection string
conn_str = ";".join(conn_str_parts)

return pyodbc.connect(conn_str, autocommit=kwargs.get("autocommit", False))

return connect

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/engine_adapter/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ def _df_to_source_queries(
assert isinstance(df, pd.DataFrame)
temp_table = self._get_temp_table(target_table or "pandas")

# Return the superclass implementation if the connection pool doesn't support bulk_copy
if not hasattr(self._connection_pool.get(), "bulk_copy"):
return super()._df_to_source_queries(df, columns_to_types, batch_size, target_table)

def query_factory() -> Query:
# It is possible for the factory to be called multiple times and if so then the temp table will already
# be created so we skip creating again. This means we are assuming the first call is the same result
Expand Down
Loading