Skip to content

Commit d8dcf20

Browse files
committed
Snowflake: Only drop databases if SQLMesh was the one who created them
1 parent 707fe3e commit d8dcf20

5 files changed

Lines changed: 81 additions & 5 deletions

File tree

sqlmesh/core/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from pathlib import Path
88

99
SQLMESH = "sqlmesh"
10+
SQLMESH_MANAGED = "sqlmesh_managed"
1011
SQLMESH_PATH = Path.home() / ".sqlmesh"
1112

1213
PROD = "prod"

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1010
from sqlglot.optimizer.qualify_columns import quote_identifiers
1111

12+
import sqlmesh.core.constants as c
1213
from sqlmesh.core.dialect import to_schema
1314
from sqlmesh.core.engine_adapter.mixins import (
1415
GetCurrentCatalogFromFunctionMixin,
@@ -43,6 +44,7 @@
4344
"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG,
4445
"create_schema": CatalogSupport.REQUIRES_SET_CATALOG,
4546
"drop_schema": CatalogSupport.REQUIRES_SET_CATALOG,
47+
"drop_catalog": CatalogSupport.REQUIRES_SET_CATALOG, # needs a catalog to issue a query to information_schema.databases even though the result is global
4648
}
4749
)
4850
class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixin, RowDiffMixin):
@@ -125,10 +127,34 @@ def catalog_support(self) -> CatalogSupport:
125127
return CatalogSupport.FULL_SUPPORT
126128

