Skip to content

Commit e55e343

Browse files
committed
Make MSSQL merge exists implementation opt-in
1 parent 649fdf0 commit e55e343

8 files changed

Lines changed: 82 additions & 10 deletions

File tree

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1939,6 +1939,7 @@ def merge(
19391939
unique_key: t.Sequence[exp.Expression],
19401940
when_matched: t.Optional[exp.Whens] = None,
19411941
merge_filter: t.Optional[exp.Expression] = None,
1942+
**kwargs: t.Any,
19421943
) -> None:
19431944
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
19441945
source_table, columns_to_types, target_table=target_table

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ def merge(
3232
unique_key: t.Sequence[exp.Expression],
3333
when_matched: t.Optional[exp.Whens] = None,
3434
merge_filter: t.Optional[exp.Expression] = None,
35+
**kwargs: t.Any,
3536
) -> None:
3637
logical_merge(
3738
self,

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -198,7 +198,10 @@ def merge(
198198
unique_key: t.Sequence[exp.Expression],
199199
when_matched: t.Optional[exp.Whens] = None,
200200
merge_filter: t.Optional[exp.Expression] = None,
201+
**kwargs: t.Any,
201202
) -> None:
203+
mssql_merge_exists = kwargs.get("physical_properties", {}).get("mssql_merge_exists")
204+
202205
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
203206
source_table, columns_to_types, target_table=target_table
204207
)
@@ -214,7 +217,6 @@ def merge(
214217

215218
match_expressions = []
216219
if not when_matched:
217-
match_condition = None
218220
unique_key_names = [y.name for y in unique_key]
219221
columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names]
220222

@@ -225,10 +227,14 @@ def merge(
225227
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
226228
]
227229

228-
match_condition = exp.Exists(
229-
this=exp.select(*target_columns_no_keys).except_(
230-
exp.select(*source_columns_no_keys)
230+
match_condition = (
231+
exp.Exists(
232+
this=exp.select(*target_columns_no_keys).except_(
233+
exp.select(*source_columns_no_keys)
234+
)
231235
)
236+
if mssql_merge_exists
237+
else None
232238
)
233239

234240
if target_columns_no_keys:
@@ -272,6 +278,7 @@ def merge(
272278
on=on,
273279
whens=exp.Whens(expressions=match_expressions),
274280
)
281+
return
275282

276283
def _convert_df_datetime(self, df: DF, columns_to_types: t.Dict[str, exp.DataType]) -> None:
277284
import pandas as pd

sqlmesh/core/engine_adapter/postgres.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,7 @@ def merge(
109109
unique_key: t.Sequence[exp.Expression],
110110
when_matched: t.Optional[exp.Whens] = None,
111111
merge_filter: t.Optional[exp.Expression] = None,
112+
**kwargs: t.Any,
112113
) -> None:
113114
# Merge isn't supported until Postgres 15
114115
merge_impl = (

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -353,6 +353,7 @@ def merge(
353353
unique_key: t.Sequence[exp.Expression],
354354
when_matched: t.Optional[exp.Whens] = None,
355355
merge_filter: t.Optional[exp.Expression] = None,
356+
**kwargs: t.Any,
356357
) -> None:
357358
if self.enable_merge:
358359
# By default we use the logical merge unless the user has opted in

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1614,6 +1614,7 @@ def insert(
16141614
end=kwargs.get("end"),
16151615
execution_time=kwargs.get("execution_time"),
16161616
),
1617+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16171618
)
16181619

16191620
def append(
@@ -1634,6 +1635,7 @@ def append(
16341635
end=kwargs.get("end"),
16351636
execution_time=kwargs.get("execution_time"),
16361637
),
1638+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16371639
)
16381640

16391641

tests/core/engine_adapter/test_mssql.py

Lines changed: 62 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -474,7 +474,7 @@ def test_merge_pandas(
474474

475475
assert to_sql_calls(adapter) == [
476476
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
477-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
477+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
478478
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
479479
]
480480

@@ -498,11 +498,47 @@ def test_merge_pandas(
498498

499499
assert to_sql_calls(adapter) == [
500500
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
501-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
501+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503+
]
504+
505+
506+
def test_merge_exists(
507+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
508+
):
509+
mocker.patch(
510+
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
511+
return_value=False,
512+
)
513+
514+
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
515+
516+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
517+
table_name = "target"
518+
temp_table_id = "abcdefgh"
519+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
520+
521+
df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]})
522+
523+
# regular implementation
524+
adapter.merge(
525+
target_table=table_name,
526+
source_table=df,
527+
columns_to_types={
528+
"id": exp.DataType.build("int"),
529+
"ts": exp.DataType.build("TIMESTAMP"),
530+
"val": exp.DataType.build("int"),
531+
},
532+
unique_key=[exp.to_identifier("id")],
533+
)
534+
535+
assert to_sql_calls(adapter) == [
536+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
537+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502538
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503539
]
504540

505-
# all model columns are keys
541+
# merge exists implementation
506542
adapter.cursor.reset_mock()
507543
adapter._connection_pool.get().reset_mock()
508544
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
@@ -512,11 +548,31 @@ def test_merge_pandas(
512548
columns_to_types={
513549
"id": exp.DataType.build("int"),
514550
"ts": exp.DataType.build("TIMESTAMP"),
551+
"val": exp.DataType.build("int"),
515552
},
516-
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
553+
unique_key=[exp.to_identifier("id")],
554+
physical_properties={"mssql_merge_exists": True},
517555
)
518-
adapter._connection_pool.get().bulk_copy.assert_called_with(
519-
f"__temp_target_{temp_table_id}", [(1, 1), (2, 2), (3, 3)]
556+
557+
assert to_sql_calls(adapter) == [
558+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
559+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
560+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
561+
]
562+
563+
# merge exists and all model columns are keys
564+
adapter.cursor.reset_mock()
565+
adapter._connection_pool.get().reset_mock()
566+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
567+
adapter.merge(
568+
target_table=table_name,
569+
source_table=df,
570+
columns_to_types={
571+
"id": exp.DataType.build("int"),
572+
"ts": exp.DataType.build("TIMESTAMP"),
573+
},
574+
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
575+
physical_properties={"mssql_merge_exists": True},
520576
)
521577

522578
assert to_sql_calls(adapter) == [

tests/core/test_snapshot_evaluator.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2225,6 +2225,7 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22252225
)
22262226
]
22272227
),
2228+
physical_properties={},
22282229
)
22292230

22302231

@@ -2307,6 +2308,7 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
23072308
),
23082309
],
23092310
),
2311+
physical_properties={},
23102312
)
23112313

23122314

@@ -2452,6 +2454,7 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24522454
expression=exp.Literal(this="2020-01-01", is_string=True),
24532455
),
24542456
),
2457+
physical_properties={},
24552458
)
24562459

24572460

0 commit comments

Comments
 (0)