Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions docs/guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ This only applies to the _physical tables_ that SQLMesh creates - the views are

SQLMesh stores `prod` environment views in the schema in a model's name - for example, the `prod` views for a model `my_schema.users` will be located in `my_schema`.

By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev`.
By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev` as `my_schema__dev.users`.

##### Show at the table level instead

This behavior can be changed to append a suffix at the end of a _table/view_ name instead. Appending the suffix to a table/view name means that non-prod environment views will be created in the same schema as the `prod` environment. The prod and non-prod views are differentiated by non-prod view names ending with `__<env>`.

Expand All @@ -260,7 +262,7 @@ Config example:

=== "Python"

The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE` or `EnvironmentSuffixTarget.SCHEMA` (default).
The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default).

```python linenums="1"
from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget
Expand All @@ -271,16 +273,58 @@ Config example:
)
```

The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses.
!!! info "Default behavior"
The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses.

##### Show at the catalog level instead

If neither the schema (default) nor the table level are sufficient for your use case, you can indicate the environment at the catalog level instead.

This can be useful if you have downstream BI reporting tools and you would like to point them at a development environment to test something out without renaming all the table / schema references within the report query.

In order to achieve this, you can configure [environment_suffix_target](../reference/configuration.md#environments) like so:

=== "YAML"

```yaml linenums="1"
environment_suffix_target: catalog
```

=== "Python"

The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default).

```python linenums="1"
from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget

