Skip to content

Commit f96af04

Browse files
jnh5y and claude committed
[FLINK-39421] Metadata filter push-down for table sources
Add dedicated metadata filter push-down path through SupportsReadingMetadata. Metadata predicates are classified separately from physical predicates and pushed via applyMetadataFilters() with a dedicated MetadataFilterResult type. MetadataFilterPushDownSpec stores a pre-translated predicateRowType using metadata key names (not SQL aliases), making the serialized spec self-contained without needing a column-to-key mapping. Generated-by: Claude Code (claude-opus-4-6) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6e3768d commit f96af04

8 files changed

Lines changed: 1044 additions & 35 deletions

File tree

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.apache.flink.table.connector.source.ScanTableSource;
2626
import org.apache.flink.table.data.RowData;
2727
import org.apache.flink.table.data.utils.JoinedRowData;
28+
import org.apache.flink.table.expressions.ResolvedExpression;
2829
import org.apache.flink.table.factories.Factory;
2930
import org.apache.flink.table.types.DataType;
3031
import org.apache.flink.table.types.logical.LogicalType;
3132
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
3233

34+
import java.util.Collections;
3335
import java.util.LinkedHashMap;
3436
import java.util.List;
3537
import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
154156
default boolean supportsMetadataProjection() {
155157
return true;
156158
}
159+
160+
/**
161+
* Whether this source supports filtering on metadata columns.
162+
*
163+
* <p>When this method returns {@code true}, the planner may call {@link
164+
* #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
165+
* names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
166+
* override this method will not receive metadata filter predicates.
167+
*
168+
* <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
169+
* predicates. A source can implement both to accept filters on physical and metadata columns.
170+
*/
171+
default boolean supportsMetadataFilterPushDown() {
172+
return false;
173+
}
174+
175+
/**
176+
* Provides a list of metadata filters in conjunctive form. A source can pick filters and return
177+
* the accepted and remaining filters. Same contract as {@link
178+
* SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
179+
*
180+
* <p>The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
181+
* not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
182+
* FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
183+
* msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
184+
* method.
185+
*/
186+
default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
187+
return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
188+
}
189+
190+
/**
191+
* Result of a metadata filter push down. Communicates the source's response to the planner
192+
* during optimization.
193+
*/
194+
@PublicEvolving
195+
final class MetadataFilterResult {
196+
private final List<ResolvedExpression> acceptedFilters;
197+
private final List<ResolvedExpression> remainingFilters;
198+
199+
private MetadataFilterResult(
200+
List<ResolvedExpression> acceptedFilters,
201+
List<ResolvedExpression> remainingFilters) {
202+
this.acceptedFilters = acceptedFilters;
203+
this.remainingFilters = remainingFilters;
204+
}
205+
206+
/**
207+
* Constructs a metadata filter push-down result.
208+
*
209+
* @param acceptedFilters filters consumed by the source (best effort)
210+
* @param remainingFilters filters that a subsequent operation must still apply at runtime
211+
*/
212+
public static MetadataFilterResult of(
213+
List<ResolvedExpression> acceptedFilters,
214+
List<ResolvedExpression> remainingFilters) {
215+
return new MetadataFilterResult(acceptedFilters, remainingFilters);
216+
}
217+
218+
public List<ResolvedExpression> getAcceptedFilters() {
219+
return acceptedFilters;
220+
}
221+
222+
public List<ResolvedExpression> getRemainingFilters() {
223+
return remainingFilters;
224+
}
225+
}
157226
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.abilities.source;
20+
21+
import org.apache.flink.table.api.TableException;
22+
import org.apache.flink.table.connector.source.DynamicTableSource;
23+
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
24+
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
25+
import org.apache.flink.table.expressions.Expression;
26+
import org.apache.flink.table.expressions.ResolvedExpression;
27+
import org.apache.flink.table.expressions.resolver.ExpressionResolver;
28+
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
29+
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
30+
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
31+
import org.apache.flink.table.types.logical.RowType;
32+
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
34+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
35+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
36+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
37+
38+
import org.apache.calcite.rex.RexBuilder;
39+
import org.apache.calcite.rex.RexNode;
40+
41+
import java.util.ArrayList;
42+
import java.util.List;
43+
import java.util.Objects;
44+
import java.util.Optional;
45+
import java.util.stream.Collectors;
46+
47+
import scala.Option;
48+
49+
import static org.apache.flink.util.Preconditions.checkNotNull;
50+
51+
/**
52+
* Serializes metadata filter predicates and replays them during compiled plan restoration.
53+
*
54+
* <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
55+
* (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
56+
* column-to-key mapping needs to be persisted.
57+
*/
58+
@JsonIgnoreProperties(ignoreUnknown = true)
59+
@JsonTypeName("MetadataFilterPushDown")
60+
public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
61+
62+
public static final String FIELD_NAME_PREDICATES = "predicates";
63+
public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";
64+
65+
@JsonProperty(FIELD_NAME_PREDICATES)
66+
private final List<RexNode> predicates;
67+
68+
/**
69+
* Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
70+
* context's row type during restore.
71+
*/
72+
@JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
73+
private final RowType predicateRowType;
74+
75+
@JsonCreator
76+
public MetadataFilterPushDownSpec(
77+
@JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
78+
@JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
79+
this.predicates = new ArrayList<>(checkNotNull(predicates));
80+
this.predicateRowType = checkNotNull(predicateRowType);
81+
}
82+
83+
public List<RexNode> getPredicates() {
84+
return predicates;
85+
}
86+
87+
@Override
88+
public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
89+
// Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec.
90+
MetadataFilterResult result =
91+
applyMetadataFilters(predicates, predicateRowType, tableSource, context);
92+
if (result.getAcceptedFilters().size() != predicates.size()) {
93+
throw new TableException("All metadata predicates should be accepted here.");
94+
}
95+
}
96+
97+
/**
98+
* Converts RexNode predicates to ResolvedExpressions using the given row type and calls
99+
* applyMetadataFilters on the source. The row type must already use metadata key names.
100+
*/
101+
public static MetadataFilterResult applyMetadataFilters(
102+
List<RexNode> predicates,
103+
RowType metadataKeyRowType,
104+
DynamicTableSource tableSource,
105+
SourceAbilityContext context) {
106+
if (!(tableSource instanceof SupportsReadingMetadata)) {
107+
throw new TableException(
108+
String.format(
109+
"%s does not support SupportsReadingMetadata.",
110+
tableSource.getClass().getName()));
111+
}
112+
113+
String[] fieldNames = metadataKeyRowType.getFieldNames().toArray(new String[0]);
114+
115+
RexNodeToExpressionConverter converter =
116+
new RexNodeToExpressionConverter(
117+
new RexBuilder(context.getTypeFactory()),
118+
fieldNames,
119+
context.getFunctionCatalog(),
120+
context.getCatalogManager(),
121+
Option.apply(
122+
context.getTypeFactory().buildRelNodeRowType(metadataKeyRowType)));
123+
124+
List<Expression> filters =
125+
predicates.stream()
126+
.map(
127+
p -> {
128+
scala.Option<ResolvedExpression> expr = p.accept(converter);
129+
if (expr.isDefined()) {
130+
return expr.get();
131+
} else {
132+
throw new TableException(
133+
String.format(
134+
"%s can not be converted to Expression for metadata filter push-down.",
135+
p.toString()));
136+
}
137+
})
138+
.collect(Collectors.toList());
139+
140+
ExpressionResolver resolver =
141+
ExpressionResolver.resolverFor(
142+
context.getTableConfig(),
143+
context.getClassLoader(),
144+
name -> Optional.empty(),
145+
context.getFunctionCatalog()
146+
.asLookup(
147+
str -> {
148+
throw new TableException(
149+
"We should not need to lookup any expressions at this point");
150+
}),
151+
context.getCatalogManager().getDataTypeFactory(),
152+
(sqlExpression, inputRowType, outputType) -> {
153+
throw new TableException(
154+
"SQL expression parsing is not supported at this location.");
155+
})
156+
.build();
157+
158+
return ((SupportsReadingMetadata) tableSource)
159+
.applyMetadataFilters(resolver.resolve(filters));
160+
}
161+
162+
@Override
163+
public boolean needAdjustFieldReferenceAfterProjection() {
164+
return true;
165+
}
166+
167+
@Override
168+
public String getDigests(SourceAbilityContext context) {
169+
final List<String> expressionStrs = new ArrayList<>();
170+
for (RexNode rexNode : predicates) {
171+
expressionStrs.add(
172+
FlinkRexUtil.getExpressionString(
173+
rexNode,
174+
JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
175+
}
176+
177+
return String.format(
178+
"metadataFilter=[%s]",
179+
expressionStrs.stream()
180+
.reduce((l, r) -> String.format("and(%s, %s)", l, r))
181+
.orElse(""));
182+
}
183+
184+
@Override
185+
public boolean equals(Object o) {
186+
if (this == o) {
187+
return true;
188+
}
189+
if (o == null || getClass() != o.getClass()) {
190+
return false;
191+
}
192+
if (!super.equals(o)) {
193+
return false;
194+
}
195+
MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
196+
return Objects.equals(predicates, that.predicates)
197+
&& Objects.equals(predicateRowType, that.predicateRowType);
198+
}
199+
200+
@Override
201+
public int hashCode() {
202+
return Objects.hash(super.hashCode(), predicates, predicateRowType);
203+
}
204+
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
@JsonSubTypes({
3939
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
4040
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
41+
@JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
4142
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
4243
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
4344
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),

0 commit comments

Comments
 (0)