Skip to content

Commit 05c793c

Browse files
authored
Feat: Allow virtual environments to be given dedicated catalogs (#4742)
1 parent 17a7098 commit 05c793c

21 files changed

Lines changed: 489 additions & 20 deletions

File tree

docs/guides/configuration.md

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ This only applies to the _physical tables_ that SQLMesh creates - the views are
244244

245245
SQLMesh stores `prod` environment views in the schema in a model's name - for example, the `prod` views for a model `my_schema.users` will be located in `my_schema`.
246246

247-
By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev`.
247+
By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev` as `my_schema__dev.users`.
248+
249+
##### Show at the table level instead
248250

249251
This behavior can be changed to append a suffix at the end of a _table/view_ name instead. Appending the suffix to a table/view name means that non-prod environment views will be created in the same schema as the `prod` environment. The prod and non-prod views are differentiated by non-prod view names ending with `__<env>`.
250252

@@ -260,7 +262,7 @@ Config example:
260262

261263
=== "Python"
262264

263-
The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE` or `EnvironmentSuffixTarget.SCHEMA` (default).
265+
The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default).
264266

265267
```python linenums="1"
266268
from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget
@@ -271,16 +273,58 @@ Config example:
271273
)
272274
```
273275

274-
The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses.
276+
!!! info "Default behavior"
277+
The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses.
278+
279+
##### Show at the catalog level instead
280+
281+
If neither the schema (default) nor the table level are sufficient for your use case, you can indicate the environment at the catalog level instead.
282+
283+
This can be useful if you have downstream BI reporting tools and you would like to point them at a development environment to test something out without renaming all the table / schema references within the report query.
284+
285+
In order to achieve this, you can configure [environment_suffix_target](../reference/configuration.md#environments) like so:
286+
287+
=== "YAML"
288+
289+
```yaml linenums="1"
290+
environment_suffix_target: catalog
291+
```
292+
293+
=== "Python"
294+
295+
The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default).
296+
297+
```python linenums="1"
298+
from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget
299+
300+
config = Config(
301+
model_defaults=ModelDefaultsConfig(dialect=<dialect>),
302+
environment_suffix_target=EnvironmentSuffixTarget.CATALOG,
303+
)
304+
```
305+
306+
Given the example of a model called `my_schema.users` with a default catalog of `warehouse` this will cause the following behavior:
307+
308+
- For the `prod` environment, the default catalog as configured in the gateway will be used. So the view will be created at `warehouse.my_schema.users`
309+
- For any other environment, eg `dev`, the environment name will be appended to the default catalog. So the view will be created at `warehouse__dev.my_schema.users`
310+
- If a model is fully qualified with a catalog already, eg `finance_mart.my_schema.users`, then the environment catalog will be based off the model catalog and not the default catalog. In this example, the view will be created at `finance_mart__dev.my_schema.users`
311+
312+
313+
!!! warning "Caveats"
314+
- Using `environment_suffix_target: catalog` only works on engines that support querying across different catalogs. If your engine does not support cross-catalog queries then you will need to use `environment_suffix_target: schema` or `environment_suffix_target: table` instead.
315+
- Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must be managed externally from SQLMesh and exist prior to invoking SQLMesh.
275316

276317
#### Environment view catalogs
277318

278319
By default, SQLMesh creates an environment view in the same [catalog](../concepts/glossary.md#catalog) as the physical table the view points to. The physical table's catalog is determined by either the catalog specified in the model name or the default catalog defined in the connection.
279320

280-
Some companies fully segregate `prod` and non-prod environment objects by catalog. For example, they might have a "prod" catalog that contains all `prod` environment physical tables and views and a separate "dev" catalog that contains all `dev` environment physical tables and views.
321+
It can be desirable to create `prod` and non-prod virtual layer objects in separate catalogs instead. For example, there might be a "prod" catalog that contains all `prod` environment views and a separate "dev" catalog that contains all `dev` environment views.
281322

282323
Separate prod and non-prod catalogs can also be useful if you have a CI/CD pipeline that creates environments, like the [SQLMesh Github Actions CI/CD Bot](../integrations/github.md). You might want to store the CI/CD environment objects in a dedicated catalog since there can be many of them.
283324

325+
!!! info "Virtual layer only"
326+
Note that the following setting only affects the [virtual layer](../concepts/glossary.md#virtual-layer). If you need full segregation by catalog between environments in the [physical layer](../concepts/glossary.md#physical-layer) as well, see the [Isolated Systems Guide](../guides/isolated_systems.md).
327+
284328
To configure separate catalogs, provide a mapping from [regex patterns](https://en.wikipedia.org/wiki/Regular_expression) to catalog names. SQLMesh will compare the name of an environment to the regex patterns; when it finds a match it will store the environment's objects in the corresponding catalog.
285329

286330
SQLMesh evaluates the regex patterns in the order defined in the configuration; it uses the catalog for the first matching pattern. If no match is found, the catalog defined in the model or the default catalog defined on the connection will be used.
@@ -317,6 +361,9 @@ With the example configuration above, SQLMesh would evaluate environment names a
317361
* If the environment name starts with `dev`, the catalog will be `dev`.
318362
* If the environment name starts with `analytics_repo`, the catalog will be `cicd`.
319363

364+
!!! warning
365+
This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both `environment_catalog_mapping` and `environment_suffix_target: catalog` will raise an error on project load
366+
320367
*Note:* This feature is only available for engines that support querying across catalogs. At the time of writing, the following engines are **NOT** supported:
321368

322369
* [MySQL](../integrations/engines/mysql.md)

examples/sushi/config.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,17 @@
128128
)
129129

130130

131-
environment_suffix_config = Config(
131+
environment_suffix_table_config = Config(
132132
default_connection=DuckDBConnectionConfig(),
133133
model_defaults=model_defaults,
134134
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
135135
)
136136

137+
environment_suffix_catalog_config = environment_suffix_table_config.model_copy(
138+
update={
139+
"environment_suffix_target": EnvironmentSuffixTarget.CATALOG,
140+
}
141+
)
137142

138143
CATALOGS = {
139144
"in_memory": ":memory:",

sqlmesh/core/config/common.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,22 @@
1010

1111

1212
class EnvironmentSuffixTarget(str, Enum):
13+
# Intended to create virtual environments in their own schemas, with names like "<model_schema_name>__<env name>". The view name is untouched.
14+
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
15+
# would have its virtual layer view created as 'sqlmesh_example__dev.full_model'
1316
SCHEMA = "schema"
17+
18+
# Intended to create virtual environments in the same schema as their production counterparts by adjusting the table name.
19+
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
20+
# would have its virtual layer view created as "sqlmesh_example.full_model__dev"
1421
TABLE = "table"
1522

23+
# Intended to create virtual environments in their own catalogs to preserve the schema and view name of the models
24+
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
25+
# with a default catalog of "warehouse" would have its virtual layer view created as "warehouse__dev.sqlmesh_example.full_model"
26+
# note: this only works for engines that can query across catalogs
27+
CATALOG = "catalog"
28+
1629
@property
1730
def is_schema(self) -> bool:
1831
return self == EnvironmentSuffixTarget.SCHEMA
@@ -21,6 +34,10 @@ def is_schema(self) -> bool:
2134
def is_table(self) -> bool:
2235
return self == EnvironmentSuffixTarget.TABLE
2336

37+
@property
38+
def is_catalog(self) -> bool:
39+
return self == EnvironmentSuffixTarget.CATALOG
40+
2441
@classproperty
2542
def default(cls) -> EnvironmentSuffixTarget:
2643
return EnvironmentSuffixTarget.SCHEMA

sqlmesh/core/config/root.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,15 @@ def _normalize_identifiers(key: str) -> None:
242242
},
243243
)
244244

245+
if (
246+
self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG
247+
and self.environment_catalog_mapping
248+
):
249+
raise ConfigError(
250+
f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n"
251+
"Please specify one or the other"
252+
)
253+
245254
if self.environment_catalog_mapping:
246255
_normalize_identifiers("environment_catalog_mapping")
247256
if self.physical_schema_mapping:

sqlmesh/core/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from pathlib import Path
88

99
SQLMESH = "sqlmesh"
10+
SQLMESH_MANAGED = "sqlmesh_managed"
1011
SQLMESH_PATH = Path.home() / ".sqlmesh"
1112

1213
PROD = "prod"

sqlmesh/core/engine_adapter/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class EngineAdapter:
101101
SUPPORTS_VIEW_SCHEMA = True
102102
SUPPORTS_CLONING = False
103103
SUPPORTS_MANAGED_MODELS = False
104+
SUPPORTS_CREATE_DROP_CATALOG = False
104105
SCHEMA_DIFFER = SchemaDiffer()
105106
SUPPORTS_TUPLE_IN = True
106107
HAS_VIEW_BINDING = False
@@ -1217,6 +1218,22 @@ def drop_view(
12171218
**kwargs,
12181219
)
12191220

1221+
def create_catalog(self, catalog_name: str | exp.Identifier) -> None:
1222+
return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))
1223+
1224+
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
1225+
raise SQLMeshError(
1226+
f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
1227+
)
1228+
1229+
def drop_catalog(self, catalog_name: str | exp.Identifier) -> None:
1230+
return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))
1231+
1232+
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
1233+
raise SQLMeshError(
1234+
f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
1235+
)
1236+
12201237
def columns(
12211238
self, table_name: TableName, include_pseudo_columns: bool = False
12221239
) -> t.Dict[str, exp.DataType]:

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44
from sqlglot import exp
5+
from pathlib import Path
56

67
from sqlmesh.core.engine_adapter.mixins import (
78
GetCurrentCatalogFromFunctionMixin,
@@ -35,6 +36,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
3536
)
3637
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
3738
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
39+
SUPPORTS_CREATE_DROP_CATALOG = True
3840

3941
@property
4042
def catalog_support(self) -> CatalogSupport:
@@ -44,6 +46,18 @@ def set_current_catalog(self, catalog: str) -> None:
4446
"""Sets the catalog name of the current connection."""
4547
self.execute(exp.Use(this=exp.to_identifier(catalog)))
4648

49+
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
50+
db_filename = f"{catalog_name.output_name}.db"
51+
self.execute(
52+
exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True)
53+
)
54+
55+
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
56+
db_file_path = Path(f"{catalog_name.output_name}.db")
57+
self.execute(exp.Detach(this=catalog_name, exists=True))
58+
if db_file_path.exists():
59+
db_file_path.unlink()
60+
4761
def _df_to_source_queries(
4862
self,
4963
df: DF,

sqlmesh/core/engine_adapter/shared.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,16 @@ def is_clustered(self) -> bool:
173173

174174

175175
class CatalogSupport(Enum):
176+
# The engine has no concept of catalogs
176177
UNSUPPORTED = 1
178+
179+
# The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables
177180
SINGLE_CATALOG_ONLY = 2
181+
182+
# The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding
178183
REQUIRES_SET_CATALOG = 3
184+
185+
# The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first)
179186
FULL_SUPPORT = 4
180187

181188
@property

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1010
from sqlglot.optimizer.qualify_columns import quote_identifiers
1111

12+
import sqlmesh.core.constants as c
1213
from sqlmesh.core.dialect import to_schema
1314
from sqlmesh.core.engine_adapter.mixins import (
1415
GetCurrentCatalogFromFunctionMixin,
@@ -43,6 +44,7 @@
4344
"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG,
4445
"create_schema": CatalogSupport.REQUIRES_SET_CATALOG,
4546
"drop_schema": CatalogSupport.REQUIRES_SET_CATALOG,
47+
"drop_catalog": CatalogSupport.REQUIRES_SET_CATALOG, # needs a catalog to issue a query to information_schema.databases even though the result is global
4648
}
4749
)
4850
class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixin, RowDiffMixin):
@@ -52,6 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
5254
SUPPORTS_CLONING = True
5355
SUPPORTS_MANAGED_MODELS = True
5456
CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
57+
SUPPORTS_CREATE_DROP_CATALOG = True
5558
SCHEMA_DIFFER = SchemaDiffer(
5659
parameterized_type_defaults={
5760
exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)],
@@ -123,6 +126,36 @@ def snowpark(self) -> t.Optional[SnowparkSession]:
123126
def catalog_support(self) -> CatalogSupport:
124127
return CatalogSupport.FULL_SUPPORT
125128

129+
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
130+
props = exp.Properties(
131+
expressions=[exp.SchemaCommentProperty(this=exp.Literal.string(c.SQLMESH_MANAGED))]
132+
)
133+
self.execute(
134+
exp.Create(
135+
this=exp.Table(this=catalog_name), kind="DATABASE", exists=True, properties=props
136+
)
137+
)
138+
139+
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
140+
# only drop the catalog if it was created by SQLMesh, which is indicated by its comment matching {c.SQLMESH_MANAGED}
141+
exists_check = (
142+
exp.select(exp.Literal.number(1))
143+
.from_(exp.to_table("information_schema.databases"))
144+
.where(
145+
exp.and_(
146+
exp.column("database_name").eq(exp.Literal.string(catalog_name)),
147+
exp.column("comment").eq(exp.Literal.string(c.SQLMESH_MANAGED)),
148+
)
149+
)
150+
)
151+
normalize_identifiers(exists_check, dialect=self.dialect)
152+
if self.fetchone(exists_check, quote_identifiers=True) is not None:
153+
self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
154+
else:
155+
logger.warning(
156+
f"Not dropping database {catalog_name.sql(dialect=self.dialect)} because there is no indication it is '{c.SQLMESH_MANAGED}'"
157+
)
158+
126159
def _create_table(
127160
self,
128161
table_name_or_schema: t.Union[exp.Schema, TableName],

sqlmesh/core/environment.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ class EnvironmentNamingInfo(PydanticModel):
4343
normalize_name: bool = True
4444
gateway_managed: bool = False
4545

46+
@property
47+
def is_dev(self) -> bool:
48+
return self.name.lower() != c.PROD
49+
4650
@field_validator("name", mode="before")
4751
@classmethod
4852
def _sanitize_name(cls, v: str) -> str:

0 commit comments

Comments
 (0)