config = Config(
model_defaults=ModelDefaultsConfig(dialect=<dialect>),
environment_suffix_target=EnvironmentSuffixTarget.CATALOG,
)
```

Given the example of a model called `my_schema.users` with a default catalog of `warehouse` this will cause the following behavior:

- For the `prod` environment, the default catalog as configured in the gateway will be used. So the view will be created at `warehouse.my_schema.users`
- For any other environment, eg `dev`, the environment name will be appended to the default catalog. So the view will be created at `warehouse__dev.my_schema.users`
- If a model is fully qualified with a catalog already, eg `finance_mart.my_schema.users`, then the environment catalog will be based off the model catalog and not the default catalog. In this example, the view will be created at `finance_mart__dev.my_schema.users`


!!! warning "Caveats"
- Using `environment_suffix_target: catalog` only works on engines that support querying across different catalogs. If your engine does not support cross-catalog queries then you will need to use `environment_suffix_target: schema` or `environment_suffix_target: table` instead.
- Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must be managed externally from SQLMesh and exist prior to invoking SQLMesh.

#### Environment view catalogs

By default, SQLMesh creates an environment view in the same [catalog](../concepts/glossary.md#catalog) as the physical table the view points to. The physical table's catalog is determined by either the catalog specified in the model name or the default catalog defined in the connection.

Some companies fully segregate `prod` and non-prod environment objects by catalog. For example, they might have a "prod" catalog that contains all `prod` environment physical tables and views and a separate "dev" catalog that contains all `dev` environment physical tables and views.
It can be desirable to create `prod` and non-prod virtual layer objects in separate catalogs instead. For example, there might be a "prod" catalog that contains all `prod` environment views and a separate "dev" catalog that contains all `dev` environment views.

Separate prod and non-prod catalogs can also be useful if you have a CI/CD pipeline that creates environments, like the [SQLMesh Github Actions CI/CD Bot](../integrations/github.md). You might want to store the CI/CD environment objects in a dedicated catalog since there can be many of them.

!!! info "Virtual layer only"
Note that the following setting only affects the [virtual layer](../concepts/glossary.md#virtual-layer). If you need full segregation by catalog between environments in the [physical layer](../concepts/glossary.md#physical-layer) as well, see the [Isolated Systems Guide](../guides/isolated_systems.md).

To configure separate catalogs, provide a mapping from [regex patterns](https://en.wikipedia.org/wiki/Regular_expression) to catalog names. SQLMesh will compare the name of an environment to the regex patterns; when it finds a match it will store the environment's objects in the corresponding catalog.

SQLMesh evaluates the regex patterns in the order defined in the configuration; it uses the catalog for the first matching pattern. If no match is found, the catalog defined in the model or the default catalog defined on the connection will be used.
Expand Down Expand Up @@ -317,6 +361,9 @@ With the example configuration above, SQLMesh would evaluate environment names a
* If the environment name starts with `dev`, the catalog will be `dev`.
* If the environment name starts with `analytics_repo`, the catalog will be `cicd`.

!!! warning
This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both `environment_catalog_mapping` and `environment_suffix_target: catalog` will raise an error on project load

*Note:* This feature is only available for engines that support querying across catalogs. At the time of writing, the following engines are **NOT** supported:

* [MySQL](../integrations/engines/mysql.md)
Expand Down
7 changes: 6 additions & 1 deletion examples/sushi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,17 @@
)


environment_suffix_config = Config(
environment_suffix_table_config = Config(
default_connection=DuckDBConnectionConfig(),
model_defaults=model_defaults,
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
)

environment_suffix_catalog_config = environment_suffix_table_config.model_copy(
update={
"environment_suffix_target": EnvironmentSuffixTarget.CATALOG,
}
)

CATALOGS = {
"in_memory": ":memory:",
Expand Down
17 changes: 17 additions & 0 deletions sqlmesh/core/config/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,22 @@


class EnvironmentSuffixTarget(str, Enum):
# Intended to create virtual environments in their own schemas, with names like "<model_schema_name>__<env name>". The view name is untouched.
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
# would have its virtual layer view created as 'sqlmesh_example__dev.full_model'
SCHEMA = "schema"

# Intended to create virtual environments in the same schema as their production counterparts by adjusting the table name.
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
# would have its virtual layer view created as "sqlmesh_example.full_model__dev"
TABLE = "table"

# Intended to create virtual environments in their own catalogs to preserve the schema and view name of the models
# For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev'
# with a default catalog of "warehouse" would have its virtual layer view created as "warehouse__dev.sqlmesh_example.full_model"
# note: this only works for engines that can query across catalogs
CATALOG = "catalog"

@property
def is_schema(self) -> bool:
return self == EnvironmentSuffixTarget.SCHEMA
Expand All @@ -21,6 +34,10 @@ def is_schema(self) -> bool:
def is_table(self) -> bool:
return self == EnvironmentSuffixTarget.TABLE

@property
def is_catalog(self) -> bool:
return self == EnvironmentSuffixTarget.CATALOG

@classproperty
def default(cls) -> EnvironmentSuffixTarget:
return EnvironmentSuffixTarget.SCHEMA
Expand Down
9 changes: 9 additions & 0 deletions sqlmesh/core/config/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ def _normalize_identifiers(key: str) -> None:
},
)

if (
self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG
and self.environment_catalog_mapping
):
raise ConfigError(
f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n"
"Please specify one or the other"
)

if self.environment_catalog_mapping:
_normalize_identifiers("environment_catalog_mapping")
if self.physical_schema_mapping:
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pathlib import Path

SQLMESH = "sqlmesh"
SQLMESH_MANAGED = "sqlmesh_managed"
SQLMESH_PATH = Path.home() / ".sqlmesh"

PROD = "prod"
Expand Down
17 changes: 17 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class EngineAdapter:
SUPPORTS_VIEW_SCHEMA = True
SUPPORTS_CLONING = False
SUPPORTS_MANAGED_MODELS = False
SUPPORTS_CREATE_DROP_CATALOG = False
SCHEMA_DIFFER = SchemaDiffer()
SUPPORTS_TUPLE_IN = True
HAS_VIEW_BINDING = False
Expand Down Expand Up @@ -1217,6 +1218,22 @@ def drop_view(
**kwargs,
)

def create_catalog(self, catalog_name: str | exp.Identifier) -> None:
return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))

def _create_catalog(self, catalog_name: exp.Identifier) -> None:
raise SQLMeshError(
f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
)

def drop_catalog(self, catalog_name: str | exp.Identifier) -> None:
return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect))

def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
raise SQLMeshError(
f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine."
)

def columns(
self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
Expand Down
14 changes: 14 additions & 0 deletions sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import typing as t
from sqlglot import exp
from pathlib import Path

from sqlmesh.core.engine_adapter.mixins import (
GetCurrentCatalogFromFunctionMixin,
Expand Down Expand Up @@ -35,6 +36,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
)
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
SUPPORTS_CREATE_DROP_CATALOG = True

@property
def catalog_support(self) -> CatalogSupport:
Expand All @@ -44,6 +46,18 @@ def set_current_catalog(self, catalog: str) -> None:
"""Sets the catalog name of the current connection."""
self.execute(exp.Use(this=exp.to_identifier(catalog)))

def _create_catalog(self, catalog_name: exp.Identifier) -> None:
db_filename = f"{catalog_name.output_name}.db"
self.execute(
exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True)
)

def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
db_file_path = Path(f"{catalog_name.output_name}.db")
self.execute(exp.Detach(this=catalog_name, exists=True))
if db_file_path.exists():
db_file_path.unlink()

def _df_to_source_queries(
self,
df: DF,
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/core/engine_adapter/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,16 @@ def is_clustered(self) -> bool:


class CatalogSupport(Enum):
# The engine has no concept of catalogs
UNSUPPORTED = 1

# The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables
SINGLE_CATALOG_ONLY = 2

# The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding
REQUIRES_SET_CATALOG = 3

# The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first)
FULL_SUPPORT = 4

@property
Expand Down
33 changes: 33 additions & 0 deletions sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
from sqlglot.optimizer.qualify_columns import quote_identifiers

import sqlmesh.core.constants as c
from sqlmesh.core.dialect import to_schema
from sqlmesh.core.engine_adapter.mixins import (
GetCurrentCatalogFromFunctionMixin,
Expand Down Expand Up @@ -43,6 +44,7 @@
"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG,
"create_schema": CatalogSupport.REQUIRES_SET_CATALOG,
"drop_schema": CatalogSupport.REQUIRES_SET_CATALOG,
"drop_catalog": CatalogSupport.REQUIRES_SET_CATALOG, # needs a catalog to issue a query to information_schema.databases even though the result is global
}
)
class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixin, RowDiffMixin):
Expand All @@ -52,6 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
SUPPORTS_CLONING = True
SUPPORTS_MANAGED_MODELS = True
CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
SUPPORTS_CREATE_DROP_CATALOG = True
SCHEMA_DIFFER = SchemaDiffer(
parameterized_type_defaults={
exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)],
Expand Down Expand Up @@ -123,6 +126,36 @@ def snowpark(self) -> t.Optional[SnowparkSession]:
def catalog_support(self) -> CatalogSupport:
return CatalogSupport.FULL_SUPPORT

def _create_catalog(self, catalog_name: exp.Identifier) -> None:
props = exp.Properties(
expressions=[exp.SchemaCommentProperty(this=exp.Literal.string(c.SQLMESH_MANAGED))]
)
self.execute(
exp.Create(
this=exp.Table(this=catalog_name), kind="DATABASE", exists=True, properties=props
)
)

def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
# only drop the catalog if it was created by SQLMesh, which is indicated by its comment matching {c.SQLMESH_MANAGED}
exists_check = (
exp.select(exp.Literal.number(1))
.from_(exp.to_table("information_schema.databases"))
.where(
exp.and_(
exp.column("database_name").eq(exp.Literal.string(catalog_name)),
exp.column("comment").eq(exp.Literal.string(c.SQLMESH_MANAGED)),
)
)
)
normalize_identifiers(exists_check, dialect=self.dialect)
if self.fetchone(exists_check, quote_identifiers=True) is not None:
self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
else:
logger.warning(
f"Not dropping database {catalog_name.sql(dialect=self.dialect)} because there is no indication it is '{c.SQLMESH_MANAGED}'"
)

def _create_table(
self,
table_name_or_schema: t.Union[exp.Schema, TableName],
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class EnvironmentNamingInfo(PydanticModel):
normalize_name: bool = True
gateway_managed: bool = False

@property
def is_dev(self) -> bool:
return self.name.lower() != c.PROD

@field_validator("name", mode="before")
@classmethod
def _sanitize_name(cls, v: str) -> str:
Expand Down
18 changes: 9 additions & 9 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,19 @@ def table_for_environment(
def catalog_for_environment(
self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> t.Optional[str]:
if environment_naming_info.catalog_name_override:
catalog_name: t.Optional[str] = None
if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog:
catalog_name = f"{self.catalog}__{environment_naming_info.name}"
elif environment_naming_info.catalog_name_override:
catalog_name = environment_naming_info.catalog_name_override

if catalog_name:
return (
normalize_identifiers(catalog_name, dialect=dialect).name
if environment_naming_info.normalize_name
else catalog_name
)

return self.catalog

def schema_for_environment(
Expand All @@ -295,10 +301,7 @@ def schema_for_environment(
if normalize:
schema = normalize_identifiers(schema, dialect=dialect).name

if (
environment_naming_info.name.lower() != c.PROD
and environment_naming_info.suffix_target.is_schema
):
if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema:
env_name = environment_naming_info.name
if normalize:
env_name = normalize_identifiers(env_name, dialect=dialect).name
Expand All @@ -311,10 +314,7 @@ def table_name_for_environment(
self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None
) -> str:
table = self.table
if (
environment_naming_info.name.lower() != c.PROD
and environment_naming_info.suffix_target.is_table
):
if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table:
env_name = environment_naming_info.name
if environment_naming_info.normalize_name:
env_name = normalize_identifiers(env_name, dialect=dialect).name
Expand Down
Loading