Skip to content

Commit e0f922f

Browse files
authored
Chore: use information_schema in dbx _get_data_objects (#4111)
1 parent b0e455b commit e0f922f

1 file changed

Lines changed: 39 additions & 8 deletions

File tree

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
import pandas as pd
88
from sqlglot import exp
9-
9+
from sqlmesh.core.dialect import to_schema
1010
from sqlmesh.core.engine_adapter.shared import (
1111
CatalogSupport,
1212
DataObject,
13+
DataObjectType,
1314
InsertOverwriteStrategy,
14-
set_catalog,
1515
SourceQuery,
1616
)
1717
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
@@ -27,11 +27,6 @@
2727
logger = logging.getLogger(__name__)
2828

2929

30-
@set_catalog(
31-
{
32-
"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG,
33-
}
34-
)
3530
class DatabricksEngineAdapter(SparkEngineAdapter):
3631
DIALECT = "databricks"
3732
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.REPLACE_WHERE
@@ -251,7 +246,43 @@ def _set_spark_session_current_catalog(spark: PySparkSession) -> None:
251246
def _get_data_objects(
252247
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
253248
) -> t.List[DataObject]:
254-
return super()._get_data_objects(schema_name, object_names=object_names)
249+
"""
250+
Returns all the data objects that exist in the given schema and catalog.
251+
"""
252+
schema = to_schema(schema_name)
253+
catalog_name = schema.catalog or self.get_current_catalog()
254+
query = (
255+
exp.select(
256+
exp.column("table_name").as_("name"),
257+
exp.column("table_schema").as_("schema"),
258+
exp.column("table_catalog").as_("catalog"),
259+
exp.case(exp.column("table_type"))
260+
.when(exp.Literal.string("VIEW"), exp.Literal.string("view"))
261+
.when(exp.Literal.string("MATERIALIZED_VIEW"), exp.Literal.string("view"))
262+
.else_(exp.Literal.string("table"))
263+
.as_("type"),
264+
)
265+
.from_(
266+
# always query `system` information_schema
267+
exp.table_("tables", "information_schema", "system")
268+
)
269+
.where(exp.column("table_catalog").eq(catalog_name))
270+
.where(exp.column("table_schema").eq(schema.db))
271+
)
272+
273+
if object_names:
274+
query = query.where(exp.column("table_name").isin(*object_names))
275+
276+
df = self.fetchdf(query)
277+
return [
278+
DataObject(
279+
catalog=row.catalog, # type: ignore
280+
schema=row.schema, # type: ignore
281+
name=row.name, # type: ignore
282+
type=DataObjectType.from_str(row.type), # type: ignore
283+
)
284+
for row in df.itertuples()
285+
]
255286

256287
def clone_table(
257288
self,

0 commit comments

Comments
 (0)