Skip to content

Commit 070f0fc

Browse files
committed
refactor partition
1 parent 4fc054f commit 070f0fc

2 files changed

Lines changed: 296 additions & 68 deletions

File tree

sqlmesh/core/engine_adapter/doris.py

Lines changed: 133 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,51 @@ def _create_table_from_columns(
486486
**kwargs,
487487
)
488488

489+
def _parse_partition_expressions(
490+
self, partitioned_by: t.List[exp.Expression]
491+
) -> t.Tuple[t.List[exp.Expression], t.Optional[str]]:
492+
"""Parse partition expressions and extract partition kind and normalized columns.
493+
494+
Returns:
495+
Tuple of (normalized_partitioned_by, partition_kind)
496+
"""
497+
parsed_partitioned_by: t.List[exp.Expression] = []
498+
partition_kind: t.Optional[str] = None
499+
500+
for expr in partitioned_by:
501+
try:
502+
# Handle Anonymous function calls like RANGE(col) or LIST(col)
503+
if isinstance(expr, exp.Anonymous) and expr.this:
504+
func_name = str(expr.this).upper()
505+
if func_name in ("RANGE", "LIST"):
506+
partition_kind = func_name
507+
# Extract column expressions from function arguments
508+
for arg in expr.expressions:
509+
if isinstance(arg, exp.Column):
510+
parsed_partitioned_by.append(arg)
511+
else:
512+
# Convert other expressions to columns if possible
513+
parsed_partitioned_by.append(exp.to_column(str(arg)))
514+
continue
515+
516+
# Handle literal strings like "RANGE(col)" or "LIST(col)"
517+
if isinstance(expr, exp.Literal) and getattr(expr, "is_string", False):
518+
text = str(expr.this)
519+
match = re.match(r"^\s*(RANGE|LIST)\s*\((.*?)\)\s*$", text, flags=re.IGNORECASE)
520+
if match:
521+
partition_kind = match.group(1).upper()
522+
inner = match.group(2)
523+
inner_cols = [c.strip().strip("`") for c in inner.split(",") if c.strip()]
524+
for col in inner_cols:
525+
parsed_partitioned_by.append(exp.to_column(col))
526+
continue
527+
except Exception:
528+
# If anything goes wrong, keep the original expr
529+
pass
530+
parsed_partitioned_by.append(expr)
531+
532+
return parsed_partitioned_by, partition_kind
533+
489534
def _build_partitioned_by_exp(
490535
self,
491536
partitioned_by: t.List[exp.Expression],
@@ -494,8 +539,21 @@ def _build_partitioned_by_exp(
494539
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
495540
catalog_name: t.Optional[str] = None,
496541
**kwargs: t.Any,
497-
) -> t.Optional[t.Union[exp.PartitionedByProperty, exp.PartitionByRangeProperty, exp.Property]]:
498-
"""Doris supports range and list partition, but sqlglot only supports range partition."""
542+
) -> t.Optional[
543+
t.Union[
544+
exp.PartitionedByProperty,
545+
exp.PartitionByRangeProperty,
546+
exp.PartitionByListProperty,
547+
exp.Property,
548+
]
549+
]:
550+
"""Build Doris partitioning expression.
551+
552+
Supports both RANGE and LIST partition syntaxes using sqlglot's doris dialect nodes.
553+
The partition kind is chosen by:
554+
- inferred from partitioned_by expressions like 'RANGE(col)' or 'LIST(col)'
555+
- otherwise inferred from the provided 'partitions' strings: if any contains 'VALUES IN' -> LIST; else RANGE.
556+
"""
499557
partitions = kwargs.get("partitions")
500558
create_expressions = None
501559

@@ -512,6 +570,9 @@ def to_raw_sql(expr: t.Union[exp.Literal, exp.Var, str, t.Any]) -> exp.Var:
512570
# Fallback: return as is
513571
return expr
514572

