Skip to content

Commit 31f1e5c

Browse files
committed
Make MSSQL merge exists implementation opt-in
1 parent 649fdf0 commit 31f1e5c

8 files changed

Lines changed: 152 additions & 70 deletions

File tree

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,7 @@ def merge(
19391939
unique_key: t.Sequence[exp.Expression],
19401940
when_matched: t.Optional[exp.Whens] = None,
19411941
merge_filter: t.Optional[exp.Expression] = None,
1942+
**kwargs: t.Any,
19421943
) -> None:
19431944
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
19441945
source_table, columns_to_types, target_table=target_table

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def merge(
3232
unique_key: t.Sequence[exp.Expression],
3333
when_matched: t.Optional[exp.Whens] = None,
3434
merge_filter: t.Optional[exp.Expression] = None,
35+
**kwargs: t.Any,
3536
) -> None:
3637
logical_merge(
3738
self,

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 81 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -198,80 +198,97 @@ def merge(
198198
unique_key: t.Sequence[exp.Expression],
199199
when_matched: t.Optional[exp.Whens] = None,
200200
merge_filter: t.Optional[exp.Expression] = None,
201+
**kwargs: t.Any,
201202
) -> None:
202-
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
203-
source_table, columns_to_types, target_table=target_table
204-
)
205-
columns_to_types = columns_to_types or self.columns(target_table)
206-
on = exp.and_(
207-
*(
208-
add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
209-
for part in unique_key
203+
mssql_merge_exists = kwargs.get("physical_properties", {}).get("mssql_merge_exists")
204+
205+
if mssql_merge_exists:
206+
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
207+
source_table, columns_to_types, target_table=target_table
210208
)
211-
)
212-
if merge_filter:
213-
on = exp.and_(merge_filter, on)
214-
215-
match_expressions = []
216-
if not when_matched:
217-
match_condition = None
218-
unique_key_names = [y.name for y in unique_key]
219-
columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names]
220-
221-
target_columns_no_keys = [
222-
exp.column(c, MERGE_TARGET_ALIAS) for c in columns_to_types_no_keys
223-
]
224-
source_columns_no_keys = [
225-
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
226-
]
227-
228-
match_condition = exp.Exists(
229-
this=exp.select(*target_columns_no_keys).except_(
230-
exp.select(*source_columns_no_keys)
209+
columns_to_types = columns_to_types or self.columns(target_table)
210+
on = exp.and_(
211+
*(
212+
add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
213+
for part in unique_key
231214
)
232215
)
216+
if merge_filter:
217+
on = exp.and_(merge_filter, on)
218+
219+
match_expressions = []
220+
if not when_matched:
221+
match_condition = None
222+
unique_key_names = [y.name for y in unique_key]
223+
columns_to_types_no_keys = [
224+
c for c in columns_to_types if c not in unique_key_names
225+
]
226+
227+
target_columns_no_keys = [
228+
exp.column(c, MERGE_TARGET_ALIAS) for c in columns_to_types_no_keys
229+
]
230+
source_columns_no_keys = [
231+
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
232+
]
233+
234+
match_condition = exp.Exists(
235+
this=exp.select(*target_columns_no_keys).except_(
236+
exp.select(*source_columns_no_keys)
237+
)
238+
)
233239

234-
if target_columns_no_keys:
235-
match_expressions.append(
236-
exp.When(
237-
matched=True,
238-
source=False,
239-
condition=match_condition,
240-
then=exp.Update(
240+
if target_columns_no_keys:
241+
match_expressions.append(
242+
exp.When(
243+
matched=True,
244+
source=False,
245+
condition=match_condition,
246+
then=exp.Update(
247+
expressions=[
248+
exp.column(col, MERGE_TARGET_ALIAS).eq(
249+
exp.column(col, MERGE_SOURCE_ALIAS)
250+
)
251+
for col in columns_to_types_no_keys
252+
],
253+
),
254+
)
255+
)
256+
else:
257+
match_expressions.extend(when_matched.copy().expressions)
258+
259+
match_expressions.append(
260+
exp.When(
261+
matched=False,
262+
source=False,
263+
then=exp.Insert(
264+
this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
265+
expression=exp.Tuple(
241266
expressions=[
242-
exp.column(col, MERGE_TARGET_ALIAS).eq(
243-
exp.column(col, MERGE_SOURCE_ALIAS)
244-
)
245-
for col in columns_to_types_no_keys
246-
],
267+
exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types
268+
]
247269
),
248-
)
249-
)
250-
else:
251-
match_expressions.extend(when_matched.copy().expressions)
252-
253-
match_expressions.append(
254-
exp.When(
255-
matched=False,
256-
source=False,
257-
then=exp.Insert(
258-
this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
259-
expression=exp.Tuple(
260-
expressions=[
261-
exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types
262-
]
263270
),
264-
),
271+
)
265272
)
273+
for source_query in source_queries:
274+
with source_query as query:
275+
self._merge(
276+
target_table=target_table,
277+
query=query,
278+
on=on,
279+
whens=exp.Whens(expressions=match_expressions),
280+
)
281+
return
282+
283+
super().merge(
284+
target_table,
285+
source_table,
286+
columns_to_types,
287+
unique_key,
288+
when_matched,
289+
merge_filter,
290+
**kwargs,
266291
)
267-
for source_query in source_queries:
268-
with source_query as query:
269-
self._merge(
270-
target_table=target_table,
271-
query=query,
272-
on=on,
273-
whens=exp.Whens(expressions=match_expressions),
274-
)
275292

276293
def _convert_df_datetime(self, df: DF, columns_to_types: t.Dict[str, exp.DataType]) -> None:
277294
import pandas as pd

sqlmesh/core/engine_adapter/postgres.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def merge(
109109
unique_key: t.Sequence[exp.Expression],
110110
when_matched: t.Optional[exp.Whens] = None,
111111
merge_filter: t.Optional[exp.Expression] = None,
112+
**kwargs: t.Any,
112113
) -> None:
113114
# Merge isn't supported until Postgres 15
114115
merge_impl = (

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ def merge(
353353
unique_key: t.Sequence[exp.Expression],
354354
when_matched: t.Optional[exp.Whens] = None,
355355
merge_filter: t.Optional[exp.Expression] = None,
356+
**kwargs: t.Any,
356357
) -> None:
357358
if self.enable_merge:
358359
# By default we use the logical merge unless the user has opted in

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1614,6 +1614,7 @@ def insert(
16141614
end=kwargs.get("end"),
16151615
execution_time=kwargs.get("execution_time"),
16161616
),
1617+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16171618
)
16181619

16191620
def append(
@@ -1634,6 +1635,7 @@ def append(
16341635
end=kwargs.get("end"),
16351636
execution_time=kwargs.get("execution_time"),
16361637
),
1638+
physical_properties=kwargs.get("physical_properties", model.physical_properties),
16371639
)
16381640

16391641

tests/core/engine_adapter/test_mssql.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ def test_merge_pandas(
474474

475475
assert to_sql_calls(adapter) == [
476476
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
477-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
477+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id], [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
478478
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
479479
]
480480

@@ -498,11 +498,47 @@ def test_merge_pandas(
498498

499499
assert to_sql_calls(adapter) == [
500500
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
501-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
501+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id], [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503+
]
504+
505+
506+
def test_merge_exists(
507+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
508+
):
509+
mocker.patch(
510+
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
511+
return_value=False,
512+
)
513+
514+
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
515+
516+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
517+
table_name = "target"
518+
temp_table_id = "abcdefgh"
519+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
520+
521+
df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]})
522+
523+
# regular implementation
524+
adapter.merge(
525+
target_table=table_name,
526+
source_table=df,
527+
columns_to_types={
528+
"id": exp.DataType.build("int"),
529+
"ts": exp.DataType.build("TIMESTAMP"),
530+
"val": exp.DataType.build("int"),
531+
},
532+
unique_key=[exp.to_identifier("id")],
533+
)
534+
535+
assert to_sql_calls(adapter) == [
536+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
537+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id], [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
502538
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
503539
]
504540

505-
# all model columns are keys
541+
# merge exists implementation
506542
adapter.cursor.reset_mock()
507543
adapter._connection_pool.get().reset_mock()
508544
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
@@ -512,11 +548,31 @@ def test_merge_pandas(
512548
columns_to_types={
513549
"id": exp.DataType.build("int"),
514550
"ts": exp.DataType.build("TIMESTAMP"),
551+
"val": exp.DataType.build("int"),
515552
},
516-
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
553+
unique_key=[exp.to_identifier("id")],
554+
physical_properties={"mssql_merge_exists": True},
517555
)
518-
adapter._connection_pool.get().bulk_copy.assert_called_with(
519-
f"__temp_target_{temp_table_id}", [(1, 1), (2, 2), (3, 3)]
556+
557+
assert to_sql_calls(adapter) == [
558+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
559+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
560+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
561+
]
562+
563+
# merge exists and all model columns are keys
564+
adapter.cursor.reset_mock()
565+
adapter._connection_pool.get().reset_mock()
566+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
567+
adapter.merge(
568+
target_table=table_name,
569+
source_table=df,
570+
columns_to_types={
571+
"id": exp.DataType.build("int"),
572+
"ts": exp.DataType.build("TIMESTAMP"),
573+
},
574+
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
575+
physical_properties={"mssql_merge_exists": True},
520576
)
521577

522578
assert to_sql_calls(adapter) == [

tests/core/test_snapshot_evaluator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2225,6 +2225,7 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22252225
)
22262226
]
22272227
),
2228+
physical_properties={},
22282229
)
22292230

22302231

@@ -2307,6 +2308,7 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
23072308
),
23082309
],
23092310
),
2311+
physical_properties={},
23102312
)
23112313

23122314

@@ -2452,6 +2454,7 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24522454
expression=exp.Literal(this="2020-01-01", is_string=True),
24532455
),
24542456
),
2457+
physical_properties={},
24552458
)
24562459

24572460

0 commit comments

Comments
 (0)