Skip to content

Commit ea72563

Browse files
author
vishnu
committed
added support for iceberg tables on S3 table buckets using athena engine
Signed-off-by: vishnu <vishnu.t@redbus.com>
1 parent da2e5fc commit ea72563

1 file changed

Lines changed: 92 additions & 18 deletions

File tree

sqlmesh/core/engine_adapter/athena.py

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,9 @@ def s3_warehouse_location_or_raise(self) -> str:
7878

7979
@property
8080
def catalog_support(self) -> CatalogSupport:
81-
# Athena has the concept of catalogs but the current catalog is set in the connection parameters with no way to query or change it after that
82-
# It also cant create new catalogs, you have to configure them in AWS. Typically, catalogs that are not "awsdatacatalog"
83-
# are pointers to the "awsdatacatalog" of other AWS accounts
84-
return CatalogSupport.SINGLE_CATALOG_ONLY
81+
# Athena supports querying and writing to multiple catalogs (e.g. awsdatacatalog and s3tablescatalog)
82+
# without needing a SET CATALOG command.
83+
return CatalogSupport.FULL_SUPPORT
8584

8685
def create_state_table(
8786
self,
@@ -105,6 +104,9 @@ def _get_data_objects(
105104
"""
106105
schema_name = to_schema(schema_name)
107106
schema = schema_name.db
107+
108+
info_schema_tables = exp.table_("tables", db="information_schema", catalog=schema_name.catalog, alias="t")
109+
108110
query = (
109111
exp.select(
110112
exp.column("table_catalog").as_("catalog"),
@@ -118,7 +120,7 @@ def _get_data_objects(
118120
.else_(exp.column("table_type", table="t"))
119121
.as_("type"),
120122
)
121-
.from_(exp.to_table("information_schema.tables", alias="t"))
123+
.from_(info_schema_tables)
122124
.where(exp.column("table_schema", table="t").eq(schema))
123125
)
124126
if object_names:
@@ -141,9 +143,12 @@ def columns(
141143
) -> t.Dict[str, exp.DataType]:
142144
table = exp.to_table(table_name)
143145
# note: the data_type column contains the full parameterized type, eg 'varchar(10)'
146+
147+
info_schema_columns = exp.table_("columns", db="information_schema", catalog=table.catalog)
148+
144149
query = (
145150
exp.select("column_name", "data_type")
146-
.from_("information_schema.columns")
151+
.from_(info_schema_columns)
147152
.where(exp.column("table_schema").eq(table.db), exp.column("table_name").eq(table.name))
148153
.order_by("ordinal_position")
149154
)
@@ -197,6 +202,11 @@ def _build_create_table_exp(
197202
else:
198203
table = table_name_or_schema
199204

205+
table_format = kwargs.pop("table_format", None)
206+
if not table_format and table_properties and "table_format" in table_properties:
207+
tf = table_properties.get("table_format")
208+
table_format = tf.name if isinstance(tf, exp.Literal) else str(tf)
209+
200210
properties = self._build_table_properties_exp(
201211
table=table,
202212
expression=expression,
@@ -205,10 +215,11 @@ def _build_create_table_exp(
205215
table_properties=table_properties,
206216
table_description=table_description,
207217
table_kind=table_kind,
218+
table_format=table_format,
208219
**kwargs,
209220
)
210221

211-
is_hive = self._table_type(kwargs.get("table_format", None)) == "hive"
222+
is_hive = self._table_type(table_format) == "hive"
212223

213224
# Filter any PARTITIONED BY properties from the main column list since they cant be specified in both places
214225
# ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
@@ -247,17 +258,36 @@ def _build_table_properties_exp(
247258
**kwargs: t.Any,
248259
) -> t.Optional[exp.Properties]:
249260
properties: t.List[exp.Expr] = []
250-
table_properties = table_properties or {}
261+
table_properties = table_properties.copy() if table_properties else {}
262+
263+
s3_table_prop = table_properties.pop("s3_table", None)
264+
is_s3_table = False
265+
if s3_table_prop is not None:
266+
if isinstance(s3_table_prop, exp.Boolean):
267+
is_s3_table = s3_table_prop.this
268+
elif isinstance(s3_table_prop, exp.Literal):
269+
is_s3_table = s3_table_prop.name.lower() in ("true", "1")
270+
else:
271+
is_s3_table = str(s3_table_prop).lower() in ("true", "1")
272+
elif table and table.catalog and table.catalog.startswith("s3tablescatalog/"):
273+
is_s3_table = True
274+
275+
tf = table_properties.pop("table_format", None)
276+
if not table_format and tf:
277+
table_format = tf.name if isinstance(tf, exp.Literal) else str(tf)
251278

252279
is_hive = self._table_type(table_format) == "hive"
253280
is_iceberg = not is_hive
254281

282+
if is_s3_table and is_hive:
283+
raise SQLMeshError("Amazon S3 Tables only support the Iceberg format")
284+
255285
if is_hive and not expression:
256286
# Hive tables are CREATE EXTERNAL TABLE, Iceberg tables are CREATE TABLE
257287
# Unless it's a CTAS, those are always CREATE TABLE
258288
properties.append(exp.ExternalProperty())
259289

260-
if table_format:
290+
if table_format and not is_s3_table:
261291
properties.append(
262292
exp.Property(this=exp.var("table_type"), value=exp.Literal.string(table_format))
263293
)
@@ -279,9 +309,30 @@ def _build_table_properties_exp(
279309
else:
280310
schema_expressions = partitioned_by
281311

282-
properties.append(
283-
exp.PartitionedByProperty(this=exp.Schema(expressions=schema_expressions))
284-
)
312+
if is_hive:
313+
properties.append(
314+
exp.PartitionedByProperty(this=exp.Schema(expressions=schema_expressions))
315+
)
316+
else:
317+
if is_s3_table:
318+
array_exprs = []
319+
for e in schema_expressions:
320+
e_copy = e.copy()
321+
e_copy.transform(
322+
lambda n: n.name if isinstance(n, exp.Identifier) else n, copy=False
323+
)
324+
expr_sql = e_copy.sql(dialect="athena")
325+
array_exprs.append(exp.Literal.string(expr_sql))
326+
327+
properties.append(
328+
exp.Property(
329+
this=exp.var("partitioning"), value=exp.Array(expressions=array_exprs)
330+
)
331+
)
332+
else:
333+
properties.append(
334+
exp.PartitionedByProperty(this=exp.Schema(expressions=schema_expressions))
335+
)
285336

286337
if clustered_by:
287338
# Athena itself supports CLUSTERED BY, via the syntax CLUSTERED BY (col) INTO <n> BUCKETS
@@ -293,13 +344,16 @@ def _build_table_properties_exp(
293344

294345
if storage_format:
295346
if is_iceberg:
296-
# TBLPROPERTIES('format'='parquet')
297-
table_properties["format"] = exp.Literal.string(storage_format)
347+
if not is_s3_table or storage_format.lower() == "parquet":
348+
# TBLPROPERTIES('format'='parquet')
349+
table_properties["format"] = exp.Literal.string(storage_format)
350+
elif is_s3_table and storage_format.lower() != "parquet":
351+
raise SQLMeshError("Amazon S3 Tables only support the PARQUET storage format")
298352
else:
299353
# STORED AS PARQUET
300354
properties.append(exp.FileFormatProperty(this=storage_format))
301355

302-
if table and (location := self._table_location_or_raise(table_properties, table)):
356+
if table and not is_s3_table and (location := self._table_location_or_raise(table_properties, table)):
303357
properties.append(location)
304358

305359
if is_iceberg and expression:
@@ -308,8 +362,28 @@ def _build_table_properties_exp(
308362
# ref: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties
309363
properties.append(exp.Property(this=exp.var("is_external"), value="false"))
310364

311-
for name, value in table_properties.items():
312-
properties.append(exp.Property(this=exp.var(name), value=value))
365+
if not is_s3_table:
366+
for name, value in table_properties.items():
367+
properties.append(exp.Property(this=exp.var(name), value=value))
368+
elif is_s3_table:
369+
# According to AWS documentation for S3 Tables CTAS queries:
370+
# "The `table_type` property defaults to `ICEBERG`, so you don't need to explicitly specify it"
371+
# "If you don't specify a format, the system automatically uses `PARQUET`"
372+
# We explicitly prevent all TBLPROPERTIES because Athena doesn't support them during CTAS
373+
if expression:
374+
# the only property allowed in CTAS for S3 Tables is 'format' (which we captured above)
375+
format_val = table_properties.pop("format", exp.Literal.string("PARQUET"))
376+
# Ensure it's uppercase PARQUET for S3 Tables just to be safe as per AWS examples
377+
if isinstance(format_val, exp.Literal) and format_val.name.lower() == "parquet":
378+
format_val = exp.Literal.string("PARQUET")
379+
properties.append(exp.Property(this=exp.var("format"), value=format_val))
380+
381+
if table_properties:
382+
logging.warning(f"Ignoring unsupported table properties for S3 Table CTAS: {list(table_properties.keys())}")
383+
else:
384+
# Standard CREATE TABLE for S3 Tables allows properties
385+
for name, value in table_properties.items():
386+
properties.append(exp.Property(this=exp.var(name), value=value))
313387

314388
if properties:
315389
return exp.Properties(expressions=properties)
@@ -613,7 +687,7 @@ def _boto3_client(self, name: str) -> t.Any:
613687
conn = self.connection
614688
return conn.session.client(
615689
name,
616-
region_name=conn.region_name,
690+
region_name=conn.region_name,
617691
config=conn.config,
618692
**conn._client_kwargs,
619693
) # type: ignore

0 commit comments

Comments
 (0)