From 1f59413a1ceee278e5f303270e96da30aef512b7 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Thu, 19 Jun 2025 01:37:32 +0000 Subject: [PATCH 1/3] Feat: Allow virtual environments to be given dedicated catalogs --- docs/guides/configuration.md | 55 ++++++++- examples/sushi/config.py | 7 +- sqlmesh/core/config/common.py | 17 +++ sqlmesh/core/config/root.py | 9 ++ sqlmesh/core/constants.py | 1 + sqlmesh/core/engine_adapter/base.py | 17 +++ sqlmesh/core/engine_adapter/duckdb.py | 14 +++ sqlmesh/core/engine_adapter/shared.py | 7 ++ sqlmesh/core/engine_adapter/snowflake.py | 33 +++++ sqlmesh/core/environment.py | 4 + sqlmesh/core/snapshot/definition.py | 18 +-- sqlmesh/core/snapshot/evaluator.py | 14 +++ sqlmesh/core/state_sync/common.py | 28 ++++- tests/conftest.py | 15 ++- .../engine_adapter/integration/__init__.py | 8 ++ .../integration/test_integration_snowflake.py | 38 +++++- tests/core/engine_adapter/test_duckdb.py | 14 +++ tests/core/engine_adapter/test_snowflake.py | 19 +++ tests/core/test_config.py | 49 ++++++++ tests/core/test_integration.py | 116 +++++++++++++++++- tests/core/test_snapshot.py | 26 +++- 21 files changed, 489 insertions(+), 20 deletions(-) diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index 361171d937..e954f0817d 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -244,7 +244,9 @@ This only applies to the _physical tables_ that SQLMesh creates - the views are SQLMesh stores `prod` environment views in the schema in a model's name - for example, the `prod` views for a model `my_schema.users` will be located in `my_schema`. -By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev`. 
+By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev` as `my_schema__dev.users`. + +##### Show at the table level instead This behavior can be changed to append a suffix at the end of a _table/view_ name instead. Appending the suffix to a table/view name means that non-prod environment views will be created in the same schema as the `prod` environment. The prod and non-prod views are differentiated by non-prod view names ending with `__<env>`. Config example: === "Python" - The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE` or `EnvironmentSuffixTarget.SCHEMA` (default). + The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default). ```python linenums="1" from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget @@ -271,16 +273,58 @@ Config example: ) ``` -The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses. +!!! info "Default behavior" + The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses. 
+ +##### Show at the catalog level instead + +If neither the schema (default) nor the table level is sufficient for your use case, you may indicate the environment at the catalog level instead. + +This can be useful if you have downstream BI reporting tools and you would like to point them at a development environment to test something out without renaming all the table / schema references within the report query. + +In order to achieve this, you may configure [environment_suffix_target](../reference/configuration.md#environments) like so: + +=== "YAML" + + ```yaml linenums="1" + environment_suffix_target: catalog + ``` + +=== "Python" + + The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default). + + ```python linenums="1" + from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget + + config = Config( + model_defaults=ModelDefaultsConfig(dialect=<dialect>), + environment_suffix_target=EnvironmentSuffixTarget.CATALOG, + ) + ``` + +Given the example of a model called `my_schema.users` with a default catalog of `warehouse` this will cause the following behavior: + +- For the `prod` environment, the default catalog as configured in the gateway will be used. So the view will be created at `warehouse.my_schema.users` +- For any other environment, e.g. `dev`, the environment name will be appended to the default catalog. So the view will be created at `warehouse__dev.my_schema.users` +- If a model is fully qualified with a catalog already, e.g. `finance_mart.my_schema.users`, then the environment catalog will be based off the model catalog and not the default catalog. In this example, the view will be created at `finance_mart__dev.my_schema.users` + + +!!! warning "Caveats" + - Using `environment_suffix_target: catalog` only works on engines that support querying across different catalogs. 
If your engine does not support cross-catalog queries then you will need to use `environment_suffix_target: schema` or `environment_suffix_target: table` instead. + - Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must exist prior to invoking SQLMesh. #### Environment view catalogs By default, SQLMesh creates an environment view in the same [catalog](../concepts/glossary.md#catalog) as the physical table the view points to. The physical table's catalog is determined by either the catalog specified in the model name or the default catalog defined in the connection. -Some companies fully segregate `prod` and non-prod environment objects by catalog. For example, they might have a "prod" catalog that contains all `prod` environment physical tables and views and a separate "dev" catalog that contains all `dev` environment physical tables and views. +It can be desirable to create `prod` and non-prod virtual layer objects in separate catalogs instead. For example, there might be a "prod" catalog that contains all `prod` environment views and a separate "dev" catalog that contains all `dev` environment views. Separate prod and non-prod catalogs can also be useful if you have a CI/CD pipeline that creates environments, like the [SQLMesh Github Actions CI/CD Bot](../integrations/github.md). You might want to store the CI/CD environment objects in a dedicated catalog since there can be many of them. +!!! info "Virtual layer only" + Note that the following setting only affects the [virtual layer](../concepts/glossary.md#virtual-layer). If you need full segregation by catalog between environments in the [physical layer](../concepts/glossary.md#physical-layer) as well, see the [Isolated Systems Guide](../guides/isolated_systems.md). + To configure separate catalogs, provide a mapping from [regex patterns](https://en.wikipedia.org/wiki/Regular_expression) to catalog names. 
SQLMesh will compare the name of an environment to the regex patterns; when it finds a match it will store the environment's objects in the corresponding catalog. SQLMesh evaluates the regex patterns in the order defined in the configuration; it uses the catalog for the first matching pattern. If no match is found, the catalog defined in the model or the default catalog defined on the connection will be used. @@ -317,6 +361,9 @@ With the example configuration above, SQLMesh would evaluate environment names a * If the environment name starts with `dev`, the catalog will be `dev`. * If the environment name starts with `analytics_repo`, the catalog will be `cicd`. +!!! warning + This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both settings will raise an error on project load + *Note:* This feature is only available for engines that support querying across catalogs. At the time of writing, the following engines are **NOT** supported: * [MySQL](../integrations/engines/mysql.md) diff --git a/examples/sushi/config.py b/examples/sushi/config.py index c59675cf5b..bbe9ec7988 100644 --- a/examples/sushi/config.py +++ b/examples/sushi/config.py @@ -128,12 +128,17 @@ ) -environment_suffix_config = Config( +environment_suffix_table_config = Config( default_connection=DuckDBConnectionConfig(), model_defaults=model_defaults, environment_suffix_target=EnvironmentSuffixTarget.TABLE, ) +environment_suffix_catalog_config = environment_suffix_table_config.model_copy( + update={ + "environment_suffix_target": EnvironmentSuffixTarget.CATALOG, + } +) CATALOGS = { "in_memory": ":memory:", diff --git a/sqlmesh/core/config/common.py b/sqlmesh/core/config/common.py index 4efdb41647..8a1bcdc6ea 100644 --- a/sqlmesh/core/config/common.py +++ b/sqlmesh/core/config/common.py @@ -10,9 +10,22 @@ class EnvironmentSuffixTarget(str, Enum): + # Intended to create virtual environments in their 
own schemas, with names like "<schema>__<environment>". The view name is untouched. + # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # would have its virtual layer view created as 'sqlmesh_example__dev.full_model' SCHEMA = "schema" + + # Intended to create virtual environments in the same schema as their production counterparts by adjusting the table name. + # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # would have its virtual layer view created as "sqlmesh_example.full_model__dev" TABLE = "table" + # Intended to create virtual environments in their own catalogs to preserve the schema and view name of the models + # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # would have its virtual layer view created as "dev.sqlmesh_example.full_model" + # note: this only works for engines that can query across catalogs + CATALOG = "catalog" + @property def is_schema(self) -> bool: return self == EnvironmentSuffixTarget.SCHEMA @@ -21,6 +34,10 @@ def is_schema(self) -> bool: def is_table(self) -> bool: return self == EnvironmentSuffixTarget.TABLE + @property + def is_catalog(self) -> bool: + return self == EnvironmentSuffixTarget.CATALOG + @classproperty def default(cls) -> EnvironmentSuffixTarget: return EnvironmentSuffixTarget.SCHEMA diff --git a/sqlmesh/core/config/root.py index 1d53235f73..6727a90613 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -242,6 +242,15 @@ def _normalize_identifiers(key: str) -> None: }, ) + if ( + self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG + and self.environment_catalog_mapping + ): + raise ConfigError( + "'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n" + "Please specify one or the other" + ) + if self.environment_catalog_mapping: _normalize_identifiers("environment_catalog_mapping") 
if self.physical_schema_mapping: diff --git a/sqlmesh/core/constants.py b/sqlmesh/core/constants.py index 2ab592f368..27d6cf0d7f 100644 --- a/sqlmesh/core/constants.py +++ b/sqlmesh/core/constants.py @@ -7,6 +7,7 @@ from pathlib import Path SQLMESH = "sqlmesh" +SQLMESH_MANAGED = "sqlmesh_managed" SQLMESH_PATH = Path.home() / ".sqlmesh" PROD = "prod" diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 29ca98fc2b..5eee191ced 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -101,6 +101,7 @@ class EngineAdapter: SUPPORTS_VIEW_SCHEMA = True SUPPORTS_CLONING = False SUPPORTS_MANAGED_MODELS = False + SUPPORTS_CREATE_DROP_CATALOG = False SCHEMA_DIFFER = SchemaDiffer() SUPPORTS_TUPLE_IN = True HAS_VIEW_BINDING = False @@ -1217,6 +1218,22 @@ def drop_view( **kwargs, ) + def create_catalog(self, catalog_name: str | exp.Identifier) -> None: + return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) + + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + raise NotImplementedError( + f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." + ) + + def drop_catalog(self, catalog_name: str | exp.Identifier) -> None: + return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + raise NotImplementedError( + f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." 
+ ) + def columns( self, table_name: TableName, include_pseudo_columns: bool = False ) -> t.Dict[str, exp.DataType]: diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index f7f72f9692..169a7a7f94 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -2,6 +2,7 @@ import typing as t from sqlglot import exp +from pathlib import Path from sqlmesh.core.engine_adapter.mixins import ( GetCurrentCatalogFromFunctionMixin, @@ -35,6 +36,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, ) COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY + SUPPORTS_CREATE_DROP_CATALOG = True @property def catalog_support(self) -> CatalogSupport: @@ -44,6 +46,18 @@ def set_current_catalog(self, catalog: str) -> None: """Sets the catalog name of the current connection.""" self.execute(exp.Use(this=exp.to_identifier(catalog))) + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + db_filename = f"{catalog_name.output_name}.db" + self.execute( + exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True) + ) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + db_file_path = Path(f"{catalog_name.output_name}.db") + self.execute(exp.Detach(this=catalog_name, exists=True)) + if db_file_path.exists(): + db_file_path.unlink() + def _df_to_source_queries( self, df: DF, diff --git a/sqlmesh/core/engine_adapter/shared.py b/sqlmesh/core/engine_adapter/shared.py index e1d93b5e2f..1d882de02f 100644 --- a/sqlmesh/core/engine_adapter/shared.py +++ b/sqlmesh/core/engine_adapter/shared.py @@ -173,9 +173,16 @@ def is_clustered(self) -> bool: class CatalogSupport(Enum): + # The engine has no concept of catalogs UNSUPPORTED = 1 + + # The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables 
SINGLE_CATALOG_ONLY = 2 + + # The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding REQUIRES_SET_CATALOG = 3 + + # The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first) FULL_SUPPORT = 4 @property diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 032c1da4fe..71ffc10f48 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -9,6 +9,7 @@ from sqlglot.optimizer.normalize_identifiers import normalize_identifiers from sqlglot.optimizer.qualify_columns import quote_identifiers +import sqlmesh.core.constants as c from sqlmesh.core.dialect import to_schema from sqlmesh.core.engine_adapter.mixins import ( GetCurrentCatalogFromFunctionMixin, @@ -43,6 +44,7 @@ "_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG, "create_schema": CatalogSupport.REQUIRES_SET_CATALOG, "drop_schema": CatalogSupport.REQUIRES_SET_CATALOG, + "drop_catalog": CatalogSupport.REQUIRES_SET_CATALOG, # needs a catalog to issue a query to information_schema.databases even though the result is global } ) class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixin, RowDiffMixin): @@ -52,6 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi SUPPORTS_CLONING = True SUPPORTS_MANAGED_MODELS = True CURRENT_CATALOG_EXPRESSION = exp.func("current_database") + SUPPORTS_CREATE_DROP_CATALOG = True SCHEMA_DIFFER = SchemaDiffer( parameterized_type_defaults={ exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)], @@ -123,6 +126,36 @@ def snowpark(self) -> t.Optional[SnowparkSession]: def catalog_support(self) -> CatalogSupport: return CatalogSupport.FULL_SUPPORT + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + props = exp.Properties( + 
expressions=[exp.SchemaCommentProperty(this=exp.Literal.string(c.SQLMESH_MANAGED))] + ) + self.execute( + exp.Create( + this=exp.Table(this=catalog_name), kind="DATABASE", exists=True, properties=props + ) + ) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + # only drop the catalog if it was created by SQLMesh, which is indicated by its comment matching {c.SQLMESH_MANAGED} + exists_check = ( + exp.select(exp.Literal.number(1)) + .from_(exp.to_table("information_schema.databases")) + .where( + exp.and_( + exp.column("database_name").eq(exp.Literal.string(catalog_name)), + exp.column("comment").eq(exp.Literal.string(c.SQLMESH_MANAGED)), + ) + ) + ) + normalize_identifiers(exists_check, dialect=self.dialect) + if self.fetchone(exists_check, quote_identifiers=True) is not None: + self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True)) + else: + logger.warning( + f"Not dropping database {catalog_name.sql(dialect=self.dialect)} because there is no indication it is '{c.SQLMESH_MANAGED}'" + ) + def _create_table( self, table_name_or_schema: t.Union[exp.Schema, TableName], diff --git a/sqlmesh/core/environment.py b/sqlmesh/core/environment.py index 891d299df8..13ca1c5485 100644 --- a/sqlmesh/core/environment.py +++ b/sqlmesh/core/environment.py @@ -43,6 +43,10 @@ class EnvironmentNamingInfo(PydanticModel): normalize_name: bool = True gateway_managed: bool = False + @property + def is_dev(self) -> bool: + return self.name.lower() != c.PROD + @field_validator("name", mode="before") @classmethod def _sanitize_name(cls, v: str) -> str: diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index ba422bcdcb..573d3bc75d 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -274,13 +274,19 @@ def table_for_environment( def catalog_for_environment( self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None ) -> t.Optional[str]: - if 
environment_naming_info.catalog_name_override: + catalog_name: t.Optional[str] = None + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog: + catalog_name = f"{self.catalog}__{environment_naming_info.name}" + elif environment_naming_info.catalog_name_override: catalog_name = environment_naming_info.catalog_name_override + + if catalog_name: return ( normalize_identifiers(catalog_name, dialect=dialect).name if environment_naming_info.normalize_name else catalog_name ) + return self.catalog def schema_for_environment( @@ -295,10 +301,7 @@ def schema_for_environment( if normalize: schema = normalize_identifiers(schema, dialect=dialect).name - if ( - environment_naming_info.name.lower() != c.PROD - and environment_naming_info.suffix_target.is_schema - ): + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema: env_name = environment_naming_info.name if normalize: env_name = normalize_identifiers(env_name, dialect=dialect).name @@ -311,10 +314,7 @@ def table_name_for_environment( self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None ) -> str: table = self.table - if ( - environment_naming_info.name.lower() != c.PROD - and environment_naming_info.suffix_target.is_table - ): + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table: env_name = environment_naming_info.name if environment_naming_info.normalize_name: env_name = normalize_identifiers(env_name, dialect=dialect).name diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 39a8f5147c..f2042583d0 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -251,6 +251,8 @@ def promote( # A schema can be shared across multiple engines, so we need to group by gateway for gateway, tables in tables_by_gateway.items(): + if environment_naming_info.suffix_target.is_catalog: + self._create_catalogs(tables=tables, gateway=gateway) 
self._create_schemas(tables=tables, gateway=gateway) deployability_index = deployability_index or DeployabilityIndex.all_deployable() @@ -1114,6 +1116,18 @@ def _audit( blocking=blocking, ) + def _create_catalogs( + self, + tables: t.Iterable[t.Union[exp.Table, str]], + gateway: t.Optional[str] = None, + ) -> None: + # attempt to create catalogs for the virtual layer if possible + adapter = self.get_adapter(gateway) + if adapter.SUPPORTS_CREATE_DROP_CATALOG: + unique_catalogs = {t.catalog for t in [exp.to_table(maybe_t) for maybe_t in tables]} + for catalog_name in unique_catalogs: + adapter.create_catalog(catalog_name) + def _create_schemas( self, tables: t.Iterable[t.Union[exp.Table, str]], diff --git a/sqlmesh/core/state_sync/common.py b/sqlmesh/core/state_sync/common.py index d5e20e9e8e..12899da82e 100644 --- a/sqlmesh/core/state_sync/common.py +++ b/sqlmesh/core/state_sync/common.py @@ -30,7 +30,9 @@ def cleanup_expired_views( console: t.Optional[Console] = None, ) -> None: expired_schema_environments = [ - environment for environment in environments if environment.suffix_target.is_schema + environment + for environment in environments + if environment.suffix_target.is_schema or environment.suffix_target.is_catalog ] expired_table_environments = [ environment for environment in environments if environment.suffix_target.is_table @@ -42,8 +44,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin return engine_adapters.get(gateway, default_adapter) return default_adapter + catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set() + # Drop the schemas for the expired environments - for engine_adapter, expired_catalog, expired_schema in { + for engine_adapter, expired_catalog, expired_schema, suffix_target in { ( (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)), snapshot.qualified_view_name.catalog_for_environment( @@ -52,6 +56,7 @@ def get_adapter(gateway_managed: bool, gateway: 
t.Optional[str] = None) -> Engin snapshot.qualified_view_name.schema_for_environment( environment.naming_info, dialect=engine_adapter.dialect ), + environment.suffix_target, ) for environment in expired_schema_environments for snapshot in environment.snapshots @@ -64,6 +69,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin ignore_if_not_exists=True, cascade=True, ) + + if suffix_target.is_catalog and expired_catalog: + catalogs_to_drop.add((engine_adapter, expired_catalog)) + if console: console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect)) except Exception as e: @@ -96,6 +105,21 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin else: raise SQLMeshError(message) from e + # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs + # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog' + for engine_adapter, catalog in catalogs_to_drop: + if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG: + try: + engine_adapter.drop_catalog(catalog) + if console: + console.update_cleanup_progress(catalog) + except Exception as e: + message = f"Failed to drop the expired environment catalog '{catalog}': {e}" + if warn_on_delete_failure: + logger.warning(message) + else: + raise SQLMeshError(message) from e + def transactional() -> t.Callable[[t.Callable], t.Callable]: def decorator(func: t.Callable) -> t.Callable: diff --git a/tests/conftest.py b/tests/conftest.py index 492da9db58..574c802c0e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -105,12 +105,25 @@ def qualified_views(self) -> t.List[exp.Table]: @property def schemas(self) -> t.List[str]: + return self.schemas_in_catalog(self.engine_adapter.get_current_catalog() or "") + + def schemas_in_catalog(self, catalog_name: str) -> t.List[str]: return self._get_single_col( - f"SELECT schema_name FROM information_schema.schemata WHERE catalog_name = 
'{self.engine_adapter.get_current_catalog()}' and {self._system_schema_filter('schema_name')}", + f"SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '{catalog_name}' and {self._system_schema_filter('schema_name')}", "schema_name", self.engine_adapter, ) + @property + def catalogs(self) -> t.Set[str]: + return set( + self._get_single_col( + f"SELECT database_name FROM duckdb_databases() WHERE internal=false", + "database_name", + self.engine_adapter, + ) + ) + def _system_schema_filter(self, col: str) -> str: return f"{col} not in ('information_schema', 'pg_catalog', 'main')" diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 3aa8529ef6..7e35b832be 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -205,6 +205,9 @@ def __init__( self._schemas: t.List[ str ] = [] # keep track of any schemas returned from self.schema() / self.table() so we can drop them at the end + self._catalogs: t.List[ + str + ] = [] # keep track of any catalogs created via self.create_catalog() so we can drop them at the end @property def test_type(self) -> str: @@ -685,6 +688,8 @@ def create_catalog(self, catalog_name: str): except Exception: pass + self._catalogs.append(catalog_name) + def drop_catalog(self, catalog_name: str): if self.dialect == "bigquery": return # bigquery cannot create/drop catalogs @@ -707,6 +712,9 @@ def cleanup(self, ctx: t.Optional[Context] = None): schema_name=schema_name, ignore_if_not_exists=True, cascade=True ) + for catalog_name in set(self._catalogs): + self.drop_catalog(catalog_name) + self.engine_adapter.close() def upsert_sql_model(self, model_definition: str) -> t.Tuple[Context, SqlModel]: diff --git a/tests/core/engine_adapter/integration/test_integration_snowflake.py b/tests/core/engine_adapter/integration/test_integration_snowflake.py index 314a9e0f20..12e45f1f14 100644 --- 
a/tests/core/engine_adapter/integration/test_integration_snowflake.py +++ b/tests/core/engine_adapter/integration/test_integration_snowflake.py @@ -181,7 +181,7 @@ def _get_data_object(table: exp.Table) -> DataObject: assert not metadata.is_clustered -def test_create_iceberg_table(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter) -> None: +def test_create_iceberg_table(ctx: TestContext) -> None: # Note: this test relies on a default Catalog and External Volume being configured in Snowflake # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration#set-a-default-catalog-at-the-account-database-or-schema-level # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume#set-a-default-external-volume-at-the-account-database-or-schema-level @@ -271,3 +271,39 @@ def execute(context: ExecutionContext, start: datetime, **kwargs) -> DataFrame: query = exp.select("*").from_(table) df = ctx.engine_adapter.fetchdf(query, quote_identifiers=True) assert len(df) == 10 + + +def test_create_drop_catalog(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter): + non_sqlmesh_managed_catalog = ctx.add_test_suffix("external_catalog") + sqlmesh_managed_catalog = ctx.add_test_suffix("env_dev") + + initial_catalog = engine_adapter.get_current_catalog() + assert initial_catalog + + ctx.create_catalog( + non_sqlmesh_managed_catalog + ) # create via TestContext so the sqlmesh_managed comment doesnt get added + ctx._catalogs.append(sqlmesh_managed_catalog) # so it still gets cleaned up if the test fails + + engine_adapter.create_catalog( + sqlmesh_managed_catalog + ) # create via EngineAdapter so the sqlmesh_managed comment is added + + def fetch_database_names() -> t.Set[str]: + engine_adapter.set_current_catalog(initial_catalog) + return { + str(r[0]) + for r in engine_adapter.fetchall( + f"select database_name from information_schema.databases where database_name like '%{ctx.test_id}'" + ) + } + + assert 
fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog} + + engine_adapter.drop_catalog( + non_sqlmesh_managed_catalog + ) # no-op: catalog is not SQLMesh-managed + assert fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog} + + engine_adapter.drop_catalog(sqlmesh_managed_catalog) # works, catalog is SQLMesh-managed + assert fetch_database_names() == {non_sqlmesh_managed_catalog} diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index 93ef72e874..543b2e2f18 100644 --- a/tests/core/engine_adapter/test_duckdb.py +++ b/tests/core/engine_adapter/test_duckdb.py @@ -89,3 +89,17 @@ def test_temporary_table(make_mocked_engine_adapter: t.Callable, duck_conn): assert to_sql_calls(adapter) == [ 'CREATE TEMPORARY TABLE IF NOT EXISTS "test_table" ("a" INT, "b" INT)', ] + + +def test_create_catalog(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.create_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ["ATTACH IF NOT EXISTS 'foo.db' AS \"foo\""] + + +def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.drop_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"'] diff --git a/tests/core/engine_adapter/test_snowflake.py b/tests/core/engine_adapter/test_snowflake.py index f0a47b3393..4ca13ee8f9 100644 --- a/tests/core/engine_adapter/test_snowflake.py +++ b/tests/core/engine_adapter/test_snowflake.py @@ -814,3 +814,22 @@ def test_create_view_with_schema_and_grants( # materialized view - COPY GRANTS goes before the column list """CREATE OR REPLACE MATERIALIZED VIEW "target_materialized_view" COPY GRANTS ("ID", "NAME") COMMENT='materialized **view** from integration test' AS SELECT 1 AS "ID", 'foo' AS "NAME\"""", ] + + 
+def test_create_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None: + adapter = snowflake_mocked_engine_adapter + adapter.create_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == [ + "CREATE DATABASE IF NOT EXISTS \"foo\" COMMENT='sqlmesh_managed'" + ] + + +def test_drop_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None: + adapter = snowflake_mocked_engine_adapter + adapter.drop_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == [ + """SELECT 1 FROM "INFORMATION_SCHEMA"."DATABASES" WHERE "DATABASE_NAME" = 'foo' AND "COMMENT" = 'sqlmesh_managed'""", + 'DROP DATABASE IF EXISTS "foo"', + ] diff --git a/tests/core/test_config.py b/tests/core/test_config.py index dea9fb16da..e3b7a8e612 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -16,6 +16,7 @@ BigQueryConnectionConfig, MotherDuckConnectionConfig, BuiltInSchedulerConfig, + EnvironmentSuffixTarget, ) from sqlmesh.core.config.connection import DuckDBAttachOptions, RedshiftConnectionConfig from sqlmesh.core.config.feature_flag import DbtFeatureFlag, FeatureFlag @@ -1055,3 +1056,51 @@ def test_loader_for_migrated_dbt_project(tmp_path: Path): ) assert config.loader == MigratedDbtProjectLoader + + +def test_environment_suffix_target_catalog(tmp_path: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text(""" + gateways: + warehouse: + connection: + type: duckdb + + default_gateway: warehouse + + model_defaults: + dialect: duckdb + + environment_suffix_target: catalog +""") + + config = load_config_from_paths( + Config, + project_paths=[config_path], + ) + + assert config.environment_suffix_target == EnvironmentSuffixTarget.CATALOG + assert not config.environment_catalog_mapping + + config_path.write_text(""" + gateways: + warehouse: + connection: + type: duckdb + + default_gateway: warehouse + + model_defaults: + dialect: duckdb + + environment_suffix_target: catalog + + 
environment_catalog_mapping: + '.*': "foo" +""") + + with pytest.raises(ConfigError, match=r"mutually exclusive"): + config = load_config_from_paths( + Config, + project_paths=[config_path], + ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 8725318506..f6e696ab01 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -11,6 +11,7 @@ import numpy as np # noqa: TID253 import pandas as pd # noqa: TID253 import pytest +from pytest import MonkeyPatch from pathlib import Path from sqlmesh.core.console import set_console, get_console, TerminalConsole from sqlmesh.core.config.naming import NameInferenceConfig @@ -34,6 +35,7 @@ ModelDefaultsConfig, DuckDBConnectionConfig, ) +from sqlmesh.core.config.common import EnvironmentSuffixTarget from sqlmesh.core.console import Console, get_console from sqlmesh.core.context import Context from sqlmesh.core.config.categorizer import CategorizerConfig @@ -5259,7 +5261,9 @@ def test_invalidating_environment(sushi_context: Context): def test_environment_suffix_target_table(init_and_plan_context: t.Callable): - context, plan = init_and_plan_context("examples/sushi", config="environment_suffix_config") + context, plan = init_and_plan_context( + "examples/sushi", config="environment_suffix_table_config" + ) context.apply(plan) metadata = DuckDBMetadata.from_context(context) environments_schemas = {"sushi"} @@ -5295,6 +5299,116 @@ def test_environment_suffix_target_table(init_and_plan_context: t.Callable): } == set() +def test_environment_suffix_target_catalog(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: + monkeypatch.chdir(tmp_path) + + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + default_connection=DuckDBConnectionConfig(catalogs={"main_warehouse": ":memory:"}), + environment_suffix_target=EnvironmentSuffixTarget.CATALOG, + ) + + assert config.default_connection + + models_dir = tmp_path / "models" + models_dir.mkdir() + + (models_dir / 
"model.sql").write_text(""" + MODEL ( + name example_schema.test_model, + kind FULL + ); + + SELECT '1' as a""") + + (models_dir / "fqn_model.sql").write_text(""" + MODEL ( + name memory.example_fqn_schema.test_model_fqn, + kind FULL + ); + + SELECT '1' as a""") + + ctx = Context(config=config, paths=tmp_path) + + metadata = DuckDBMetadata.from_context(ctx) + assert ctx.default_catalog == "main_warehouse" + assert metadata.catalogs == {"main_warehouse", "memory"} + + ctx.plan(auto_apply=True) + + # prod should go to the default catalog and not be overridden to a catalog called 'prod' + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse.example_schema.test_model")[0] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory.example_fqn_schema.test_model_fqn")[0] # type: ignore + == "1" + ) + assert metadata.catalogs == {"main_warehouse", "memory"} + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("memory") == [ + "example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + + # dev should be overridden to go to a catalogs called 'main_warehouse__dev' and 'memory__dev' + ctx.plan(environment="dev", include_unmodified=True, auto_apply=True) + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse__dev.example_schema.test_model")[ + 0 + ] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory__dev.example_fqn_schema.test_model_fqn")[ + 0 + ] # type: ignore + == "1" + ) + assert metadata.catalogs == {"main_warehouse", "main_warehouse__dev", "memory", "memory__dev"} + + # schemas in dev envs should match prod and not have a suffix + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("main_warehouse__dev") == ["example_schema"] + assert metadata.schemas_in_catalog("memory") == [ + 
"example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + assert metadata.schemas_in_catalog("memory__dev") == ["example_fqn_schema"] + + ctx.invalidate_environment("dev", sync=True) + + # dev catalogs cleaned up + assert metadata.catalogs == {"main_warehouse", "memory"} + + # prod catalogs still contain physical layer and views still work + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("memory") == [ + "example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse.example_schema.test_model")[0] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory.example_fqn_schema.test_model_fqn")[0] # type: ignore + == "1" + ) + + def test_environment_catalog_mapping(init_and_plan_context: t.Callable): environments_schemas = {"raw", "sushi"} diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index 387c799e95..e4eb12c522 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -1780,7 +1780,31 @@ def test_is_valid_start(make_snapshot): EnvironmentNamingInfo(name="dev", catalog_name_override="g-h"), '"g-h".default__dev."e-f"', ), - (QualifiedViewName(table="e-f"), EnvironmentNamingInfo(name="dev"), 'default__dev."e-f"'), + ( + QualifiedViewName(table="e-f"), + EnvironmentNamingInfo(name="dev"), + 'default__dev."e-f"', + ), + # EnvironmentSuffixTarget.CATALOG + ( + QualifiedViewName( + catalog="default-foo", schema_name="sqlmesh_example", table="full_model" + ), + EnvironmentNamingInfo( + name="dev", + suffix_target=EnvironmentSuffixTarget.CATALOG, + ), + '"default-foo__dev".sqlmesh_example.full_model', + ), + ( + QualifiedViewName(catalog="default", schema_name="sqlmesh_example", table="full_model"), + EnvironmentNamingInfo( + name=c.PROD, + catalog_name_override=None, + suffix_target=EnvironmentSuffixTarget.CATALOG, + ), + 
"default.sqlmesh_example.full_model", + ), ), ) def test_qualified_view_name(qualified_view_name, environment_naming_info, expected): From 71f1e2f35333c8ad55679cc6a236f7a9d56c3f08 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Thu, 19 Jun 2025 01:51:58 +0000 Subject: [PATCH 2/3] typos --- docs/guides/configuration.md | 8 ++++---- sqlmesh/core/config/common.py | 2 +- sqlmesh/core/config/root.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index e954f0817d..901d35e6b2 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -278,11 +278,11 @@ Config example: ##### Show at the catalog level instead -If neither the schema (default) nor the table level are sufficient for your use case, you may indicate the environment at the catalog level instead. +If neither the schema (default) nor the table level are sufficient for your use case, you can indicate the environment at the catalog level instead. This can be useful if you have downstream BI reporting tools and you would like to point them at a development environment to test something out without renaming all the table / schema references within the report query. -In order to achieve this, you may configure [environment_suffix_target](../reference/configuration.md#environments) like so: +In order to achieve this, you can configure [environment_suffix_target](../reference/configuration.md#environments) like so: === "YAML" @@ -312,7 +312,7 @@ Given the example of a model called `my_schema.users` with a default catalog of !!! warning "Caveats" - Using `environment_suffix_target: catalog` only works on engines that support querying across different catalogs. If your engine does not support cross-catalog queries then you will need to use `environment_suffix_target: schema` or `environment_suffix_target: table` instead. 
- - Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must exist prior to invoking SQLMesh. + - Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must be managed externally from SQLMesh and exist prior to invoking SQLMesh. #### Environment view catalogs @@ -362,7 +362,7 @@ With the example configuration above, SQLMesh would evaluate environment names a * If the environment name starts with `analytics_repo`, the catalog will be `cicd`. !!! warning - This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both settings will raise an error on project load + This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both `environment_catalog_mapping` and `environment_suffix_target: catalog` will raise an error on project load *Note:* This feature is only available for engines that support querying across catalogs. 
At the time of writing, the following engines are **NOT** supported: diff --git a/sqlmesh/core/config/common.py b/sqlmesh/core/config/common.py index 8a1bcdc6ea..d7be902713 100644 --- a/sqlmesh/core/config/common.py +++ b/sqlmesh/core/config/common.py @@ -22,7 +22,7 @@ class EnvironmentSuffixTarget(str, Enum): # Intended to create virtual environments in their own catalogs to preserve the schema and view name of the models # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' - # would have its virtual layer view created as "dev.sqlmesh_example.full_model" + # with a default catalog of "warehouse" would have its virtual layer view created as "warehouse__dev.sqlmesh_example.full_model" # note: this only works for engines that can query across catalogs CATALOG = "catalog" diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 6727a90613..a8b8a2a797 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -248,7 +248,7 @@ def _normalize_identifiers(key: str) -> None: ): raise ConfigError( f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n" - "Please specify one or thr other" + "Please specify one or the other" ) if self.environment_catalog_mapping: From f13a20e46c1c559707e4427dc5c829486d815955 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Thu, 19 Jun 2025 21:50:06 +0000 Subject: [PATCH 3/3] Use SQLMeshError instead of NotImplementedError --- sqlmesh/core/engine_adapter/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 5eee191ced..a317008b1a 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1222,7 +1222,7 @@ def create_catalog(self, catalog_name: str | exp.Identifier) -> None: return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) def 
_create_catalog(self, catalog_name: exp.Identifier) -> None: - raise NotImplementedError( + raise SQLMeshError( f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." ) @@ -1230,7 +1230,7 @@ def drop_catalog(self, catalog_name: str | exp.Identifier) -> None: return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) def _drop_catalog(self, catalog_name: exp.Identifier) -> None: - raise NotImplementedError( + raise SQLMeshError( f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." )