-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathv0012_update_jinja_expressions.py
More file actions
90 lines (66 loc) · 2.64 KB
/
v0012_update_jinja_expressions.py
File metadata and controls
90 lines (66 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Fix expressions that contain jinja."""
import json
import typing as t
from sqlglot import exp
from sqlmesh.utils.jinja import has_jinja
from sqlmesh.utils.migration import index_text_type
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass
def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"
new_snapshots = []
for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
quote_identifiers=True,
):
parsed_snapshot = json.loads(snapshot)
audits = parsed_snapshot.get("audits", [])
model = parsed_snapshot["model"]
if "query" in model and has_jinja(model["query"]):
model["query"] = _wrap_query(model["query"])
_wrap_statements(model, "pre_statements")
_wrap_statements(model, "post_statements")
for audit in audits:
if has_jinja(audit["query"]):
audit["query"] = _wrap_query(audit["query"])
_wrap_statements(audit, "expressions")
new_snapshots.append(
{
"name": name,
"identifier": identifier,
"version": version,
"snapshot": json.dumps(parsed_snapshot),
"kind_name": kind_name,
}
)
if new_snapshots:
engine_adapter.delete_from(snapshots_table, "TRUE")
index_type = index_text_type(engine_adapter.dialect)
engine_adapter.insert_append(
snapshots_table,
pd.DataFrame(new_snapshots),
target_columns_to_types={
"name": exp.DataType.build(index_type),
"identifier": exp.DataType.build(index_type),
"version": exp.DataType.build(index_type),
"snapshot": exp.DataType.build("text"),
"kind_name": exp.DataType.build(index_type),
},
)
def _wrap_statements(obj: t.Dict, key: str) -> None:
updated_statements = []
for statement in obj.get(key, []):
if has_jinja(statement):
statement = _wrap_statement(statement)
updated_statements.append(statement)
if updated_statements:
obj[key] = updated_statements
def _wrap_query(sql: str) -> str:
return f"JINJA_QUERY_BEGIN;\n{sql}\nJINJA_END;"
def _wrap_statement(sql: str) -> str:
return f"JINJA_STATEMENT_BEGIN;\n{sql}\nJINJA_END;"