127129
def _create_catalog(self, catalog_name: exp.Identifier) -> None:
128-
self.execute(exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
130+
props = exp.Properties(
131+
expressions=[exp.SchemaCommentProperty(this=exp.Literal.string(c.SQLMESH_MANAGED))]
132+
)
133+
self.execute(
134+
exp.Create(
135+
this=exp.Table(this=catalog_name), kind="DATABASE", exists=True, properties=props
136+
)
137+
)
129138

130139
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
131-
self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
140+
# only drop the catalog if it was created by SQLMesh, which is indicated by its comment being equal to c.SQLMESH_MANAGED
141+
exists_check = (
142+
exp.select(exp.Literal.number(1))
143+
.from_(exp.to_table("information_schema.databases"))
144+
.where(
145+
exp.and_(
146+
exp.column("database_name").eq(exp.Literal.string(catalog_name)),
147+
exp.column("comment").eq(exp.Literal.string(c.SQLMESH_MANAGED)),
148+
)
149+
)
150+
)
151+
normalize_identifiers(exists_check, dialect=self.dialect)
152+
if self.fetchone(exists_check, quote_identifiers=True) is not None:
153+
self.execute(exp.Drop(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True))
154+
else:
155+
logger.warning(
156+
f"Not dropping database {catalog_name.sql(dialect=self.dialect)} because there is no indication it is '{c.SQLMESH_MANAGED}'"
157+
)
132158

133159
def _create_table(
134160
self,

tests/core/engine_adapter/integration/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ def __init__(
205205
self._schemas: t.List[
206206
str
207207
] = [] # keep track of any schemas returned from self.schema() / self.table() so we can drop them at the end
208+
self._catalogs: t.List[
209+
str
210+
] = [] # keep track of any catalogs created via self.create_catalog() so we can drop them at the end
208211

209212
@property
210213
def test_type(self) -> str:
@@ -685,6 +688,8 @@ def create_catalog(self, catalog_name: str):
685688
except Exception:
686689
pass
687690

691+
self._catalogs.append(catalog_name)
692+
688693
def drop_catalog(self, catalog_name: str):
689694
if self.dialect == "bigquery":
690695
return # bigquery cannot create/drop catalogs
@@ -707,6 +712,9 @@ def cleanup(self, ctx: t.Optional[Context] = None):
707712
schema_name=schema_name, ignore_if_not_exists=True, cascade=True
708713
)
709714

715+
for catalog_name in set(self._catalogs):
716+
self.drop_catalog(catalog_name)
717+
710718
self.engine_adapter.close()
711719

712720
def upsert_sql_model(self, model_definition: str) -> t.Tuple[Context, SqlModel]:

tests/core/engine_adapter/integration/test_integration_snowflake.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def _get_data_object(table: exp.Table) -> DataObject:
181181
assert not metadata.is_clustered
182182

183183

184-
def test_create_iceberg_table(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter) -> None:
184+
def test_create_iceberg_table(ctx: TestContext) -> None:
185185
# Note: this test relies on a default Catalog and External Volume being configured in Snowflake
186186
# ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration#set-a-default-catalog-at-the-account-database-or-schema-level
187187
# ref: https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume#set-a-default-external-volume-at-the-account-database-or-schema-level
@@ -271,3 +271,39 @@ def execute(context: ExecutionContext, start: datetime, **kwargs) -> DataFrame:
271271
query = exp.select("*").from_(table)
272272
df = ctx.engine_adapter.fetchdf(query, quote_identifiers=True)
273273
assert len(df) == 10
274+
275+
276+
def test_create_drop_catalog(ctx: TestContext, engine_adapter: SnowflakeEngineAdapter):
277+
non_sqlmesh_managed_catalog = ctx.add_test_suffix("external_catalog")
278+
sqlmesh_managed_catalog = ctx.add_test_suffix("env_dev")
279+
280+
initial_catalog = engine_adapter.get_current_catalog()
281+
assert initial_catalog
282+
283+
ctx.create_catalog(
284+
non_sqlmesh_managed_catalog
285+
) # create via TestContext so the sqlmesh_managed comment doesn't get added
286+
ctx._catalogs.append(sqlmesh_managed_catalog) # so it still gets cleaned up if the test fails
287+
288+
engine_adapter.create_catalog(
289+
sqlmesh_managed_catalog
290+
) # create via EngineAdapter so the sqlmesh_managed comment is added
291+
292+
def fetch_database_names() -> t.Set[str]:
293+
engine_adapter.set_current_catalog(initial_catalog)
294+
return {
295+
str(r[0])
296+
for r in engine_adapter.fetchall(
297+
f"select database_name from information_schema.databases where database_name like '%{ctx.test_id}'"
298+
)
299+
}
300+
301+
assert fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog}
302+
303+
engine_adapter.drop_catalog(
304+
non_sqlmesh_managed_catalog
305+
) # no-op: catalog is not SQLMesh-managed
306+
assert fetch_database_names() == {non_sqlmesh_managed_catalog, sqlmesh_managed_catalog}
307+
308+
engine_adapter.drop_catalog(sqlmesh_managed_catalog) # works, catalog is SQLMesh-managed
309+
assert fetch_database_names() == {non_sqlmesh_managed_catalog}

tests/core/engine_adapter/test_snowflake.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,11 +820,16 @@ def test_create_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter)
820820
adapter = snowflake_mocked_engine_adapter
821821
adapter.create_catalog(exp.to_identifier("foo"))
822822

823-
assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "foo"']
823+
assert to_sql_calls(adapter) == [
824+
"CREATE DATABASE IF NOT EXISTS \"foo\" COMMENT='sqlmesh_managed'"
825+
]
824826

825827

826828
def test_drop_catalog(snowflake_mocked_engine_adapter: SnowflakeEngineAdapter) -> None:
827829
adapter = snowflake_mocked_engine_adapter
828830
adapter.drop_catalog(exp.to_identifier("foo"))
829831

830-
assert to_sql_calls(adapter) == ['DROP DATABASE IF EXISTS "foo"']
832+
assert to_sql_calls(adapter) == [
833+
"""SELECT 1 FROM "INFORMATION_SCHEMA"."DATABASES" WHERE "DATABASE_NAME" = 'foo' AND "COMMENT" = 'sqlmesh_managed'""",
834+
'DROP DATABASE IF EXISTS "foo"',
835+
]

0 commit comments

Comments
 (0)