-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathbuiltin.py
More file actions
140 lines (109 loc) · 5.04 KB
/
builtin.py
File metadata and controls
140 lines (109 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Contains all the standard rules included with SQLMesh"""
from __future__ import annotations
import typing as t
from sqlglot.expressions import Star
from sqlglot.helper import subclasses
from sqlmesh.core.linter.helpers import TokenPositionDetails, get_range_of_model_block
from sqlmesh.core.linter.rule import Rule, RuleViolation, Range, Fix, TextEdit
from sqlmesh.core.linter.definition import RuleSet
from sqlmesh.core.model import Model, SqlModel, ExternalModel
class NoSelectStar(Rule):
"""Query should not contain SELECT * on its outer most projections, even if it can be expanded."""
def check_model(self, model: Model) -> t.Optional[RuleViolation]:
# Only applies to SQL models, as other model types do not have a query.
if not isinstance(model, SqlModel):
return None
if model.query.is_star:
violation_range = self._get_range(model)
fixes = self._create_fixes(model, violation_range)
return self.violation(violation_range=violation_range, fixes=fixes)
return None
def _get_range(self, model: SqlModel) -> t.Optional[Range]:
"""Get the range of the violation if available."""
try:
if len(model.query.expressions) == 1 and isinstance(model.query.expressions[0], Star):
return TokenPositionDetails.from_meta(model.query.expressions[0].meta).to_range(
None
)
except Exception:
pass
return None
def _create_fixes(
self, model: SqlModel, violation_range: t.Optional[Range]
) -> t.Optional[t.List[Fix]]:
"""Create fixes for the SELECT * violation."""
if not violation_range:
return None
columns = model.columns_to_types
if not columns:
return None
new_text = ", ".join(columns.keys())
return [
Fix(
title="Replace SELECT * with explicit column list",
edits=[
TextEdit(
range=violation_range,
new_text=new_text,
)
],
)
]
class InvalidSelectStarExpansion(Rule):
def check_model(self, model: Model) -> t.Optional[RuleViolation]:
deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion)
if not deps:
return None
violation_msg = (
f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
"Run `sqlmesh create_external_models` and / or make sure that the model "
f"'{model.fqn}' can be rendered at parse time."
)
return self.violation(violation_msg)
class AmbiguousOrInvalidColumn(Rule):
def check_model(self, model: Model) -> t.Optional[RuleViolation]:
sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn)
if not sqlglot_err:
return None
violation_msg = (
f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous."
)
return self.violation(violation_msg)
class NoMissingAudits(Rule):
"""Model `audits` must be configured to test data quality."""
def check_model(self, model: Model) -> t.Optional[RuleViolation]:
if model.audits or model.kind.is_symbolic:
return None
if model._path is None or not str(model._path).endswith(".sql"):
return self.violation()
try:
with open(model._path, "r", encoding="utf-8") as file:
content = file.read()
range = get_range_of_model_block(content, model.dialect)
if range:
return self.violation(violation_range=range)
return self.violation()
except Exception:
return self.violation()
class NoMissingExternalModels(Rule):
"""All external models must be registered in the external_models.yaml file"""
def check_model(self, model: Model) -> t.Optional[RuleViolation]:
# Ignore external models themselves, because either they are registered,
# and if they are not, they will be caught as referenced in another model.
if isinstance(model, ExternalModel):
return None
# Handle other models that may refer to the external models.
not_registered_external_models: t.Set[str] = set()
for depends_on_model in model.depends_on:
existing_model = self.context.get_model(depends_on_model)
if existing_model is None:
not_registered_external_models.add(depends_on_model)
if not not_registered_external_models:
return None
return RuleViolation(
rule=self,
violation_msg=f"Model '{model.name}' depends on unregistered external models: "
f"{', '.join(m for m in not_registered_external_models)}. "
"Please register them in the external models file. This can be done by running 'sqlmesh create_external_models'.",
)
BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, (Rule,)))