-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathv0016_fix_windows_path.py
More file actions
63 lines (49 loc) · 1.92 KB
/
v0016_fix_windows_path.py
File metadata and controls
63 lines (49 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""Fix paths that have a Windows forward slash in them."""
import json
from sqlglot import exp
from sqlmesh.utils.migration import index_text_type
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass
def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"
new_snapshots = []
for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
quote_identifiers=True,
):
parsed_snapshot = json.loads(snapshot)
model = parsed_snapshot["model"]
python_env = model.get("python_env")
if python_env:
for py_definition in python_env.values():
path = py_definition.get("path")
if path:
py_definition["path"] = path.replace("\\", "/")
new_snapshots.append(
{
"name": name,
"identifier": identifier,
"version": version,
"snapshot": json.dumps(parsed_snapshot),
"kind_name": kind_name,
}
)
if new_snapshots:
engine_adapter.delete_from(snapshots_table, "TRUE")
index_type = index_text_type(engine_adapter.dialect)
engine_adapter.insert_append(
snapshots_table,
pd.DataFrame(new_snapshots),
target_columns_to_types={
"name": exp.DataType.build(index_type),
"identifier": exp.DataType.build(index_type),
"version": exp.DataType.build(index_type),
"snapshot": exp.DataType.build("text"),
"kind_name": exp.DataType.build(index_type),
},
)