75 changes: 53 additions & 22 deletions docs/integrations/engines/mssql.md
@@ -1,34 +1,65 @@
# MSSQL

## Installation

### User / Password Authentication:
```
pip install "sqlmesh[mssql]"
```
### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[mssql-odbc]"
```

## Incremental by unique key `MERGE`

SQLMesh executes a `MERGE` statement to insert rows for [incremental by unique key](../../concepts/models/model_kinds.md#incremental_by_unique_key) model kinds.

By default, the `MERGE` statement updates all non-key columns of an existing row when a new row with the same key values is inserted. If all column values match between the two rows, those updates are unnecessary.

SQLMesh provides an optional performance optimization that skips unnecessary updates by comparing column values with the `EXISTS` and `EXCEPT` operators.

Enable the optimization by setting the `mssql_merge_exists` key to `true` in the [`physical_properties`](../../concepts/models/overview.md#physical_properties) section of the `MODEL` statement.

For example:

```sql linenums="1" hl_lines="7-9"
MODEL (
name sqlmesh_example.unique_key,
kind INCREMENTAL_BY_UNIQUE_KEY (
unique_key id
),
cron '@daily',
physical_properties (
mssql_merge_exists = true
)
);
```

!!! warning "Not all column types supported"
    The `mssql_merge_exists` optimization does not support every column type. Unsupported types include `GEOMETRY`, `XML`, `TEXT`, `NTEXT`, `IMAGE`, and most user-defined types.

Learn more in the [MSSQL `EXCEPT` statement documentation](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/set-operators-except-and-intersect-transact-sql?view=sql-server-ver17#arguments).
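To illustrate, the optimization changes the `WHEN MATCHED` clause of the generated `MERGE` statement roughly as follows (a simplified sketch — the table name, column names, and aliases are placeholders, not the exact identifiers SQLMesh emits):

```sql
-- Default behavior: every matched row is updated, even if nothing changed
MERGE INTO target AS t
USING source AS s
  ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.val = s.val
WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);

-- With mssql_merge_exists = true: update only matched rows whose
-- non-key values actually differ
MERGE INTO target AS t
USING source AS s
  ON t.id = s.id
WHEN MATCHED AND EXISTS (
  SELECT t.val EXCEPT SELECT s.val
) THEN UPDATE SET t.val = s.val
WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);
```

A side benefit of the `EXCEPT` form is that it compares `NULL`s as equal, so a row where both sides are `NULL` is correctly treated as unchanged, unlike a plain `<>` comparison.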

## Local/Built-in Scheduler
**Engine Adapter Type**: `mssql`

### Connection options

| Option | Description | Type | Required |
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------: | :------: |
| `type` | Engine type name - must be `mssql` | string | Y |
| `host` | The hostname of the MSSQL server | string | Y |
| `user` | The username / client id to use for authentication with the MSSQL server | string | N |
| `password` | The password / client secret to use for authentication with the MSSQL server | string | N |
| `port` | The port number of the MSSQL server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit`      | Whether autocommit mode is enabled. Default: false                                                                                                                                                                         | bool         | N        |
| `driver` | The driver to use for the connection. Default: pymssql | string | N |
| `driver_name` | The driver name to use for the connection (e.g., *ODBC Driver 18 for SQL Server*). | string | N |
| `odbc_properties` | ODBC connection properties (e.g., *authentication: ActiveDirectoryServicePrincipal*). See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
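For reference, a minimal gateway connection using these options might look like the sketch below (the gateway name, host, and credentials are placeholders):

```yaml
gateways:
  mssql:
    connection:
      type: mssql
      host: mssql-server.example.com
      port: 1433
      user: sqlmesh_user
      password: secret
      database: analytics
```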
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/base.py
@@ -1939,6 +1939,7 @@ def merge(
unique_key: t.Sequence[exp.Expression],
when_matched: t.Optional[exp.Whens] = None,
merge_filter: t.Optional[exp.Expression] = None,
**kwargs: t.Any,
) -> None:
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
source_table, columns_to_types, target_table=target_table
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/mixins.py
@@ -32,6 +32,7 @@ def merge(
unique_key: t.Sequence[exp.Expression],
when_matched: t.Optional[exp.Whens] = None,
merge_filter: t.Optional[exp.Expression] = None,
**kwargs: t.Any,
) -> None:
logical_merge(
self,
14 changes: 10 additions & 4 deletions sqlmesh/core/engine_adapter/mssql.py
@@ -198,7 +198,10 @@ def merge(
unique_key: t.Sequence[exp.Expression],
when_matched: t.Optional[exp.Whens] = None,
merge_filter: t.Optional[exp.Expression] = None,
**kwargs: t.Any,
) -> None:
mssql_merge_exists = kwargs.get("physical_properties", {}).get("mssql_merge_exists")

source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
source_table, columns_to_types, target_table=target_table
)
@@ -214,7 +217,6 @@ def merge(

match_expressions = []
if not when_matched:
unique_key_names = [y.name for y in unique_key]
columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names]

@@ -225,10 +227,14 @@ def merge(
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
]

match_condition = (
exp.Exists(
this=exp.select(*target_columns_no_keys).except_(
exp.select(*source_columns_no_keys)
)
)
if mssql_merge_exists
else None
)

if target_columns_no_keys:
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/postgres.py
@@ -109,6 +109,7 @@ def merge(
unique_key: t.Sequence[exp.Expression],
when_matched: t.Optional[exp.Whens] = None,
merge_filter: t.Optional[exp.Expression] = None,
**kwargs: t.Any,
) -> None:
# Merge isn't supported until Postgres 15
merge_impl = (
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/redshift.py
@@ -353,6 +353,7 @@ def merge(
unique_key: t.Sequence[exp.Expression],
when_matched: t.Optional[exp.Whens] = None,
merge_filter: t.Optional[exp.Expression] = None,
**kwargs: t.Any,
) -> None:
if self.enable_merge:
# By default we use the logical merge unless the user has opted in
2 changes: 2 additions & 0 deletions sqlmesh/core/snapshot/evaluator.py
@@ -1614,6 +1614,7 @@ def insert(
end=kwargs.get("end"),
execution_time=kwargs.get("execution_time"),
),
physical_properties=kwargs.get("physical_properties", model.physical_properties),
)

def append(
@@ -1634,6 +1635,7 @@ def append(
end=kwargs.get("end"),
execution_time=kwargs.get("execution_time"),
),
physical_properties=kwargs.get("physical_properties", model.physical_properties),
)


68 changes: 62 additions & 6 deletions tests/core/engine_adapter/test_mssql.py
@@ -474,7 +474,7 @@ def test_merge_pandas(

assert to_sql_calls(adapter) == [
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
]

@@ -498,11 +498,47 @@ def test_merge_pandas(

assert to_sql_calls(adapter) == [
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
]


def test_merge_exists(
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
):
mocker.patch(
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
return_value=False,
)

adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)

temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
table_name = "target"
temp_table_id = "abcdefgh"
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)

df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]})

# regular implementation
adapter.merge(
target_table=table_name,
source_table=df,
columns_to_types={
"id": exp.DataType.build("int"),
"ts": exp.DataType.build("TIMESTAMP"),
"val": exp.DataType.build("int"),
},
unique_key=[exp.to_identifier("id")],
)

assert to_sql_calls(adapter) == [
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
]

# merge exists implementation
adapter.cursor.reset_mock()
adapter._connection_pool.get().reset_mock()
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
@@ -512,11 +548,31 @@ def test_merge_pandas(
columns_to_types={
"id": exp.DataType.build("int"),
"ts": exp.DataType.build("TIMESTAMP"),
"val": exp.DataType.build("int"),
},
unique_key=[exp.to_identifier("id")],
physical_properties={"mssql_merge_exists": True},
)

assert to_sql_calls(adapter) == [
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
]

# merge exists and all model columns are keys
adapter.cursor.reset_mock()
adapter._connection_pool.get().reset_mock()
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
adapter.merge(
target_table=table_name,
source_table=df,
columns_to_types={
"id": exp.DataType.build("int"),
"ts": exp.DataType.build("TIMESTAMP"),
},
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
physical_properties={"mssql_merge_exists": True},
)

assert to_sql_calls(adapter) == [
3 changes: 3 additions & 0 deletions tests/core/test_snapshot_evaluator.py
@@ -2231,6 +2231,7 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
)
]
),
physical_properties={},
)


@@ -2327,6 +2328,7 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
),
],
),
physical_properties={},
)


@@ -2478,6 +2480,7 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
expression=exp.Literal(this="2020-01-01", is_string=True),
),
),
physical_properties={},
)

