-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathv0011_add_model_kind_name.py
More file actions
73 lines (58 loc) · 2.15 KB
/
v0011_add_model_kind_name.py
File metadata and controls
73 lines (58 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Add the kind_name column to the snapshots table."""
import json
from sqlglot import exp
from sqlmesh.utils.migration import index_text_type
def migrate_schemas(state_sync, **kwargs):  # type: ignore
    """Add the ``kind_name`` column to the snapshots state table.

    The target table is ``_snapshots``, prefixed with ``state_sync.schema``
    when that attribute is truthy. The new column's type is whatever
    ``index_text_type`` returns for the engine adapter's dialect.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this function.
    """
    adapter = state_sync.engine_adapter
    state_schema = state_sync.schema
    table_name = f"{state_schema}._snapshots" if state_schema else "_snapshots"

    text_type = index_text_type(adapter.dialect)

    # Single action: append a ``kind_name`` column of the index text type.
    kind_name_column = exp.ColumnDef(
        this=exp.to_column("kind_name"),
        kind=exp.DataType.build(text_type),
    )
    adapter.execute(
        exp.Alter(
            this=exp.to_table(table_name),
            kind="TABLE",
            actions=[kind_name_column],
        )
    )
def migrate_rows(state_sync, **kwargs):  # type: ignore
    """Populate ``kind_name`` for every row of the snapshots state table.

    Each row's ``snapshot`` value is parsed as JSON and the value found at
    ``["model"]["kind"]["name"]`` becomes that row's ``kind_name``. When at
    least one row was read, the table is emptied and all rows are written
    back with the extra column; when no rows were read, nothing is modified.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted but not used by this function.
    """
    import pandas as pd

    adapter = state_sync.engine_adapter
    state_schema = state_sync.schema
    table_name = f"{state_schema}._snapshots" if state_schema else "_snapshots"

    text_type = index_text_type(adapter.dialect)

    query = exp.select("name", "identifier", "version", "snapshot").from_(table_name)
    rows = [
        {
            "name": name,
            "identifier": identifier,
            "version": version,
            # The raw serialized payload is written back unchanged.
            "snapshot": payload,
            "kind_name": json.loads(payload)["model"]["kind"]["name"],
        }
        for name, identifier, version, payload in adapter.fetchall(
            query,
            quote_identifiers=True,
        )
    ]

    if not rows:
        return

    # Rows are replaced wholesale: clear the table, then re-insert everything.
    adapter.delete_from(table_name, "TRUE")

    frame = pd.DataFrame(rows)
    # Only the serialized snapshot is plain text; the rest use the index text type.
    column_types = {
        column: exp.DataType.build("text" if column == "snapshot" else text_type)
        for column in ("name", "identifier", "version", "snapshot", "kind_name")
    }
    adapter.insert_append(
        table_name,
        frame,
        target_columns_to_types=column_types,
    )