diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index 361171d937..901d35e6b2 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -244,7 +244,9 @@ This only applies to the _physical tables_ that SQLMesh creates - the views are SQLMesh stores `prod` environment views in the schema in a model's name - for example, the `prod` views for a model `my_schema.users` will be located in `my_schema`. -By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev`. +By default, for non-prod environments SQLMesh creates a new schema that appends the environment name to the model name's schema. For example, by default the view for a model `my_schema.users` in a SQLMesh environment named `dev` will be located in the schema `my_schema__dev` as `my_schema__dev.users`. + +##### Show at the table level instead This behavior can be changed to append a suffix at the end of a _table/view_ name instead. Appending the suffix to a table/view name means that non-prod environment views will be created in the same schema as the `prod` environment. The prod and non-prod views are differentiated by non-prod view names ending with `__`. @@ -260,7 +262,7 @@ Config example: === "Python" - The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE` or `EnvironmentSuffixTarget.SCHEMA` (default). + The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default). ```python linenums="1" from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget @@ -271,16 +273,58 @@ Config example: ) ``` -The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses. +!!! info "Default behavior" + The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on schema creation then this can be a useful way of reducing the number of schemas SQLMesh uses. + +##### Show at the catalog level instead + +If neither the schema (default) nor the table level are sufficient for your use case, you can indicate the environment at the catalog level instead. + +This can be useful if you have downstream BI reporting tools and you would like to point them at a development environment to test something out without renaming all the table / schema references within the report query. 
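To make the BI example concrete, here is a minimal sketch of what that swap looks like from a downstream consumer's point of view. It assumes a DuckDB-backed project containing a model `my_schema.users` with a default catalog named `warehouse` (all of these names are hypothetical); the configuration that enables this behavior is shown next.

```python
from sqlmesh.core.config import (
    Config,
    DuckDBConnectionConfig,
    EnvironmentSuffixTarget,
    ModelDefaultsConfig,
)
from sqlmesh.core.context import Context

# Hypothetical project: one model named `my_schema.users`, default catalog `warehouse`.
config = Config(
    model_defaults=ModelDefaultsConfig(dialect="duckdb"),
    default_connection=DuckDBConnectionConfig(catalogs={"warehouse": ":memory:"}),
    environment_suffix_target=EnvironmentSuffixTarget.CATALOG,
)
ctx = Context(config=config, paths="path/to/project")  # hypothetical project path

ctx.plan(auto_apply=True)  # prod views are created in `warehouse`
ctx.plan(environment="dev", include_unmodified=True, auto_apply=True)  # dev views in `warehouse__dev`

# A report pointed at prod reads the view from the default catalog ...
ctx.engine_adapter.fetchall("SELECT * FROM warehouse.my_schema.users")

# ... and pointing it at `dev` only swaps the catalog reference; the schema and
# view names stay exactly the same.
ctx.engine_adapter.fetchall("SELECT * FROM warehouse__dev.my_schema.users")
```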
+ +In order to achieve this, you can configure [environment_suffix_target](../reference/configuration.md#environments) like so: + +=== "YAML" + + ```yaml linenums="1" + environment_suffix_target: catalog + ``` + +=== "Python" + + The Python `environment_suffix_target` argument takes an `EnvironmentSuffixTarget` enumeration with a value of `EnvironmentSuffixTarget.TABLE`, `EnvironmentSuffixTarget.CATALOG` or `EnvironmentSuffixTarget.SCHEMA` (default). + + ```python linenums="1" + from sqlmesh.core.config import Config, ModelDefaultsConfig, EnvironmentSuffixTarget + + config = Config( + model_defaults=ModelDefaultsConfig(dialect=), + environment_suffix_target=EnvironmentSuffixTarget.CATALOG, + ) + ``` + +Given the example of a model called `my_schema.users` with a default catalog of `warehouse` this will cause the following behavior: + +- For the `prod` environment, the default catalog as configured in the gateway will be used. So the view will be created at `warehouse.my_schema.users` +- For any other environment, eg `dev`, the environment name will be appended to the default catalog. So the view will be created at `warehouse__dev.my_schema.users` +- If a model is fully qualified with a catalog already, eg `finance_mart.my_schema.users`, then the environment catalog will be based off the model catalog and not the default catalog. In this example, the view will be created at `finance_mart__dev.my_schema.users` + + +!!! warning "Caveats" + - Using `environment_suffix_target: catalog` only works on engines that support querying across different catalogs. If your engine does not support cross-catalog queries then you will need to use `environment_suffix_target: schema` or `environment_suffix_target: table` instead. + - Automatic catalog creation is not supported on all engines even if they support cross-catalog queries. For engines where it is not supported, the catalogs must be managed externally from SQLMesh and exist prior to invoking SQLMesh. #### Environment view catalogs By default, SQLMesh creates an environment view in the same [catalog](../concepts/glossary.md#catalog) as the physical table the view points to. The physical table's catalog is determined by either the catalog specified in the model name or the default catalog defined in the connection. -Some companies fully segregate `prod` and non-prod environment objects by catalog. For example, they might have a "prod" catalog that contains all `prod` environment physical tables and views and a separate "dev" catalog that contains all `dev` environment physical tables and views. +It can be desirable to create `prod` and non-prod virtual layer objects in separate catalogs instead. For example, there might be a "prod" catalog that contains all `prod` environment views and a separate "dev" catalog that contains all `dev` environment views. Separate prod and non-prod catalogs can also be useful if you have a CI/CD pipeline that creates environments, like the [SQLMesh Github Actions CI/CD Bot](../integrations/github.md). You might want to store the CI/CD environment objects in a dedicated catalog since there can be many of them. +!!! info "Virtual layer only" + Note that the following setting only affects the [virtual layer](../concepts/glossary.md#virtual-layer). If you need full segregation by catalog between environments in the [physical layer](../concepts/glossary.md#physical-layer) as well, see the [Isolated Systems Guide](../guides/isolated_systems.md). 
+ To configure separate catalogs, provide a mapping from [regex patterns](https://en.wikipedia.org/wiki/Regular_expression) to catalog names. SQLMesh will compare the name of an environment to the regex patterns; when it finds a match it will store the environment's objects in the corresponding catalog. SQLMesh evaluates the regex patterns in the order defined in the configuration; it uses the catalog for the first matching pattern. If no match is found, the catalog defined in the model or the default catalog defined on the connection will be used. @@ -317,6 +361,9 @@ With the example configuration above, SQLMesh would evaluate environment names a * If the environment name starts with `dev`, the catalog will be `dev`. * If the environment name starts with `analytics_repo`, the catalog will be `cicd`. +!!! warning + This feature is mutually exclusive with `environment_suffix_target: catalog` in order to prevent ambiguous mappings from being defined. Attempting to specify both `environment_catalog_mapping` and `environment_suffix_target: catalog` will raise an error on project load + *Note:* This feature is only available for engines that support querying across catalogs. At the time of writing, the following engines are **NOT** supported: * [MySQL](../integrations/engines/mysql.md) diff --git a/examples/sushi/config.py b/examples/sushi/config.py index c59675cf5b..bbe9ec7988 100644 --- a/examples/sushi/config.py +++ b/examples/sushi/config.py @@ -128,12 +128,17 @@ ) -environment_suffix_config = Config( +environment_suffix_table_config = Config( default_connection=DuckDBConnectionConfig(), model_defaults=model_defaults, environment_suffix_target=EnvironmentSuffixTarget.TABLE, ) +environment_suffix_catalog_config = environment_suffix_table_config.model_copy( + update={ + "environment_suffix_target": EnvironmentSuffixTarget.CATALOG, + } +) CATALOGS = { "in_memory": ":memory:", diff --git a/sqlmesh/core/config/common.py b/sqlmesh/core/config/common.py index 4efdb41647..d7be902713 100644 --- a/sqlmesh/core/config/common.py +++ b/sqlmesh/core/config/common.py @@ -10,9 +10,22 @@ class EnvironmentSuffixTarget(str, Enum): + # Intended to create virtual environments in their own schemas, with names like "__". The view name is untouched. + # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # would have its virtual layer view created as 'sqlmesh_example__dev.full_model' SCHEMA = "schema" + + # Intended to create virtual environments in the same schema as their production counterparts by adjusting the table name. 
+ # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # would have its virtual layer view created as "sqlmesh_example.full_model__dev" TABLE = "table" + # Intended to create virtual environments in their own catalogs to preserve the schema and view name of the models + # For example, a model named 'sqlmesh_example.full_model' created in an environment called 'dev' + # with a default catalog of "warehouse" would have its virtual layer view created as "warehouse__dev.sqlmesh_example.full_model" + # note: this only works for engines that can query across catalogs + CATALOG = "catalog" + @property def is_schema(self) -> bool: return self == EnvironmentSuffixTarget.SCHEMA @@ -21,6 +34,10 @@ def is_schema(self) -> bool: def is_table(self) -> bool: return self == EnvironmentSuffixTarget.TABLE + @property + def is_catalog(self) -> bool: + return self == EnvironmentSuffixTarget.CATALOG + @classproperty def default(cls) -> EnvironmentSuffixTarget: return EnvironmentSuffixTarget.SCHEMA diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 1d53235f73..a8b8a2a797 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -242,6 +242,15 @@ def _normalize_identifiers(key: str) -> None: }, ) + if ( + self.environment_suffix_target == EnvironmentSuffixTarget.CATALOG + and self.environment_catalog_mapping + ): + raise ConfigError( + f"'environment_suffix_target: catalog' is mutually exclusive with 'environment_catalog_mapping'.\n" + "Please specify one or the other" + ) + if self.environment_catalog_mapping: _normalize_identifiers("environment_catalog_mapping") if self.physical_schema_mapping: diff --git a/sqlmesh/core/constants.py b/sqlmesh/core/constants.py index 2ab592f368..27d6cf0d7f 100644 --- a/sqlmesh/core/constants.py +++ b/sqlmesh/core/constants.py @@ -7,6 +7,7 @@ from pathlib import Path SQLMESH = "sqlmesh" +SQLMESH_MANAGED = "sqlmesh_managed" SQLMESH_PATH = Path.home() / ".sqlmesh" PROD = "prod" diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 29ca98fc2b..a317008b1a 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -101,6 +101,7 @@ class EngineAdapter: SUPPORTS_VIEW_SCHEMA = True SUPPORTS_CLONING = False SUPPORTS_MANAGED_MODELS = False + SUPPORTS_CREATE_DROP_CATALOG = False SCHEMA_DIFFER = SchemaDiffer() SUPPORTS_TUPLE_IN = True HAS_VIEW_BINDING = False @@ -1217,6 +1218,22 @@ def drop_view( **kwargs, ) + def create_catalog(self, catalog_name: str | exp.Identifier) -> None: + return self._create_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) + + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + raise SQLMeshError( + f"Unable to create catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." + ) + + def drop_catalog(self, catalog_name: str | exp.Identifier) -> None: + return self._drop_catalog(exp.parse_identifier(catalog_name, dialect=self.dialect)) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + raise SQLMeshError( + f"Unable to drop catalog '{catalog_name.sql(dialect=self.dialect)}' as automatic catalog management is not implemented in the {self.dialect} engine." 
+ ) + def columns( self, table_name: TableName, include_pseudo_columns: bool = False ) -> t.Dict[str, exp.DataType]: diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index f7f72f9692..169a7a7f94 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -2,6 +2,7 @@ import typing as t from sqlglot import exp +from pathlib import Path from sqlmesh.core.engine_adapter.mixins import ( GetCurrentCatalogFromFunctionMixin, @@ -35,6 +36,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, ) COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY + SUPPORTS_CREATE_DROP_CATALOG = True @property def catalog_support(self) -> CatalogSupport: @@ -44,6 +46,18 @@ def set_current_catalog(self, catalog: str) -> None: """Sets the catalog name of the current connection.""" self.execute(exp.Use(this=exp.to_identifier(catalog))) + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + db_filename = f"{catalog_name.output_name}.db" + self.execute( + exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True) + ) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + db_file_path = Path(f"{catalog_name.output_name}.db") + self.execute(exp.Detach(this=catalog_name, exists=True)) + if db_file_path.exists(): + db_file_path.unlink() + def _df_to_source_queries( self, df: DF, diff --git a/sqlmesh/core/engine_adapter/shared.py b/sqlmesh/core/engine_adapter/shared.py index e1d93b5e2f..1d882de02f 100644 --- a/sqlmesh/core/engine_adapter/shared.py +++ b/sqlmesh/core/engine_adapter/shared.py @@ -173,9 +173,16 @@ def is_clustered(self) -> bool: class CatalogSupport(Enum): + # The engine has no concept of catalogs UNSUPPORTED = 1 + + # The engine has a concept of catalogs, but they are isolated from each other and cannot reference each others tables SINGLE_CATALOG_ONLY = 2 + + # The engine supports multiple catalogs but some operations require a SET CATALOG query to set the active catalog before proceeding REQUIRES_SET_CATALOG = 3 + + # The engine supports multiple catalogs and can unambiguously target a specific catalog when performing operations (without running SET CATALOG first) FULL_SUPPORT = 4 @property diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 032c1da4fe..71ffc10f48 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -9,6 +9,7 @@ from sqlglot.optimizer.normalize_identifiers import normalize_identifiers from sqlglot.optimizer.qualify_columns import quote_identifiers +import sqlmesh.core.constants as c from sqlmesh.core.dialect import to_schema from sqlmesh.core.engine_adapter.mixins import ( GetCurrentCatalogFromFunctionMixin, @@ -43,6 +44,7 @@ "_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG, "create_schema": CatalogSupport.REQUIRES_SET_CATALOG, "drop_schema": CatalogSupport.REQUIRES_SET_CATALOG, + "drop_catalog": CatalogSupport.REQUIRES_SET_CATALOG, # needs a catalog to issue a query to information_schema.databases even though the result is global } ) class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixin, RowDiffMixin): @@ -52,6 +54,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi SUPPORTS_CLONING = True SUPPORTS_MANAGED_MODELS = True CURRENT_CATALOG_EXPRESSION = exp.func("current_database") + 
SUPPORTS_CREATE_DROP_CATALOG = True SCHEMA_DIFFER = SchemaDiffer( parameterized_type_defaults={ exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)], @@ -123,6 +126,36 @@ def snowpark(self) -> t.Optional[SnowparkSession]: def catalog_support(self) -> CatalogSupport: return CatalogSupport.FULL_SUPPORT + def _create_catalog(self, catalog_name: exp.Identifier) -> None: + props = exp.Properties( + expressions=[exp.SchemaCommentProperty(this=exp.Literal.string(c.SQLMESH_MANAGED))] + ) + self.execute( + exp.Create( + this=exp.Table(this=catalog_name), kind="DATABASE", exists=True, properties=props + ) + ) + + def _drop_catalog(self, catalog_name: exp.Identifier) -> None: + # only drop the catalog if it was created by SQLMesh, which is indicated by its comment matching {c.SQLMESH_MANAGED} + exists_check = ( + exp.select(exp.Literal.number(1)) + .from_(exp.to_table("information_schema.databases")) + .where( + exp.and_( + exp.column("database_name").eq(exp.Literal.string(catalog_name)), + exp.column("comment").eq(exp.Literal.string(c.SQLMESH_MANAGED)), + ) + ) + ) + normalize_identifiers(exists_check, dialect=self.dialect) + if self.fetchone(exists_check, quote_identifiers=True) is not None: + self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True)) + else: + logger.warning( + f"Not dropping database {catalog_name.sql(dialect=self.dialect)} because there is no indication it is '{c.SQLMESH_MANAGED}'" + ) + def _create_table( self, table_name_or_schema: t.Union[exp.Schema, TableName], diff --git a/sqlmesh/core/environment.py b/sqlmesh/core/environment.py index 891d299df8..13ca1c5485 100644 --- a/sqlmesh/core/environment.py +++ b/sqlmesh/core/environment.py @@ -43,6 +43,10 @@ class EnvironmentNamingInfo(PydanticModel): normalize_name: bool = True gateway_managed: bool = False + @property + def is_dev(self) -> bool: + return self.name.lower() != c.PROD + @field_validator("name", mode="before") @classmethod def _sanitize_name(cls, v: str) -> str: diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index ba422bcdcb..573d3bc75d 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -274,13 +274,19 @@ def table_for_environment( def catalog_for_environment( self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None ) -> t.Optional[str]: - if environment_naming_info.catalog_name_override: + catalog_name: t.Optional[str] = None + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_catalog: + catalog_name = f"{self.catalog}__{environment_naming_info.name}" + elif environment_naming_info.catalog_name_override: catalog_name = environment_naming_info.catalog_name_override + + if catalog_name: return ( normalize_identifiers(catalog_name, dialect=dialect).name if environment_naming_info.normalize_name else catalog_name ) + return self.catalog def schema_for_environment( @@ -295,10 +301,7 @@ def schema_for_environment( if normalize: schema = normalize_identifiers(schema, dialect=dialect).name - if ( - environment_naming_info.name.lower() != c.PROD - and environment_naming_info.suffix_target.is_schema - ): + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_schema: env_name = environment_naming_info.name if normalize: env_name = normalize_identifiers(env_name, dialect=dialect).name @@ -311,10 +314,7 @@ def table_name_for_environment( self, environment_naming_info: EnvironmentNamingInfo, dialect: DialectType = None ) -> str: 
table = self.table - if ( - environment_naming_info.name.lower() != c.PROD - and environment_naming_info.suffix_target.is_table - ): + if environment_naming_info.is_dev and environment_naming_info.suffix_target.is_table: env_name = environment_naming_info.name if environment_naming_info.normalize_name: env_name = normalize_identifiers(env_name, dialect=dialect).name diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 39a8f5147c..f2042583d0 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -251,6 +251,8 @@ def promote( # A schema can be shared across multiple engines, so we need to group by gateway for gateway, tables in tables_by_gateway.items(): + if environment_naming_info.suffix_target.is_catalog: + self._create_catalogs(tables=tables, gateway=gateway) self._create_schemas(tables=tables, gateway=gateway) deployability_index = deployability_index or DeployabilityIndex.all_deployable() @@ -1114,6 +1116,18 @@ def _audit( blocking=blocking, ) + def _create_catalogs( + self, + tables: t.Iterable[t.Union[exp.Table, str]], + gateway: t.Optional[str] = None, + ) -> None: + # attempt to create catalogs for the virtual layer if possible + adapter = self.get_adapter(gateway) + if adapter.SUPPORTS_CREATE_DROP_CATALOG: + unique_catalogs = {t.catalog for t in [exp.to_table(maybe_t) for maybe_t in tables]} + for catalog_name in unique_catalogs: + adapter.create_catalog(catalog_name) + def _create_schemas( self, tables: t.Iterable[t.Union[exp.Table, str]], diff --git a/sqlmesh/core/state_sync/common.py b/sqlmesh/core/state_sync/common.py index d5e20e9e8e..12899da82e 100644 --- a/sqlmesh/core/state_sync/common.py +++ b/sqlmesh/core/state_sync/common.py @@ -30,7 +30,9 @@ def cleanup_expired_views( console: t.Optional[Console] = None, ) -> None: expired_schema_environments = [ - environment for environment in environments if environment.suffix_target.is_schema + environment + for environment in environments + if environment.suffix_target.is_schema or environment.suffix_target.is_catalog ] expired_table_environments = [ environment for environment in environments if environment.suffix_target.is_table @@ -42,8 +44,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin return engine_adapters.get(gateway, default_adapter) return default_adapter + catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set() + # Drop the schemas for the expired environments - for engine_adapter, expired_catalog, expired_schema in { + for engine_adapter, expired_catalog, expired_schema, suffix_target in { ( (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)), snapshot.qualified_view_name.catalog_for_environment( @@ -52,6 +56,7 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin snapshot.qualified_view_name.schema_for_environment( environment.naming_info, dialect=engine_adapter.dialect ), + environment.suffix_target, ) for environment in expired_schema_environments for snapshot in environment.snapshots @@ -64,6 +69,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin ignore_if_not_exists=True, cascade=True, ) + + if suffix_target.is_catalog and expired_catalog: + catalogs_to_drop.add((engine_adapter, expired_catalog)) + if console: console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect)) except Exception as e: @@ -96,6 +105,21 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> 
Engin else: raise SQLMeshError(message) from e + # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs + # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog' + for engine_adapter, catalog in catalogs_to_drop: + if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG: + try: + engine_adapter.drop_catalog(catalog) + if console: + console.update_cleanup_progress(catalog) + except Exception as e: + message = f"Failed to drop the expired environment catalog '{catalog}': {e}" + if warn_on_delete_failure: + logger.warning(message) + else: + raise SQLMeshError(message) from e + def transactional() -> t.Callable[[t.Callable], t.Callable]: def decorator(func: t.Callable) -> t.Callable: diff --git a/tests/conftest.py b/tests/conftest.py index 492da9db58..574c802c0e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -105,12 +105,25 @@ def qualified_views(self) -> t.List[exp.Table]: @property def schemas(self) -> t.List[str]: + return self.schemas_in_catalog(self.engine_adapter.get_current_catalog() or "") + + def schemas_in_catalog(self, catalog_name: str) -> t.List[str]: return self._get_single_col( - f"SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '{self.engine_adapter.get_current_catalog()}' and {self._system_schema_filter('schema_name')}", + f"SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '{catalog_name}' and {self._system_schema_filter('schema_name')}", "schema_name", self.engine_adapter, ) + @property + def catalogs(self) -> t.Set[str]: + return set( + self._get_single_col( + f"SELECT database_name FROM duckdb_databases() WHERE internal=false", + "database_name", + self.engine_adapter, + ) + ) + def _system_schema_filter(self, col: str) -> str: return f"{col} not in ('information_schema', 'pg_catalog', 'main')" diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 3aa8529ef6..7e35b832be 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -205,6 +205,9 @@ def __init__( self._schemas: t.List[ str ] = [] # keep track of any schemas returned from self.schema() / self.table() so we can drop them at the end + self._catalogs: t.List[ + str + ] = [] # keep track of any catalogs created via self.create_catalog() so we can drop them at the end @property def test_type(self) -> str: @@ -685,6 +688,8 @@ def create_catalog(self, catalog_name: str): except Exception: pass + self._catalogs.append(catalog_name) + def drop_catalog(self, catalog_name: str): if self.dialect == "bigquery": return # bigquery cannot create/drop catalogs @@ -707,6 +712,9 @@ def cleanup(self, ctx: t.Optional[Context] = None): schema_name=schema_name, ignore_if_not_exists=True, cascade=True ) + for catalog_name in set(self._catalogs): + self.drop_catalog(catalog_name) + self.engine_adapter.close() def upsert_sql_model(self, model_definition: str) -> t.Tuple[Context, SqlModel]: diff --git a/tests/core/engine_adapter/integration/test_integration_snowflake.py b/tests/core/engine_adapter/integration/test_integration_snowflake.py index 314a9e0f20..12e45f1f14 100644 --- a/tests/core/engine_adapter/integration/test_integration_snowflake.py +++ b/tests/core/engine_adapter/integration/test_integration_snowflake.py @@ -181,7 +181,7 @@ def _get_data_object(table: exp.Table) -> DataObject: assert not metadata.is_clustered -def test_create_iceberg_table(ctx: TestContext, 
engine_adapter: SnowflakeEngineAdapter) -> None: +def test_create_iceberg_table(ctx: TestContext) -> None: # Note: this test relies on a default Catalog and External Volume being configured in Snowflake # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration#set-a-default-catalog-at-the-account-database-or-schema-level # ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume#set-a-default-external-volume-at-the-account-database-or-schema-level @@ -271,3 +271,39 @@ def execute(context: ExecutionContext, start: datetime, **kwargs) -> DataFrame: query = exp.select("*").from_(table) df = ctx.engine_adapter.fetchdf(query, quote_identifiers=True) assert len(df) == 10 + + +def test_create_drop_catalog(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter): + non_sqlmesh_managed_catalog = ctx.add_test_suffix("external_catalog") + sqlmesh_managed_catalog = ctx.add_test_suffix("env_dev") + + initial_catalog = engine_adapter.get_current_catalog() + assert initial_catalog + + ctx.create_catalog( + non_sqlmesh_managed_catalog + ) # create via TestContext so the sqlmesh_managed comment doesnt get added + ctx._catalogs.append(sqlmesh_managed_catalog) # so it still gets cleaned up if the test fails + + engine_adapter.create_catalog( + sqlmesh_managed_catalog + ) # create via EngineAdapter so the sqlmesh_managed comment is added + + def fetch_database_names() -> t.Set[str]: + engine_adapter.set_current_catalog(initial_catalog) + return { + str(r[0]) + for r in engine_adapter.fetchall( + f"select database_name from information_schema.databases where database_name like '%{ctx.test_id}'" + ) + } + + assert fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog} + + engine_adapter.drop_catalog( + non_sqlmesh_managed_catalog + ) # no-op: catalog is not SQLMesh-managed + assert fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog} + + engine_adapter.drop_catalog(sqlmesh_managed_catalog) # works, catalog is SQLMesh-managed + assert fetch_database_names() == {non_sqlmesh_managed_catalog} diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index 93ef72e874..543b2e2f18 100644 --- a/tests/core/engine_adapter/test_duckdb.py +++ b/tests/core/engine_adapter/test_duckdb.py @@ -89,3 +89,17 @@ def test_temporary_table(make_mocked_engine_adapter: t.Callable, duck_conn): assert to_sql_calls(adapter) == [ 'CREATE TEMPORARY TABLE IF NOT EXISTS "test_table" ("a" INT, "b" INT)', ] + + +def test_create_catalog(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.create_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ["ATTACH IF NOT EXISTS 'foo.db' AS \"foo\""] + + +def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.drop_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"'] diff --git a/tests/core/engine_adapter/test_snowflake.py b/tests/core/engine_adapter/test_snowflake.py index f0a47b3393..4ca13ee8f9 100644 --- a/tests/core/engine_adapter/test_snowflake.py +++ b/tests/core/engine_adapter/test_snowflake.py @@ -814,3 +814,22 @@ def test_create_view_with_schema_and_grants( # materialized view - COPY GRANTS goes before the column list """CREATE OR REPLACE MATERIALIZED VIEW "target_materialized_view" 
COPY GRANTS ("ID", "NAME") COMMENT='materialized **view** from integration test' AS SELECT 1 AS "ID", 'foo' AS "NAME\"""", ] + + +def test_create_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None: + adapter = snowflake_mocked_engine_adapter + adapter.create_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == [ + "CREATE DATABASE IF NOT EXISTS \"foo\" COMMENT='sqlmesh_managed'" + ] + + +def test_drop_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None: + adapter = snowflake_mocked_engine_adapter + adapter.drop_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == [ + """SELECT 1 FROM "INFORMATION_SCHEMA"."DATABASES" WHERE "DATABASE_NAME" = 'foo' AND "COMMENT" = 'sqlmesh_managed'""", + 'DROP DATABASE IF EXISTS "foo"', + ] diff --git a/tests/core/test_config.py b/tests/core/test_config.py index dea9fb16da..e3b7a8e612 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -16,6 +16,7 @@ BigQueryConnectionConfig, MotherDuckConnectionConfig, BuiltInSchedulerConfig, + EnvironmentSuffixTarget, ) from sqlmesh.core.config.connection import DuckDBAttachOptions, RedshiftConnectionConfig from sqlmesh.core.config.feature_flag import DbtFeatureFlag, FeatureFlag @@ -1055,3 +1056,51 @@ def test_loader_for_migrated_dbt_project(tmp_path: Path): ) assert config.loader == MigratedDbtProjectLoader + + +def test_environment_suffix_target_catalog(tmp_path: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text(""" + gateways: + warehouse: + connection: + type: duckdb + + default_gateway: warehouse + + model_defaults: + dialect: duckdb + + environment_suffix_target: catalog +""") + + config = load_config_from_paths( + Config, + project_paths=[config_path], + ) + + assert config.environment_suffix_target == EnvironmentSuffixTarget.CATALOG + assert not config.environment_catalog_mapping + + config_path.write_text(""" + gateways: + warehouse: + connection: + type: duckdb + + default_gateway: warehouse + + model_defaults: + dialect: duckdb + + environment_suffix_target: catalog + + environment_catalog_mapping: + '.*': "foo" +""") + + with pytest.raises(ConfigError, match=r"mutually exclusive"): + config = load_config_from_paths( + Config, + project_paths=[config_path], + ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 8725318506..f6e696ab01 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -11,6 +11,7 @@ import numpy as np # noqa: TID253 import pandas as pd # noqa: TID253 import pytest +from pytest import MonkeyPatch from pathlib import Path from sqlmesh.core.console import set_console, get_console, TerminalConsole from sqlmesh.core.config.naming import NameInferenceConfig @@ -34,6 +35,7 @@ ModelDefaultsConfig, DuckDBConnectionConfig, ) +from sqlmesh.core.config.common import EnvironmentSuffixTarget from sqlmesh.core.console import Console, get_console from sqlmesh.core.context import Context from sqlmesh.core.config.categorizer import CategorizerConfig @@ -5259,7 +5261,9 @@ def test_invalidating_environment(sushi_context: Context): def test_environment_suffix_target_table(init_and_plan_context: t.Callable): - context, plan = init_and_plan_context("examples/sushi", config="environment_suffix_config") + context, plan = init_and_plan_context( + "examples/sushi", config="environment_suffix_table_config" + ) context.apply(plan) metadata = DuckDBMetadata.from_context(context) environments_schemas = {"sushi"} @@ -5295,6 +5299,116 @@ 
def test_environment_suffix_target_table(init_and_plan_context: t.Callable): } == set() +def test_environment_suffix_target_catalog(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: + monkeypatch.chdir(tmp_path) + + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + default_connection=DuckDBConnectionConfig(catalogs={"main_warehouse": ":memory:"}), + environment_suffix_target=EnvironmentSuffixTarget.CATALOG, + ) + + assert config.default_connection + + models_dir = tmp_path / "models" + models_dir.mkdir() + + (models_dir / "model.sql").write_text(""" + MODEL ( + name example_schema.test_model, + kind FULL + ); + + SELECT '1' as a""") + + (models_dir / "fqn_model.sql").write_text(""" + MODEL ( + name memory.example_fqn_schema.test_model_fqn, + kind FULL + ); + + SELECT '1' as a""") + + ctx = Context(config=config, paths=tmp_path) + + metadata = DuckDBMetadata.from_context(ctx) + assert ctx.default_catalog == "main_warehouse" + assert metadata.catalogs == {"main_warehouse", "memory"} + + ctx.plan(auto_apply=True) + + # prod should go to the default catalog and not be overridden to a catalog called 'prod' + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse.example_schema.test_model")[0] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory.example_fqn_schema.test_model_fqn")[0] # type: ignore + == "1" + ) + assert metadata.catalogs == {"main_warehouse", "memory"} + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("memory") == [ + "example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + + # dev should be overridden to go to a catalogs called 'main_warehouse__dev' and 'memory__dev' + ctx.plan(environment="dev", include_unmodified=True, auto_apply=True) + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse__dev.example_schema.test_model")[ + 0 + ] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory__dev.example_fqn_schema.test_model_fqn")[ + 0 + ] # type: ignore + == "1" + ) + assert metadata.catalogs == {"main_warehouse", "main_warehouse__dev", "memory", "memory__dev"} + + # schemas in dev envs should match prod and not have a suffix + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("main_warehouse__dev") == ["example_schema"] + assert metadata.schemas_in_catalog("memory") == [ + "example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + assert metadata.schemas_in_catalog("memory__dev") == ["example_fqn_schema"] + + ctx.invalidate_environment("dev", sync=True) + + # dev catalogs cleaned up + assert metadata.catalogs == {"main_warehouse", "memory"} + + # prod catalogs still contain physical layer and views still work + assert metadata.schemas_in_catalog("main_warehouse") == [ + "example_schema", + "sqlmesh__example_schema", + ] + assert metadata.schemas_in_catalog("memory") == [ + "example_fqn_schema", + "sqlmesh__example_fqn_schema", + ] + + assert ( + ctx.engine_adapter.fetchone("select * from main_warehouse.example_schema.test_model")[0] # type: ignore + == "1" + ) + assert ( + ctx.engine_adapter.fetchone("select * from memory.example_fqn_schema.test_model_fqn")[0] # type: ignore + == "1" + ) + + def test_environment_catalog_mapping(init_and_plan_context: t.Callable): environments_schemas = {"raw", "sushi"} diff --git a/tests/core/test_snapshot.py 
b/tests/core/test_snapshot.py
index 387c799e95..e4eb12c522 100644
--- a/tests/core/test_snapshot.py
+++ b/tests/core/test_snapshot.py
@@ -1780,7 +1780,31 @@ def test_is_valid_start(make_snapshot):
             EnvironmentNamingInfo(name="dev", catalog_name_override="g-h"),
             '"g-h".default__dev."e-f"',
         ),
-        (QualifiedViewName(table="e-f"), EnvironmentNamingInfo(name="dev"), 'default__dev."e-f"'),
+        (
+            QualifiedViewName(table="e-f"),
+            EnvironmentNamingInfo(name="dev"),
+            'default__dev."e-f"',
+        ),
+        # EnvironmentSuffixTarget.CATALOG
+        (
+            QualifiedViewName(
+                catalog="default-foo", schema_name="sqlmesh_example", table="full_model"
+            ),
+            EnvironmentNamingInfo(
+                name="dev",
+                suffix_target=EnvironmentSuffixTarget.CATALOG,
+            ),
+            '"default-foo__dev".sqlmesh_example.full_model',
+        ),
+        (
+            QualifiedViewName(catalog="default", schema_name="sqlmesh_example", table="full_model"),
+            EnvironmentNamingInfo(
+                name=c.PROD,
+                catalog_name_override=None,
+                suffix_target=EnvironmentSuffixTarget.CATALOG,
+            ),
+            "default.sqlmesh_example.full_model",
+        ),
     ),
 )
 def test_qualified_view_name(qualified_view_name, environment_naming_info, expected):
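Engines that are not touched by this diff keep `SUPPORTS_CREATE_DROP_CATALOG = False`, so the promotion and janitor paths simply skip catalog creation and deletion for them. For illustration, this is roughly what a hypothetical third-party adapter would override to opt in, modeled on the DuckDB and Snowflake changes above (the class name and the DDL it issues are illustrative, not part of this change):

```python
from sqlglot import exp

from sqlmesh.core.engine_adapter.base import EngineAdapter


class MyWarehouseAdapter(EngineAdapter):
    """Hypothetical adapter opting in to automatic catalog management."""

    DIALECT = "duckdb"  # placeholder; a real adapter would use its own dialect

    # Signals to SnapshotEvaluator.promote() and cleanup_expired_views() that
    # catalogs can be created and dropped automatically.
    SUPPORTS_CREATE_DROP_CATALOG = True

    def _create_catalog(self, catalog_name: exp.Identifier) -> None:
        # exists=True renders IF NOT EXISTS, keeping environment promotion idempotent.
        self.execute(exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))

    def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
        self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
```

Callers go through the public wrappers added to the base class — e.g. `adapter.create_catalog("warehouse__dev")` — which parse the name into an identifier before delegating, so only the two private hooks need to be implemented.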