From 99404f30fe994c9e18929c267271cdb2d37e1d27 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 1 Jul 2025 17:04:37 -0500 Subject: [PATCH 1/2] Make MSSQL merge exists implementation opt-in --- sqlmesh/core/engine_adapter/base.py | 1 + sqlmesh/core/engine_adapter/mixins.py | 1 + sqlmesh/core/engine_adapter/mssql.py | 14 +++-- sqlmesh/core/engine_adapter/postgres.py | 1 + sqlmesh/core/engine_adapter/redshift.py | 1 + sqlmesh/core/snapshot/evaluator.py | 2 + tests/core/engine_adapter/test_mssql.py | 68 ++++++++++++++++++++++--- tests/core/test_snapshot_evaluator.py | 3 ++ 8 files changed, 81 insertions(+), 10 deletions(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 591d81c9ae..8740177837 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1939,6 +1939,7 @@ def merge( unique_key: t.Sequence[exp.Expression], when_matched: t.Optional[exp.Whens] = None, merge_filter: t.Optional[exp.Expression] = None, + **kwargs: t.Any, ) -> None: source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( source_table, columns_to_types, target_table=target_table diff --git a/sqlmesh/core/engine_adapter/mixins.py b/sqlmesh/core/engine_adapter/mixins.py index 4b0e86a772..5ca1f200d9 100644 --- a/sqlmesh/core/engine_adapter/mixins.py +++ b/sqlmesh/core/engine_adapter/mixins.py @@ -32,6 +32,7 @@ def merge( unique_key: t.Sequence[exp.Expression], when_matched: t.Optional[exp.Whens] = None, merge_filter: t.Optional[exp.Expression] = None, + **kwargs: t.Any, ) -> None: logical_merge( self, diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 60fe99ffc4..112193073d 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -198,7 +198,10 @@ def merge( unique_key: t.Sequence[exp.Expression], when_matched: t.Optional[exp.Whens] = None, merge_filter: t.Optional[exp.Expression] = None, + **kwargs: t.Any, ) 
-> None: + mssql_merge_exists = kwargs.get("physical_properties", {}).get("mssql_merge_exists") + source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( source_table, columns_to_types, target_table=target_table ) @@ -214,7 +217,6 @@ def merge( match_expressions = [] if not when_matched: - match_condition = None unique_key_names = [y.name for y in unique_key] columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names] @@ -225,10 +227,14 @@ def merge( exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys ] - match_condition = exp.Exists( - this=exp.select(*target_columns_no_keys).except_( - exp.select(*source_columns_no_keys) + match_condition = ( + exp.Exists( + this=exp.select(*target_columns_no_keys).except_( + exp.select(*source_columns_no_keys) + ) ) + if mssql_merge_exists + else None ) if target_columns_no_keys: diff --git a/sqlmesh/core/engine_adapter/postgres.py b/sqlmesh/core/engine_adapter/postgres.py index bd7faef289..9962c037ac 100644 --- a/sqlmesh/core/engine_adapter/postgres.py +++ b/sqlmesh/core/engine_adapter/postgres.py @@ -109,6 +109,7 @@ def merge( unique_key: t.Sequence[exp.Expression], when_matched: t.Optional[exp.Whens] = None, merge_filter: t.Optional[exp.Expression] = None, + **kwargs: t.Any, ) -> None: # Merge isn't supported until Postgres 15 merge_impl = ( diff --git a/sqlmesh/core/engine_adapter/redshift.py b/sqlmesh/core/engine_adapter/redshift.py index 946d4ee318..906c52445f 100644 --- a/sqlmesh/core/engine_adapter/redshift.py +++ b/sqlmesh/core/engine_adapter/redshift.py @@ -353,6 +353,7 @@ def merge( unique_key: t.Sequence[exp.Expression], when_matched: t.Optional[exp.Whens] = None, merge_filter: t.Optional[exp.Expression] = None, + **kwargs: t.Any, ) -> None: if self.enable_merge: # By default we use the logical merge unless the user has opted in diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index baac96f64a..eff458dc5d 100644 
--- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1614,6 +1614,7 @@ def insert( end=kwargs.get("end"), execution_time=kwargs.get("execution_time"), ), + physical_properties=kwargs.get("physical_properties", model.physical_properties), ) def append( @@ -1634,6 +1635,7 @@ def append( end=kwargs.get("end"), execution_time=kwargs.get("execution_time"), ), + physical_properties=kwargs.get("physical_properties", model.physical_properties), ) diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py index eef0a320da..65f3231163 100644 --- a/tests/core/engine_adapter/test_mssql.py +++ b/tests/core/engine_adapter/test_mssql.py @@ -474,7 +474,7 @@ def test_merge_pandas( assert to_sql_calls(adapter) == [ f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""", - f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = 
[__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", ] @@ -498,11 +498,47 @@ def test_merge_pandas( assert to_sql_calls(adapter) == [ f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""", - f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + +def test_merge_exists( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable +): + mocker.patch( + 
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists", + return_value=False, + ) + + adapter = make_mocked_engine_adapter(MSSQLEngineAdapter) + + temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table") + table_name = "target" + temp_table_id = "abcdefgh" + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + + df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]}) + + # regular implementation + adapter.merge( + target_table=table_name, + source_table=df, + columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), + }, + unique_key=[exp.to_identifier("id")], + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", ] - # all model columns are keys + # merge exists implementation adapter.cursor.reset_mock() adapter._connection_pool.get().reset_mock() temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) @@ -512,11 +548,31 @@ def test_merge_pandas( columns_to_types={ "id": exp.DataType.build("int"), "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), }, - 
unique_key=[exp.to_identifier("id"), exp.to_column("ts")], + unique_key=[exp.to_identifier("id")], + physical_properties={"mssql_merge_exists": True}, ) - adapter._connection_pool.get().bulk_copy.assert_called_with( - f"__temp_target_{temp_table_id}", [(1, 1), (2, 2), (3, 3)] + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + # merge exists and all model columns are keys + adapter.cursor.reset_mock() + adapter._connection_pool.get().reset_mock() + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + adapter.merge( + target_table=table_name, + source_table=df, + columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + }, + unique_key=[exp.to_identifier("id"), exp.to_column("ts")], + physical_properties={"mssql_merge_exists": True}, ) assert to_sql_calls(adapter) == [ diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index e4de741cdb..dace2d93ac 100644 --- a/tests/core/test_snapshot_evaluator.py +++ 
b/tests/core/test_snapshot_evaluator.py @@ -2231,6 +2231,7 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap ) ] ), + physical_properties={}, ) @@ -2327,6 +2328,7 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock, ), ], ), + physical_properties={}, ) @@ -2478,6 +2480,7 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh expression=exp.Literal(this="2020-01-01", is_string=True), ), ), + physical_properties={}, ) From 027fbb0fc80256ad97bc48b89c0ce91f735951a9 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Tue, 1 Jul 2025 17:42:05 -0500 Subject: [PATCH 2/2] Add docs --- docs/integrations/engines/mssql.md | 75 +++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/docs/integrations/engines/mssql.md b/docs/integrations/engines/mssql.md index f06b5f1387..4c68219dd2 100644 --- a/docs/integrations/engines/mssql.md +++ b/docs/integrations/engines/mssql.md @@ -1,34 +1,65 @@ # MSSQL -## Local/Built-in Scheduler -**Engine Adapter Type**: `mssql` +## Installation -### Installation -#### User / Password Authentication: +### User / Password Authentication: ``` pip install "sqlmesh[mssql]" ``` -#### Microsoft Entra ID / Azure Active Directory Authentication: +### Microsoft Entra ID / Azure Active Directory Authentication: ``` pip install "sqlmesh[mssql-odbc]" ``` +## Incremental by unique key `MERGE` + +SQLMesh executes a `MERGE` statement to insert rows for [incremental by unique key](../../concepts/models/model_kinds.md#incremental_by_unique_key) model kinds. + +By default, the `MERGE` statement updates all non-key columns of an existing row when a new row with the same key values is inserted. If all column values match between the two rows, those updates are unnecessary. + +SQLMesh provides an optional performance optimization that skips unnecessary updates by comparing column values with the `EXISTS` and `EXCEPT` operators. 
+ +Enable the optimization by setting the `mssql_merge_exists` key to `true` in the [`physical_properties`](../../concepts/models/overview.md#physical_properties) section of the `MODEL` statement. + +For example: + +```sql linenums="1" hl_lines="7-9" +MODEL ( + name sqlmesh_example.unique_key, + kind INCREMENTAL_BY_UNIQUE_KEY ( + unique_key id + ), + cron '@daily', + physical_properties ( + mssql_merge_exists = true + ) +); +``` + +!!! warning "Not all column types supported" + The `mssql_merge_exists` optimization does not work with every column type. Unsupported types include `GEOMETRY`, `XML`, `TEXT`, `NTEXT`, `IMAGE`, and most user-defined types. + + Learn more in the [MSSQL `EXCEPT` statement documentation](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/set-operators-except-and-intersect-transact-sql?view=sql-server-ver17#arguments). + +## Local/Built-in Scheduler +**Engine Adapter Type**: `mssql` + ### Connection options -| Option | Description | Type | Required | -| ----------------- | ------------------------------------------------------------ | :----------: | :------: | -| `type` | Engine type name - must be `mssql` | string | Y | -| `host` | The hostname of the MSSQL server | string | Y | -| `user` | The username / client id to use for authentication with the MSSQL server | string | N | -| `password` | The password / client secret to use for authentication with the MSSQL server | string | N | -| `port` | The port number of the MSSQL server | int | N | -| `database` | The target database | string | N | -| `charset` | The character set used for the connection | string | N | -| `timeout` | The query timeout in seconds. Default: no timeout | int | N | -| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | -| `appname` | The application name to use for the connection | string | N | -| `conn_properties` | The list of connection properties | list[string] | N | -| `autocommit` | Is autocommit mode enabled. 
Default: false | bool | N | -| `driver` | The driver to use for the connection. Default: pymssql | string | N | -| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N | -| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | \ No newline at end of file +| Option | Description | Type | Required | +| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------: | :------: | +| `type` | Engine type name - must be `mssql` | string | Y | +| `host` | The hostname of the MSSQL server | string | Y | +| `user` | The username / client id to use for authentication with the MSSQL server | string | N | +| `password` | The password / client secret to use for authentication with the MSSQL server | string | N | +| `port` | The port number of the MSSQL server | int | N | +| `database` | The target database | string | N | +| `charset` | The character set used for the connection | string | N | +| `timeout` | The query timeout in seconds. Default: no timeout | int | N | +| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | +| `appname` | The application name to use for the connection | string | N | +| `conn_properties` | The list of connection properties | list[string] | N | +| `autocommit` | Is autocommit mode enabled. Default: false | bool | N | +| `driver` | The driver to use for the connection. Default: pymssql | string | N | +| `driver_name` | The driver name to use for the connection (e.g., *ODBC Driver 18 for SQL Server*). 
| string | N | +| `odbc_properties` | ODBC connection properties (e.g., *authentication: ActiveDirectoryServicePrincipal*). See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | \ No newline at end of file