|
6 | 6 |
|
7 | 7 | from sqlglot import exp |
8 | 8 |
|
9 | | -from sqlmesh.core.dialect import to_schema, add_table |
| 9 | +from sqlmesh.core.dialect import to_schema |
10 | 10 | from sqlmesh.core.engine_adapter.base import ( |
11 | 11 | EngineAdapterWithIndexSupport, |
12 | 12 | EngineAdapter, |
13 | 13 | InsertOverwriteStrategy, |
14 | | - MERGE_SOURCE_ALIAS, |
15 | | - MERGE_TARGET_ALIAS, |
16 | 14 | ) |
17 | 15 | from sqlmesh.core.engine_adapter.mixins import ( |
18 | 16 | GetCurrentCatalogFromFunctionMixin, |
|
34 | 32 |
|
35 | 33 | if t.TYPE_CHECKING: |
36 | 34 | from sqlmesh.core._typing import SchemaName, TableName |
37 | | - from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF |
| 35 | + from sqlmesh.core.engine_adapter._typing import DF, Query |
38 | 36 |
|
39 | 37 |
|
40 | 38 | @set_catalog() |
@@ -190,88 +188,7 @@ def drop_schema( |
190 | 188 | ) |
191 | 189 | super().drop_schema(schema_name, ignore_if_not_exists=ignore_if_not_exists, cascade=False) |
192 | 190 |
|
193 | | - def merge( |
194 | | - self, |
195 | | - target_table: TableName, |
196 | | - source_table: QueryOrDF, |
197 | | - columns_to_types: t.Optional[t.Dict[str, exp.DataType]], |
198 | | - unique_key: t.Sequence[exp.Expression], |
199 | | - when_matched: t.Optional[exp.Whens] = None, |
200 | | - merge_filter: t.Optional[exp.Expression] = None, |
201 | | - ) -> None: |
202 | | - source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( |
203 | | - source_table, columns_to_types, target_table=target_table |
204 | | - ) |
205 | | - columns_to_types = columns_to_types or self.columns(target_table) |
206 | | - on = exp.and_( |
207 | | - *( |
208 | | - add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS)) |
209 | | - for part in unique_key |
210 | | - ) |
211 | | - ) |
212 | | - if merge_filter: |
213 | | - on = exp.and_(merge_filter, on) |
214 | | - |
215 | | - match_expressions = [] |
216 | | - if not when_matched: |
217 | | - match_condition = None |
218 | | - unique_key_names = [y.name for y in unique_key] |
219 | | - columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names] |
220 | | - |
221 | | - target_columns_no_keys = [ |
222 | | - exp.column(c, MERGE_TARGET_ALIAS) for c in columns_to_types_no_keys |
223 | | - ] |
224 | | - source_columns_no_keys = [ |
225 | | - exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys |
226 | | - ] |
227 | | - |
228 | | - match_condition = exp.Exists( |
229 | | - this=exp.select(*target_columns_no_keys).except_( |
230 | | - exp.select(*source_columns_no_keys) |
231 | | - ) |
232 | | - ) |
233 | 191 |
|
234 | | - if target_columns_no_keys: |
235 | | - match_expressions.append( |
236 | | - exp.When( |
237 | | - matched=True, |
238 | | - source=False, |
239 | | - condition=match_condition, |
240 | | - then=exp.Update( |
241 | | - expressions=[ |
242 | | - exp.column(col, MERGE_TARGET_ALIAS).eq( |
243 | | - exp.column(col, MERGE_SOURCE_ALIAS) |
244 | | - ) |
245 | | - for col in columns_to_types_no_keys |
246 | | - ], |
247 | | - ), |
248 | | - ) |
249 | | - ) |
250 | | - else: |
251 | | - match_expressions.extend(when_matched.copy().expressions) |
252 | | - |
253 | | - match_expressions.append( |
254 | | - exp.When( |
255 | | - matched=False, |
256 | | - source=False, |
257 | | - then=exp.Insert( |
258 | | - this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]), |
259 | | - expression=exp.Tuple( |
260 | | - expressions=[ |
261 | | - exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types |
262 | | - ] |
263 | | - ), |
264 | | - ), |
265 | | - ) |
266 | | - ) |
267 | | - for source_query in source_queries: |
268 | | - with source_query as query: |
269 | | - self._merge( |
270 | | - target_table=target_table, |
271 | | - query=query, |
272 | | - on=on, |
273 | | - whens=exp.Whens(expressions=match_expressions), |
274 | | - ) |
275 | 192 |
|
276 | 193 | def _convert_df_datetime(self, df: DF, columns_to_types: t.Dict[str, exp.DataType]) -> None: |
277 | 194 | import pandas as pd |
|
0 commit comments