573+
# Parse partition kind and columns from partitioned_by expressions
574+
partitioned_by, partition_kind = self._parse_partition_expressions(partitioned_by)
575+
515576
if partitions:
516577
if isinstance(partitions, exp.Tuple):
517578
create_expressions = [
@@ -525,10 +586,40 @@ def to_raw_sql(expr: t.Union[exp.Literal, exp.Var, str, t.Any]) -> exp.Var:
525586
else:
526587
create_expressions = [to_raw_sql(partitions)]
527588

528-
return exp.PartitionByRangeProperty(
529-
partition_expressions=partitioned_by,
530-
create_expressions=create_expressions,
531-
)
589+
# Infer partition kind from partitions text if not explicitly provided
590+
inferred_list = False
591+
if partition_kind is None and create_expressions:
592+
try:
593+
texts = [getattr(e, "this", "").upper() for e in create_expressions]
594+
inferred_list = any("VALUES IN" in t for t in texts)
595+
except Exception:
596+
inferred_list = False
597+
if partition_kind:
598+
kind_upper = str(partition_kind).upper()
599+
is_list = kind_upper == "LIST"
600+
else:
601+
is_list = inferred_list
602+
603+
try:
604+
if is_list:
605+
return exp.PartitionByListProperty(
606+
partition_expressions=partitioned_by,
607+
create_expressions=create_expressions,
608+
)
609+
return exp.PartitionByRangeProperty(
610+
partition_expressions=partitioned_by,
611+
create_expressions=create_expressions,
612+
)
613+
except TypeError:
614+
if is_list:
615+
return exp.PartitionByListProperty(
616+
partition_expressions=partitioned_by,
617+
create_expressions=create_expressions,
618+
)
619+
return exp.PartitionByRangeProperty(
620+
partition_expressions=partitioned_by,
621+
create_expressions=create_expressions,
622+
)
532623

533624
def _build_table_properties_exp(
534625
self,
@@ -757,46 +848,43 @@ def _parse_trigger_string(
757848
)
758849

759850
# Handle duplicate_key - only handle Tuple expressions or single Column expressions
851+
# Both tables and materialized views support duplicate keys in Doris
760852
duplicate_key = table_properties_copy.pop("duplicate_key", None)
761853
if duplicate_key is not None:
762-
if not is_materialized_view:
763-
if isinstance(duplicate_key, exp.Tuple):
764-
# Extract column names from Tuple expressions
765-
column_names = []
766-
for expr in duplicate_key.expressions:
767-
if (
768-
isinstance(expr, exp.Column)
769-
and hasattr(expr, "this")
770-
and hasattr(expr.this, "this")
771-
):
772-
column_names.append(str(expr.this.this))
773-
elif hasattr(expr, "this"):
774-
column_names.append(str(expr.this))
775-
else:
776-
column_names.append(str(expr))
777-
properties.append(
778-
exp.DuplicateKeyProperty(
779-
expressions=[exp.to_column(k) for k in column_names]
780-
)
781-
)
782-
elif isinstance(duplicate_key, exp.Column):
783-
# Handle as single column
784-
if hasattr(duplicate_key, "this") and hasattr(duplicate_key.this, "this"):
785-
column_name = str(duplicate_key.this.this)
854+
if isinstance(duplicate_key, exp.Tuple):
855+
# Extract column names from Tuple expressions
856+
column_names = []
857+
for expr in duplicate_key.expressions:
858+
if (
859+
isinstance(expr, exp.Column)
860+
and hasattr(expr, "this")
861+
and hasattr(expr.this, "this")
862+
):
863+
column_names.append(str(expr.this.this))
864+
elif hasattr(expr, "this"):
865+
column_names.append(str(expr.this))
786866
else:
787-
column_name = str(duplicate_key.this)
788-
properties.append(
789-
exp.DuplicateKeyProperty(expressions=[exp.to_column(column_name)])
790-
)
791-
elif isinstance(duplicate_key, exp.Literal):
792-
properties.append(
793-
exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key.this)])
794-
)
795-
elif isinstance(duplicate_key, str):
796-
properties.append(
797-
exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key)])
798-
)
799-
# Note: Materialized views don't typically use duplicate_key, so we skip it
867+
column_names.append(str(expr))
868+
properties.append(
869+
exp.DuplicateKeyProperty(expressions=[exp.to_column(k) for k in column_names])
870+
)
871+
elif isinstance(duplicate_key, exp.Column):
872+
# Handle as single column
873+
if hasattr(duplicate_key, "this") and hasattr(duplicate_key.this, "this"):
874+
column_name = str(duplicate_key.this.this)
875+
else:
876+
column_name = str(duplicate_key.this)
877+
properties.append(
878+
exp.DuplicateKeyProperty(expressions=[exp.to_column(column_name)])
879+
)
880+
elif isinstance(duplicate_key, exp.Literal):
881+
properties.append(
882+
exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key.this)])
883+
)
884+
elif isinstance(duplicate_key, str):
885+
properties.append(
886+
exp.DuplicateKeyProperty(expressions=[exp.to_column(duplicate_key)])
887+
)
800888

801889
if table_description:
802890
properties.append(
@@ -808,30 +896,8 @@ def _parse_trigger_string(
808896
# Handle partitioning
809897
add_partition = True
810898
if partitioned_by:
811-
normalized_partitioned_by: t.List[exp.Expression] = []
812-
for expr in partitioned_by:
813-
try:
814-
# Handle literal strings like "RANGE(col)" or "LIST(col)"
815-
if isinstance(expr, exp.Literal) and getattr(expr, "is_string", False):
816-
text = str(expr.this)
817-
match = re.match(
818-
r"^\s*(RANGE|LIST)\s*\((.*?)\)\s*$", text, flags=re.IGNORECASE
819-
)
820-
if match:
821-
inner = match.group(2)
822-
inner_cols = [
823-
c.strip().strip("`") for c in inner.split(",") if c.strip()
824-
]
825-
for col in inner_cols:
826-
normalized_partitioned_by.append(exp.to_column(col))
827-
continue
828-
except Exception:
829-
# If anything goes wrong, keep the original expr
830-
pass
831-
normalized_partitioned_by.append(expr)
832-
833-
# Replace with normalized expressions
834-
partitioned_by = normalized_partitioned_by
899+
# Parse and normalize partition expressions
900+
partitioned_by, _ = self._parse_partition_expressions(partitioned_by)
835901
# For tables, check if partitioned_by columns are in unique_key; for materialized views, allow regardless
836902
if unique_key is not None and not is_materialized_view:
837903
# Extract key column names from unique_key (only Tuple or Column expressions)

0 commit comments

Comments
 (0)