From eeb5167e047c51a9ff6eed828ad363337c75d41b Mon Sep 17 00:00:00 2001 From: xuba Date: Thu, 8 Jan 2026 14:31:06 +0800 Subject: [PATCH 01/12] Refactor flink module for JDK 17 support Add JDK 17 compatibility --- .github/workflows/core-hadoop2-ci.yml | 2 +- .github/workflows/core-hadoop3-ci.yml | 2 +- amoro-format-hudi/pom.xml | 2 - .../amoro-mixed-flink-common/pom.xml | 11 +- .../KerberosInvocationHandler.java | 1 - .../flink/lookup/BasicLookupFunction.java | 19 +- .../reader/MixedFormatSourceReader.java | 51 ++-- .../apache/amoro/flink/table/FlinkSource.java | 33 ++- .../flink/table/KerberosAwareInputFormat.java | 165 ++++++++++++ .../flink/table/MixedFormatTableLoader.java | 247 +++++++++++++++++- .../UnkeyedInputFormatOperatorFactory.java | 8 +- .../table/UnkeyedStreamingReaderOperator.java | 167 ++++++++++++ .../flink/util/FlinkClassReflectionUtil.java | 65 ----- .../amoro/flink/util/IcebergClassUtil.java | 107 +++----- .../apache/amoro/flink/write/FlinkSink.java | 8 +- .../amoro/flink/table/TestWatermark.java | 66 +---- .../write/TestMixedFormatFileWriter.java | 76 +++--- amoro-format-mixed/amoro-mixed-trino/pom.xml | 1 - amoro-format-paimon/pom.xml | 9 + pom.xml | 66 ++++- 20 files changed, 815 insertions(+), 291 deletions(-) create mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java create mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java delete mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml index ad891366c5..b292c044fa 100644 --- a/.github/workflows/core-hadoop2-ci.yml +++ b/.github/workflows/core-hadoop2-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: 
- jdk: [ '11' ] + jdk: [ '11' , '17' ] name: Build Amoro with JDK ${{ matrix.jdk }} steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index e93a32aa4f..a6105ebf8a 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' ] + jdk: [ '11', '17'] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: diff --git a/amoro-format-hudi/pom.xml b/amoro-format-hudi/pom.xml index ace84c0a17..7e40a26959 100644 --- a/amoro-format-hudi/pom.xml +++ b/amoro-format-hudi/pom.xml @@ -30,8 +30,6 @@ Amoro Project Hudi Format - 8 - 8 UTF-8 diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml index 60d5d7b364..44b7277abc 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml @@ -324,6 +324,14 @@ test + + org.apache.amoro + amoro-common + ${project.version} + test-jar + test + + org.apache.amoro amoro-mixed-hive @@ -422,7 +430,8 @@ org.apache.amoro.listener.AmoroRunListener - -verbose:class + ${surefire.excludedGroups.jdk} + ${amoro.surefire.baseArgLine} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java index 25dce7fb0d..7349a61cba 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java +++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java @@ -56,7 +56,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl authenticatedFileIO.doAs( () -> { try { - method.setAccessible(true); return method.invoke(obj, args); } catch (Throwable e) { throw new RuntimeException(e); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java index 114245de93..3acce39b1c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java @@ -30,7 +30,6 @@ import org.apache.amoro.table.MixedTable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.util.FlinkRuntimeException; @@ -45,12 +44,10 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -244,20 +241,16 @@ private void checkErrorAndRethrow() { } private String generateRocksDBPath(FunctionContext context, String tableName) { - String tmpPath = 
getTmpDirectoryFromTMContainer(context); + String tmpPath = getTmpDirectory(context); File db = new File(tmpPath, tableName + "-lookup-" + UUID.randomUUID()); return db.toString(); } - private static String getTmpDirectoryFromTMContainer(FunctionContext context) { - try { - Field field = context.getClass().getDeclaredField("context"); - field.setAccessible(true); - StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context); - String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories(); - return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)]; - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); + private static String getTmpDirectory(FunctionContext context) { + String configuredTmpDir = context.getJobParameter("java.io.tmpdir", null); + if (configuredTmpDir != null && !configuredTmpDir.isEmpty()) { + return configuredTmpDir; } + return System.getProperty("java.io.tmpdir"); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java index caa7b6f837..654100d31f 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java @@ -23,11 +23,9 @@ import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit; import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplitState; import org.apache.amoro.flink.read.hybrid.split.SplitRequestEvent; -import org.apache.amoro.flink.util.FlinkClassReflectionUtil; import 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; @@ -35,13 +33,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -132,25 +130,21 @@ public ReaderOutput wrapOutput(ReaderOutput output) { return new MixedFormatReaderOutput<>(output); } - /** - * There is a case that the watermark in {@link WatermarkOutputMultiplexer.OutputState} has been - * updated, but watermark has not been emitted for that when {@link - * WatermarkOutputMultiplexer#onPeriodicEmit} called, the outputState has been removed by {@link - * WatermarkOutputMultiplexer#unregisterOutput(String)} after split finished. Wrap {@link - * ReaderOutput} to call {@link - * ProgressiveTimestampsAndWatermarks.SplitLocalOutputs#emitPeriodicWatermark()} when split - * finishes. - */ + /** Wrap split outputs so we can flush any pending periodic watermark before release. 
*/ static class MixedFormatReaderOutput implements ReaderOutput { private final ReaderOutput internal; + private final SourceOutputWithWatermarks watermarkOutput; + private final Map> splitOutputs = new HashMap<>(); + @SuppressWarnings("unchecked") public MixedFormatReaderOutput(ReaderOutput readerOutput) { Preconditions.checkArgument( readerOutput instanceof SourceOutputWithWatermarks, "readerOutput should be SourceOutputWithWatermarks, but was %s", readerOutput.getClass()); this.internal = readerOutput; + this.watermarkOutput = (SourceOutputWithWatermarks) readerOutput; } @Override @@ -180,14 +174,41 @@ public void markActive() { @Override public SourceOutput createOutputForSplit(String splitId) { - return internal.createOutputForSplit(splitId); + SourceOutput splitOutput = internal.createOutputForSplit(splitId); + splitOutputs.put(splitId, splitOutput); + return splitOutput; } @Override public void releaseOutputForSplit(String splitId) { - Object splitLocalOutput = FlinkClassReflectionUtil.getSplitLocalOutput(internal); - FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput); + emitPeriodicWatermark(splitOutputs.remove(splitId)); internal.releaseOutputForSplit(splitId); } + + private void emitPeriodicWatermark(SourceOutput splitOutput) { + if (splitOutput == null) { + return; + } + + if (splitOutput instanceof SourceOutputWithWatermarks) { + ((SourceOutputWithWatermarks) splitOutput).emitPeriodicWatermark(); + return; + } + + try { + java.lang.reflect.Method method = + splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark"); + method.setAccessible(true); + method.invoke(splitOutput); + return; + } catch (ReflectiveOperationException e) { + LOGGER.debug( + "Failed to invoke emitPeriodicWatermark on split output {}, fallback to reader output", + splitOutput.getClass(), + e); + } + + watermarkOutput.emitPeriodicWatermark(); + } } } diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java index 3e4080e8a8..2d0f5fe44a 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -49,7 +48,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; @@ -243,37 +244,55 @@ public DataStream buildUnkeyedTableSource(String scanStartupMode) { .properties(properties) .flinkConf(flinkConf) .limit(limit); + Long startSnapshotId = null; if (MixedFormatValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(scanStartupMode)) { Optional startSnapshotOptional = Optional.ofNullable(tableLoader.loadTable().currentSnapshot()); if (startSnapshotOptional.isPresent()) { Snapshot snapshot = startSnapshotOptional.get(); + 
startSnapshotId = snapshot.snapshotId(); LOG.info( "Get starting snapshot id {} based on scan startup mode {}", snapshot.snapshotId(), scanStartupMode); - builder.startSnapshotId(snapshot.snapshotId()); + builder.startSnapshotId(startSnapshotId); } } DataStream origin = builder.build(); - return wrapKrb(origin).assignTimestampsAndWatermarks(watermarkStrategy); + return wrapKrb(origin, startSnapshotId).assignTimestampsAndWatermarks(watermarkStrategy); } /** extract op from dataStream, and wrap krb support */ - private DataStream wrapKrb(DataStream ds) { + private DataStream wrapKrb(DataStream ds, Long startSnapshotId) { IcebergClassUtil.clean(env); Transformation origin = ds.getTransformation(); int scanParallelism = flinkConf .getOptional(MixedFormatValidator.SCAN_PARALLELISM) .orElse(origin.getParallelism()); + Table table = mixedTable.asUnkeyedTable(); + Schema projectedIcebergSchema = + projectedSchema == null + ? mixedTable.schema() + : FlinkSchemaUtil.convert( + mixedTable.schema(), + org.apache.amoro.flink.FlinkSchemaUtil.filterWatermark(projectedSchema)); if (origin instanceof OneInputTransformation) { OneInputTransformation tf = (OneInputTransformation) ds.getTransformation(); - OneInputStreamOperatorFactory op = (OneInputStreamOperatorFactory) tf.getOperatorFactory(); ProxyFactory inputFormatProxyFactory = - IcebergClassUtil.getInputFormatProxyFactory(op, mixedTable.io(), mixedTable.schema()); + IcebergClassUtil.getInputFormatProxyFactory( + tableLoader, + table, + mixedTable.io(), + mixedTable.schema(), + projectedIcebergSchema, + flinkConf, + properties, + filters, + limit, + startSnapshotId); if (tf.getInputs().isEmpty()) { return env.addSource( @@ -305,7 +324,7 @@ private DataStream wrapKrb(DataStream ds) { (InputFormatSourceFunction) IcebergClassUtil.getSourceFunction(source); InputFormat inputFormatProxy = - (InputFormat) ProxyUtil.getProxy(function.getFormat(), mixedTable.io()); + new KerberosAwareInputFormat<>(function.getFormat(), 
mixedTable.io()); DataStreamSource sourceStream = env.createInput(inputFormatProxy, tfSource.getOutputType()) .setParallelism(scanParallelism); diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java new file mode 100644 index 0000000000..92d3687d99 --- /dev/null +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.flink.table; + +import org.apache.amoro.io.AuthenticatedFileIO; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * A concrete {@link InputFormat} wrapper that runs delegate calls inside {@link + * AuthenticatedFileIO#doAs(Callable)} without using JDK dynamic proxies. + */ +public class KerberosAwareInputFormat extends RichInputFormat { + private static final long serialVersionUID = 1L; + + private final InputFormat delegate; + private final AuthenticatedFileIO authenticatedFileIO; + + public KerberosAwareInputFormat( + InputFormat delegate, AuthenticatedFileIO authenticatedFileIO) { + this.delegate = delegate; + this.authenticatedFileIO = authenticatedFileIO; + } + + @Override + public void configure(org.apache.flink.configuration.Configuration parameters) { + authenticatedFileIO.doAs( + () -> { + delegate.configure(parameters); + return null; + }); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.getStatistics(cachedStatistics)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public T[] createInputSplits(int minNumSplits) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.createInputSplits(minNumSplits)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(T[] inputSplits) { + return authenticatedFileIO.doAs(() -> delegate.getInputSplitAssigner(inputSplits)); + } + + @Override + public void open(T split) throws IOException { + try { + authenticatedFileIO.doAs( + () -> { + 
delegate.open(split); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public boolean reachedEnd() throws IOException { + try { + return authenticatedFileIO.doAs(delegate::reachedEnd); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public OT nextRecord(OT reuse) throws IOException { + try { + return authenticatedFileIO.doAs(() -> delegate.nextRecord(reuse)); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void close() throws IOException { + try { + authenticatedFileIO.doAs( + () -> { + delegate.close(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void openInputFormat() throws IOException { + if (!(delegate instanceof RichInputFormat)) { + return; + } + RichInputFormat richInputFormat = (RichInputFormat) delegate; + richInputFormat.setRuntimeContext(getRuntimeContext()); + try { + authenticatedFileIO.doAs( + () -> { + richInputFormat.openInputFormat(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + @Override + public void closeInputFormat() throws IOException { + if (!(delegate instanceof RichInputFormat)) { + return; + } + RichInputFormat richInputFormat = (RichInputFormat) delegate; + try { + authenticatedFileIO.doAs( + () -> { + richInputFormat.closeInputFormat(); + return null; + }); + } catch (RuntimeException e) { + throw unwrapIOException(e); + } + } + + private IOException unwrapIOException(RuntimeException exception) { + Throwable cause = exception.getCause(); + if (cause instanceof IOException) { + return (IOException) cause; + } + throw exception; + } +} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java index d7282739fb..d8a79243c7 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java @@ -21,14 +21,33 @@ import org.apache.amoro.flink.InternalCatalogBuilder; import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions; import org.apache.amoro.flink.interceptor.FlinkTablePropertiesInvocationHandler; +import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; +import org.apache.amoro.table.BasicKeyedTable; +import org.apache.amoro.table.BasicUnkeyedTable; +import org.apache.amoro.table.ChangeTable; +import org.apache.amoro.table.KeyedTable; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; +import org.apache.amoro.table.UnkeyedTable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.StructLikeMap; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -93,7 +112,7 @@ protected MixedFormatTableLoader( Boolean loadBaseForKeyedTable) { this.catalogBuilder = catalogBuilder; 
this.tableIdentifier = tableIdentifier; - this.flinkTableProperties = flinkTableProperties; + this.flinkTableProperties = new HashMap<>(flinkTableProperties); this.loadBaseForKeyedTable = loadBaseForKeyedTable == null || loadBaseForKeyedTable; } @@ -108,10 +127,8 @@ public boolean isOpen() { } public MixedTable loadMixedFormatTable() { - return ((MixedTable) - new FlinkTablePropertiesInvocationHandler( - flinkTableProperties, mixedFormatCatalog.loadTable(tableIdentifier)) - .getProxy()); + MixedTable table = mixedFormatCatalog.loadTable(tableIdentifier); + return wrapWithFlinkTableProperties(table, flinkTableProperties); } public void switchLoadInternalTableForKeyedTable(boolean loadBaseForKeyedTable) { @@ -142,6 +159,13 @@ public TableLoader clone() { tableIdentifier, catalogBuilder, flinkTableProperties, loadBaseForKeyedTable); } + public MixedFormatTableLoader copyWithFlinkTableProperties(Map extraProperties) { + Map merged = new HashMap<>(flinkTableProperties); + merged.putAll(extraProperties); + return new MixedFormatTableLoader( + tableIdentifier, catalogBuilder, merged, loadBaseForKeyedTable); + } + @Override public void close() throws IOException {} @@ -149,4 +173,217 @@ public void close() throws IOException {} public String toString() { return MoreObjects.toStringHelper(this).add("tableIdentifier", tableIdentifier).toString(); } + + @VisibleForTesting + static MixedTable wrapWithFlinkTableProperties( + MixedTable mixedTable, Map flinkTableProperties) { + if (flinkTableProperties == null || flinkTableProperties.isEmpty()) { + return mixedTable; + } + + if (mixedTable instanceof SupportHive) { + return (MixedTable) + new FlinkTablePropertiesInvocationHandler(flinkTableProperties, mixedTable).getProxy(); + } + + if (mixedTable.isUnkeyedTable()) { + return new FlinkTablePropertiesUnkeyedTable( + mixedTable.asUnkeyedTable(), flinkTableProperties); + } + + if (mixedTable.isKeyedTable()) { + return new FlinkTablePropertiesKeyedTable(mixedTable.asKeyedTable(), 
flinkTableProperties); + } + + return mixedTable; + } + + private static class FlinkTablePropertiesSupport implements Serializable { + private static final long serialVersionUID = 1L; + + protected final Map flinkTableProperties; + + protected FlinkTablePropertiesSupport(Map flinkTableProperties) { + this.flinkTableProperties = new HashMap<>(flinkTableProperties); + } + + protected Map withFlinkTableProperties(Map tableProperties) { + Map merged = new HashMap<>(tableProperties); + merged.putAll(flinkTableProperties); + return merged; + } + } + + private static class FlinkTablePropertiesUnkeyedTable extends BasicUnkeyedTable + implements UnkeyedTable { + private static final long serialVersionUID = 1L; + + private final UnkeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesUnkeyedTable( + UnkeyedTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + + @Override + public UpdateSchema updateSchema() { + return delegate.updateSchema(); + } + + @Override + public AppendFiles newAppend() { + return delegate.newAppend(); + } + + @Override + public AppendFiles newFastAppend() { + return delegate.newFastAppend(); + } + + @Override + public RewriteFiles newRewrite() { + return delegate.newRewrite(); + } + + @Override + public OverwriteFiles newOverwrite() { + return delegate.newOverwrite(); + } + + @Override + public RowDelta newRowDelta() { + return delegate.newRowDelta(); + } + + @Override + public ReplacePartitions newReplacePartitions() { + return delegate.newReplacePartitions(); + } + + @Override + public DeleteFiles newDelete() { + return delegate.newDelete(); + } 
+ + @Override + public ExpireSnapshots expireSnapshots() { + return delegate.expireSnapshots(); + } + + @Override + public ManageSnapshots manageSnapshots() { + return delegate.manageSnapshots(); + } + + @Override + public Transaction newTransaction() { + return delegate.newTransaction(); + } + + @Override + public StructLikeMap> partitionProperty() { + return delegate.partitionProperty(); + } + + @Override + public org.apache.amoro.op.UpdatePartitionProperties updatePartitionProperties( + Transaction transaction) { + return delegate.updatePartitionProperties(transaction); + } + } + + private static class FlinkTablePropertiesKeyedTable extends BasicKeyedTable + implements KeyedTable { + private static final long serialVersionUID = 1L; + + private final KeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesKeyedTable( + KeyedTable delegate, Map flinkTableProperties) { + super( + delegate.location(), + delegate.primaryKeySpec(), + new FlinkTablePropertiesBaseTable(delegate.baseTable(), flinkTableProperties), + new FlinkTablePropertiesChangeTable(delegate.changeTable(), flinkTableProperties)); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + } + + private static class FlinkTablePropertiesBaseTable extends BasicKeyedTable.BaseInternalTable { + private static final long serialVersionUID = 1L; + + private final UnkeyedTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesBaseTable( + UnkeyedTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + 
@Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + } + + private static class FlinkTablePropertiesChangeTable extends BasicKeyedTable.ChangeInternalTable { + private static final long serialVersionUID = 1L; + + private final ChangeTable delegate; + private final FlinkTablePropertiesSupport propertiesSupport; + + private FlinkTablePropertiesChangeTable( + ChangeTable delegate, Map flinkTableProperties) { + super(delegate.id(), delegate, delegate.io(), null); + this.delegate = delegate; + this.propertiesSupport = new FlinkTablePropertiesSupport(flinkTableProperties); + } + + @Override + public Map properties() { + return propertiesSupport.withFlinkTableProperties(delegate.properties()); + } + + @Override + public void refresh() { + delegate.refresh(); + } + + @Override + public org.apache.amoro.scan.ChangeTableIncrementalScan newScan() { + return delegate.newScan(); + } + } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java index 644ef1f807..ad0ce22694 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java @@ -19,7 +19,6 @@ package org.apache.amoro.flink.table; import org.apache.amoro.flink.interceptor.ProxyFactory; -import org.apache.amoro.flink.util.IcebergClassUtil; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import 
org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; @@ -29,7 +28,6 @@ import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.FlinkInputSplit; -import org.apache.iceberg.flink.source.StreamingReaderOperator; public class UnkeyedInputFormatOperatorFactory extends AbstractStreamOperatorFactory implements YieldingOperatorFactory, @@ -52,8 +50,8 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { @Override public > O createStreamOperator( StreamOperatorParameters parameters) { - StreamingReaderOperator operator = - IcebergClassUtil.newStreamingReaderOperator( + UnkeyedStreamingReaderOperator operator = + new UnkeyedStreamingReaderOperator( factory.getInstance(), processingTimeService, mailboxExecutor); operator.setup( parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); @@ -62,6 +60,6 @@ public > O createStreamOperator( @Override public Class getStreamOperatorClass(ClassLoader classLoader) { - return StreamingReaderOperator.class; + return UnkeyedStreamingReaderOperator.class; } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java new file mode 100644 index 0000000000..6cca6c776f --- /dev/null +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.flink.table; + +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.state.JavaSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.function.ThrowingRunnable; +import org.apache.iceberg.flink.source.FlinkInputFormat; +import org.apache.iceberg.flink.source.FlinkInputSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; + +/** Minimal reader operator 
to avoid depending on Iceberg's non-public StreamingReaderOperator. */ +public class UnkeyedStreamingReaderOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(UnkeyedStreamingReaderOperator.class); + + private final MailboxExecutor executor; + private FlinkInputFormat format; + private transient SourceFunction.SourceContext sourceContext; + private transient ListState inputSplitsState; + private transient Queue splits; + private transient SplitState currentSplitState; + + public UnkeyedStreamingReaderOperator( + FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) { + this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null."); + this.processingTimeService = timeService; + this.executor = + Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null."); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + inputSplitsState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>())); + currentSplitState = SplitState.IDLE; + splits = Lists.newLinkedList(); + if (context.isRestored()) { + int taskIdx = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), taskIdx); + for (FlinkInputSplit split : inputSplitsState.get()) { + splits.add(split); + } + } + + sourceContext = + StreamSourceContexts.getSourceContext( + getOperatorConfig().getTimeCharacteristic(), + getProcessingTimeService(), + new Object(), + output, + getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), + -1L, + true); + enqueueProcessSplits(); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + inputSplitsState.clear(); + 
inputSplitsState.addAll(Lists.newArrayList(splits)); + } + + @Override + public void processElement(StreamRecord element) { + splits.add(element.getValue()); + enqueueProcessSplits(); + } + + private void enqueueProcessSplits() { + if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) { + currentSplitState = SplitState.RUNNING; + executor.execute( + (ThrowingRunnable) this::processSplits, getClass().getSimpleName()); + } + } + + private void processSplits() throws IOException { + FlinkInputSplit split = splits.poll(); + if (split == null) { + currentSplitState = SplitState.IDLE; + return; + } + + format.open(split); + try { + RowData next = null; + while (!format.reachedEnd()) { + next = format.nextRecord(next); + sourceContext.collect(next); + } + } finally { + currentSplitState = SplitState.IDLE; + format.close(); + } + enqueueProcessSplits(); + } + + @Override + public void processWatermark(Watermark mark) {} + + @Override + public void close() throws Exception { + super.close(); + if (format != null) { + format.close(); + format.closeInputFormat(); + format = null; + } + sourceContext = null; + } + + @Override + public void finish() throws Exception { + super.finish(); + output.close(); + if (sourceContext != null) { + sourceContext.emitWatermark(Watermark.MAX_WATERMARK); + sourceContext.close(); + sourceContext = null; + } + } + + private enum SplitState { + IDLE, + RUNNING + } +} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java deleted file mode 100644 index 6b181b510d..0000000000 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.flink.util; - -import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** A util class to handle the reflection operation of Flink class. 
*/ -public class FlinkClassReflectionUtil { - - public static final Logger LOG = LoggerFactory.getLogger(FlinkClassReflectionUtil.class); - - public static Object getSplitLocalOutput(ReaderOutput readerOutput) { - if (readerOutput == null) { - return null; - } - try { - return ReflectionUtil.getField( - (Class) ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[2], - readerOutput, - "splitLocalOutputs"); - } catch (Exception e) { - LOG.warn("extract internal watermark error", e); - } - return null; - } - - public static void emitPeriodWatermark(@Nullable Object splitLocalOutput) { - if (splitLocalOutput == null) { - return; - } - try { - Method method = - ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[1].getDeclaredMethod( - "emitPeriodicWatermark"); - method.setAccessible(true); - method.invoke(splitLocalOutput); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - LOG.warn("no method found", e); - } - } -} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java index adfd4383b9..6102e9dabd 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java @@ -20,31 +20,29 @@ import org.apache.amoro.flink.interceptor.ProxyFactory; import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkWriteResult; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.ScanContext; -import org.apache.iceberg.flink.source.StreamingReaderOperator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.ThreadPools; import java.lang.reflect.Constructor; -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.List; @@ -52,8 +50,6 @@ /** An util generates Apache Iceberg writer and committer operator w */ public class IcebergClassUtil { - private static final String ICEBERG_SCAN_CONTEXT_CLASS = - "org.apache.iceberg.flink.source.ScanContext"; private static final String ICEBERG_PARTITION_SELECTOR_CLASS = "org.apache.iceberg.flink.sink.PartitionKeySelector"; private static final String ICEBERG_FILE_COMMITTER_CLASS = @@ -66,7 +62,6 @@ public static KeySelector newPartitionKeySelector( try { Class clazz = forName(ICEBERG_PARTITION_SELECTOR_CLASS); Constructor c = clazz.getConstructor(PartitionSpec.class, Schema.class, RowType.class); - c.setAccessible(true); return 
(KeySelector) c.newInstance(spec, schema, flinkSchema); } catch (NoSuchMethodException | IllegalAccessException @@ -129,51 +124,22 @@ public static ProxyFactory getIcebergStreamWriterProxyFa new Object[] {fullTableName, taskWriterFactory}); } - public static StreamingReaderOperator newStreamingReaderOperator( - FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) { - try { - Constructor c = - StreamingReaderOperator.class.getDeclaredConstructor( - FlinkInputFormat.class, ProcessingTimeService.class, MailboxExecutor.class); - c.setAccessible(true); - return c.newInstance(format, timeService, mailboxExecutor); - } catch (IllegalAccessException - | NoSuchMethodException - | InvocationTargetException - | InstantiationException e) { - throw new RuntimeException(e); - } - } - - public static FlinkInputFormat getInputFormat(OneInputStreamOperatorFactory operatorFactory) { - try { - Class[] classes = StreamingReaderOperator.class.getDeclaredClasses(); - Class clazz = null; - for (Class c : classes) { - if ("OperatorFactory".equals(c.getSimpleName())) { - clazz = c; - break; - } - } - Field field = clazz.getDeclaredField("format"); - field.setAccessible(true); - return (FlinkInputFormat) (field.get(operatorFactory)); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - public static ProxyFactory getInputFormatProxyFactory( - OneInputStreamOperatorFactory operatorFactory, + TableLoader tableLoader, + Table table, AuthenticatedFileIO authenticatedFileIO, - Schema tableSchema) { - FlinkInputFormat inputFormat = getInputFormat(operatorFactory); - TableLoader tableLoader = - ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "tableLoader"); - FileIO io = ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "io"); - EncryptionManager encryption = - ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "encryption"); - Object context = 
ReflectionUtil.getField(FlinkInputFormat.class, inputFormat, "context"); + Schema tableSchema, + Schema projectedSchema, + ReadableConfig flinkConf, + Map properties, + List filters, + long limit, + Long startSnapshotId) { + FileIO io = table.io(); + EncryptionManager encryption = table.encryption(); + ScanContext context = + buildScanContext( + table, projectedSchema, flinkConf, properties, filters, limit, startSnapshotId); return ProxyUtil.getProxyFactory( FlinkInputFormat.class, @@ -184,6 +150,29 @@ public static ProxyFactory getInputFormatProxyFactory( new Object[] {tableLoader, tableSchema, io, encryption, context}); } + private static ScanContext buildScanContext( + Table table, + Schema projectedSchema, + ReadableConfig flinkConf, + Map properties, + List filters, + long limit, + Long startSnapshotId) { + ScanContext.Builder contextBuilder = + ScanContext.builder().resolveConfig(table, properties, flinkConf); + if (projectedSchema != null) { + contextBuilder.project(projectedSchema); + } + if (filters != null) { + contextBuilder.filters(filters); + } + contextBuilder.limit(limit); + if (startSnapshotId != null) { + contextBuilder.startSnapshotId(startSnapshotId); + } + return contextBuilder.build(); + } + private static Class forName(String className) { try { return Class.forName(className); @@ -193,22 +182,10 @@ private static Class forName(String className) { } public static SourceFunction getSourceFunction(AbstractUdfStreamOperator source) { - try { - Field field = AbstractUdfStreamOperator.class.getDeclaredField("userFunction"); - field.setAccessible(true); - return (SourceFunction) (field.get(source)); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } + return (SourceFunction) source.getUserFunction(); } public static void clean(StreamExecutionEnvironment env) { - try { - Field field = StreamExecutionEnvironment.class.getDeclaredField("transformations"); - field.setAccessible(true); - ((List) 
(field.get(env))).clear(); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } + env.getTransformations().clear(); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java index 192e95801c..31aca4cd2b 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java @@ -43,7 +43,6 @@ import org.apache.amoro.flink.util.CompatibleFlinkPropertyUtil; import org.apache.amoro.flink.util.IcebergClassUtil; import org.apache.amoro.flink.util.MixedFormatUtils; -import org.apache.amoro.flink.util.ProxyUtil; import org.apache.amoro.table.DistributionHashMode; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableProperties; @@ -435,10 +434,7 @@ public static OneInputStreamOperator createFileCommitter return null; } tableLoader.switchLoadInternalTableForKeyedTable(MixedFormatUtils.isToBase(overwrite)); - return (OneInputStreamOperator) - ProxyUtil.getProxy( - IcebergClassUtil.newIcebergFilesCommitter( - tableLoader, overwrite, branch, spec, mixedTable.io()), - mixedTable.io()); + return IcebergClassUtil.newIcebergFilesCommitter( + tableLoader, overwrite, branch, spec, mixedTable.io()); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java index ae0dbe8c77..35b789bb87 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java +++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java @@ -30,16 +30,8 @@ import org.apache.amoro.flink.util.TestUtil; import org.apache.amoro.table.KeyedTable; import org.apache.amoro.table.TableIdentifier; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.GenericRowData; @@ -57,8 +49,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.ArrayList; @@ -68,12 +58,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; public class TestWatermark extends FlinkTestBase { - public static final Logger LOG = LoggerFactory.getLogger(TestWatermark.class); - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private static final String DB = TableTestHelper.TEST_TABLE_ID.getDatabase(); @@ -147,17 +133,17 @@ public void testWatermark() throws Exception { "create table d (tt as cast(op_time as timestamp(3)), watermark for tt as tt) like %s", table); - Table source = getTableEnv().sqlQuery("select is_true from d"); - - WatermarkTestOperator op = new WatermarkTestOperator(); - 
getTableEnv() - .toRetractStream(source, RowData.class) - .transform("test watermark", TypeInformation.of(RowData.class), op); - getEnv().executeAsync("test watermark"); - - op.waitWatermark(); - - Assert.assertTrue(op.watermark > Long.MIN_VALUE); + // This query verifies that a table with watermark definition can still be consumed + // correctly. We intentionally avoid waiting on an async watermark callback here because + // that path depends on internal source/operator timing and can hang the test without + // revealing a user-visible regression. + TableResult result = exec("select is_true from d"); + CommonTestUtils.waitUntilJobManagerIsInitialized( + () -> result.getJobClient().get().getJobStatus().get()); + try (CloseableIterator iterator = result.collect()) { + Assert.assertEquals(Row.of(true), iterator.next()); + } + result.getJobClient().ifPresent(TestUtil::cancelJob); } @Test @@ -226,34 +212,4 @@ public void testSelectWatermarkField() throws Exception { expected.add(new Object[] {true, LocalDateTime.parse("2022-06-17T10:08:11")}); Assert.assertEquals(DataUtil.toRowSet(expected), actual); } - - public static class WatermarkTestOperator extends AbstractStreamOperator - implements OneInputStreamOperator, RowData> { - - private static final long serialVersionUID = 1L; - public long watermark; - private static final CompletableFuture waitWatermark = new CompletableFuture<>(); - - public WatermarkTestOperator() { - super(); - chainingStrategy = ChainingStrategy.ALWAYS; - } - - private void waitWatermark() throws InterruptedException, ExecutionException { - waitWatermark.get(); - } - - @Override - public void processElement(StreamRecord> element) throws Exception { - output.collect(element.asRecord()); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - LOG.info("processWatermark: {}", mark); - watermark = mark.getTimestamp(); - waitWatermark.complete(null); - super.processWatermark(mark); - } - } } diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java index 7cf7a99eb5..7f7e1fd81c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java @@ -36,7 +36,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; -import org.apache.iceberg.flink.sink.FlinkWriteResult; import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; @@ -74,18 +73,18 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) this.submitEmptySnapshots = submitEmptySnapshots; } - public static OneInputStreamOperatorTestHarness + public static OneInputStreamOperatorTestHarness createMixedFormatStreamWriter(MixedFormatTableLoader tableLoader) throws Exception { return createMixedFormatStreamWriter(tableLoader, true, null); } - public static OneInputStreamOperatorTestHarness + public static OneInputStreamOperatorTestHarness createMixedFormatStreamWriter( MixedFormatTableLoader tableLoader, boolean submitEmptySnapshots, Long restoredCheckpointId) throws Exception { - OneInputStreamOperatorTestHarness harness = + OneInputStreamOperatorTestHarness harness = doCreateMixedFormatStreamWriter(tableLoader, submitEmptySnapshots, restoredCheckpointId); harness.setup(); @@ -94,15 +93,18 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) return harness; } - public static OneInputStreamOperatorTestHarness + public static 
OneInputStreamOperatorTestHarness doCreateMixedFormatStreamWriter( MixedFormatTableLoader tableLoader, boolean submitEmptySnapshots, Long restoredCheckpointId) throws Exception { - tableLoader.open(); - MixedTable mixedTable = tableLoader.loadMixedFormatTable(); - mixedTable.properties().put(SUBMIT_EMPTY_SNAPSHOTS.key(), String.valueOf(submitEmptySnapshots)); + HashMap extraProperties = new HashMap<>(); + extraProperties.put(SUBMIT_EMPTY_SNAPSHOTS.key(), String.valueOf(submitEmptySnapshots)); + MixedFormatTableLoader writerTableLoader = + tableLoader.copyWithFlinkTableProperties(extraProperties); + writerTableLoader.open(); + MixedTable mixedTable = writerTableLoader.loadMixedFormatTable(); MixedFormatFileWriter streamWriter = FlinkSink.createFileWriter( @@ -110,8 +112,8 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) null, false, (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(), - tableLoader); - TestOneInputStreamOperatorIntern harness = + writerTableLoader); + TestOneInputStreamOperatorIntern harness = new TestOneInputStreamOperatorIntern<>( streamWriter, 1, 1, 0, restoredCheckpointId, new TestGlobalAggregateManager()); @@ -138,7 +140,7 @@ public void testInsertWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { MixedFormatFileWriter fileWriter = (MixedFormatFileWriter) testHarness.getOneInputOperator(); Assert.assertNotNull(fileWriter.getWriter()); @@ -150,8 +152,7 @@ public void testInsertWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertNull(fileWriter.getWriter()); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, 
testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); checkpointId = checkpointId + 1; @@ -163,9 +164,9 @@ public void testInsertWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value - List completedFiles = testHarness.extractOutputValues(); + List completedFiles = testHarness.extractOutputValues(); Assert.assertEquals(2, completedFiles.size()); - Assert.assertEquals(3, completedFiles.get(1).writeResult().dataFiles().length); + Assert.assertEquals(3, completedFiles.get(1).dataFiles().length); } } @@ -175,7 +176,7 @@ public void testSnapshotMultipleTimes() throws Exception { long timestamp = 1; tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { testHarness.processElement(createRowData(1, "hello", "2020-10-11T10:10:11.0"), timestamp++); testHarness.processElement(createRowData(2, "hello", "2020-10-12T10:10:11.0"), timestamp); @@ -183,13 +184,7 @@ public void testSnapshotMultipleTimes() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = 3; - WriteResult result = - WriteResult.builder() - .addAll( - testHarness.extractOutputValues().stream() - .map(FlinkWriteResult::writeResult) - .collect(java.util.stream.Collectors.toList())) - .build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); @@ -197,13 +192,7 @@ public void testSnapshotMultipleTimes() throws Exception { for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - result = - 
WriteResult.builder() - .addAll( - testHarness.extractOutputValues().stream() - .map(FlinkWriteResult::writeResult) - .collect(java.util.stream.Collectors.toList())) - .build(); + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); } @@ -215,7 +204,7 @@ public void testInsertWriteWithoutPk() throws Exception { Assume.assumeFalse(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement(createRowData(1, "hello", "2020-10-11T10:10:11.0"), 1); @@ -224,8 +213,7 @@ public void testInsertWriteWithoutPk() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); checkpointId = checkpointId + 1; @@ -236,9 +224,9 @@ public void testInsertWriteWithoutPk() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value - List completedFiles = testHarness.extractOutputValues(); + List completedFiles = testHarness.extractOutputValues(); Assert.assertEquals(2, completedFiles.size()); - Assert.assertEquals(1, completedFiles.get(1).writeResult().dataFiles().length); + Assert.assertEquals(1, completedFiles.get(1).dataFiles().length); } } @@ -247,7 +235,7 @@ public void testDeleteWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, 
catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement( @@ -261,8 +249,7 @@ public void testDeleteWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); checkpointId = checkpointId + 1; @@ -277,8 +264,7 @@ public void testDeleteWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value Assert.assertEquals(2, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, testHarness.extractOutputValues().get(1).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(1).dataFiles().length); } } @@ -287,7 +273,7 @@ public void testUpdateWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement( @@ -301,8 +287,7 @@ public void testUpdateWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); checkpointId = checkpointId + 1; @@ -318,8 +303,7 @@ public void testUpdateWrite() 
throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value Assert.assertEquals(2, testHarness.extractOutputValues().size()); - Assert.assertEquals( - 3, testHarness.extractOutputValues().get(1).writeResult().dataFiles().length); + Assert.assertEquals(3, testHarness.extractOutputValues().get(1).dataFiles().length); } } @@ -329,7 +313,7 @@ public void testEmitEmptyResults() throws Exception { tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; long excepted = submitEmptySnapshots ? 1 : 0; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader, submitEmptySnapshots, null)) { // The first checkpoint diff --git a/amoro-format-mixed/amoro-mixed-trino/pom.xml b/amoro-format-mixed/amoro-mixed-trino/pom.xml index 202d74a0b6..a91f845fe1 100644 --- a/amoro-format-mixed/amoro-mixed-trino/pom.xml +++ b/amoro-format-mixed/amoro-mixed-trino/pom.xml @@ -653,7 +653,6 @@ 17 - sun diff --git a/amoro-format-paimon/pom.xml b/amoro-format-paimon/pom.xml index 87b29c4282..ff98e9559f 100644 --- a/amoro-format-paimon/pom.xml +++ b/amoro-format-paimon/pom.xml @@ -78,4 +78,13 @@ + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + diff --git a/pom.xml b/pom.xml index 1b54c24bcf..02288ccff6 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 2.12.0 3.20.0 3.12.0 - 2.2.2 + 3.3.0 5.7.0 4.11.0 1.21.4 @@ -171,6 +171,26 @@ compile compile provided + + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + -XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${jvm.module.opens} ${jvm.module.exports} @@ -1491,6 +1511,8 @@ ${maven-surefire-plugin.version} false + ${amoro.surefire.baseArgLine} + ${surefire.excludedGroups.jdk} @@ -1864,12 +1886,52 @@ java11 - [11,) + [11,17) 11 + + java17 + + 17 + + + 17 + 17 + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + 17 + + --add-exports=java.base/sun.net.util=ALL-UNNAMED + --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED + --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + + + + + + + spark-3.3 From e1ed50fc7f39cc17b3af85dfe4986c1bac12842f Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:50:40 +0800 Subject: [PATCH 02/12] Update SLF4J dependency to version 2.0.17 and adjust README for JDK compatibility --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e07afe7b0c..4746c444e1 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Amoro contains modules as below: ## Building -Amoro is built using Maven with JDK 8, 11 and 17(required for `amoro-format-mixed/amoro-mixed-trino` module). +Amoro is built using Maven with JDK 11 and 17(required for `amoro-format-mixed/amoro-mixed-trino` module, experimental for other modules). 
* Build all modules without `amoro-mixed-trino`: `./mvnw clean package` * Build and skip tests: `./mvnw clean package -DskipTests` From 6a341aed6153e8a60896a0e37c9317b4a40a67d1 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:51:56 +0800 Subject: [PATCH 03/12] Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml --- .github/workflows/core-hadoop3-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index a6105ebf8a..cbf93ac426 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' '17'] + jdk: [ '11' '17' ] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: From f283bf5eb79020bc9f507886f4fe22e0464ae7a7 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 9 Jan 2026 16:52:45 +0800 Subject: [PATCH 04/12] Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml --- .github/workflows/core-hadoop3-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index cbf93ac426..004f179afe 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' '17' ] + jdk: [ '11', '17' ] spark: [ '3.3','3.4', '3.5' ] name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: From ca2c20b99c4bfd32bb0c34e1128378dedd36b6e5 Mon Sep 17 00:00:00 2001 From: xuba Date: Wed, 18 Mar 2026 00:32:06 +0800 Subject: [PATCH 05/12] Stabilize continuous optimizing tests on Java 17 --- amoro-common/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/amoro-common/pom.xml b/amoro-common/pom.xml index dd85842c4e..7dd5a42652 100644 --- a/amoro-common/pom.xml +++ 
b/amoro-common/pom.xml @@ -202,6 +202,14 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + + src/main/gen-java/** + + + org.apache.maven.plugins maven-surefire-plugin From bc641c175c57230ad8250e1ac4be39da9b3c54bf Mon Sep 17 00:00:00 2001 From: xuba Date: Wed, 18 Mar 2026 01:26:53 +0800 Subject: [PATCH 06/12] Update TestAutomaticLogWriter to use truncated LocalDateTime for consistency in time calculations --- .../amoro/flink/write/TestAutomaticLogWriter.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java index b71cf8496f..eb5722a51c 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java @@ -80,6 +80,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -172,26 +173,27 @@ public void testHasCaughtUp() throws Exception { LocalDateTime.parse("2022-06-18 10:10:11", dtf) }); List catchUpExpects = new LinkedList<>(); + LocalDateTime catchUpBaseTime = LocalDateTime.now().truncatedTo(ChronoUnit.MICROS); catchUpExpects.add( new Object[] { 1000014, "d", - LocalDateTime.now().minusSeconds(3).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(3) + catchUpBaseTime.minusSeconds(3).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(3) }); catchUpExpects.add( new Object[] { 1000021, "d", - 
LocalDateTime.now().minusSeconds(2).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(2) + catchUpBaseTime.minusSeconds(2).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(2) }); catchUpExpects.add( new Object[] { 1000015, "e", - LocalDateTime.now().minusSeconds(1).toEpochSecond(ZoneOffset.UTC), - LocalDateTime.now().minusSeconds(1) + catchUpBaseTime.minusSeconds(1).toEpochSecond(ZoneOffset.UTC), + catchUpBaseTime.minusSeconds(1) }); expects.addAll(catchUpExpects); From 43e1f68868460969e9c08612df22542b320067c5 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 24 Mar 2026 18:00:19 +0800 Subject: [PATCH 07/12] Remove reflective invocation of emitPeriodicWatermark in MixedFormatSourceReader --- .../read/hybrid/reader/MixedFormatSourceReader.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java index 654100d31f..eea767ae8a 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java @@ -195,19 +195,6 @@ private void emitPeriodicWatermark(SourceOutput splitOutput) { return; } - try { - java.lang.reflect.Method method = - splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark"); - method.setAccessible(true); - method.invoke(splitOutput); - return; - } catch (ReflectiveOperationException e) { - LOGGER.debug( - "Failed to invoke emitPeriodicWatermark on split output {}, fallback to reader output", - splitOutput.getClass(), - e); - } - watermarkOutput.emitPeriodicWatermark(); } } From 
991383f5870d3ada0495f5eb97ca6180e8f7322a Mon Sep 17 00:00:00 2001 From: xuba Date: Mon, 8 Jun 2026 17:36:19 +0800 Subject: [PATCH 08/12] use FlinkWriteResult for consistency --- amoro-common/pom.xml | 8 --- .../write/TestMixedFormatFileWriter.java | 65 ++++++++++++------- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/amoro-common/pom.xml b/amoro-common/pom.xml index 7dd5a42652..dd85842c4e 100644 --- a/amoro-common/pom.xml +++ b/amoro-common/pom.xml @@ -202,14 +202,6 @@ - - org.apache.maven.plugins - maven-checkstyle-plugin - - src/main/gen-java/** - - - org.apache.maven.plugins maven-surefire-plugin diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java index 7f7e1fd81c..d7f5430299 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java @@ -36,6 +36,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; +import org.apache.iceberg.flink.sink.FlinkWriteResult; import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; @@ -73,18 +74,18 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) this.submitEmptySnapshots = submitEmptySnapshots; } - public static OneInputStreamOperatorTestHarness + public static OneInputStreamOperatorTestHarness createMixedFormatStreamWriter(MixedFormatTableLoader tableLoader) throws Exception { return createMixedFormatStreamWriter(tableLoader, true, null); } - public static 
OneInputStreamOperatorTestHarness + public static OneInputStreamOperatorTestHarness createMixedFormatStreamWriter( MixedFormatTableLoader tableLoader, boolean submitEmptySnapshots, Long restoredCheckpointId) throws Exception { - OneInputStreamOperatorTestHarness harness = + OneInputStreamOperatorTestHarness harness = doCreateMixedFormatStreamWriter(tableLoader, submitEmptySnapshots, restoredCheckpointId); harness.setup(); @@ -93,7 +94,7 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) return harness; } - public static OneInputStreamOperatorTestHarness + public static OneInputStreamOperatorTestHarness doCreateMixedFormatStreamWriter( MixedFormatTableLoader tableLoader, boolean submitEmptySnapshots, @@ -113,7 +114,7 @@ public TestMixedFormatFileWriter(boolean isKeyed, boolean submitEmptySnapshots) false, (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(), writerTableLoader); - TestOneInputStreamOperatorIntern harness = + TestOneInputStreamOperatorIntern harness = new TestOneInputStreamOperatorIntern<>( streamWriter, 1, 1, 0, restoredCheckpointId, new TestGlobalAggregateManager()); @@ -140,7 +141,7 @@ public void testInsertWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { MixedFormatFileWriter fileWriter = (MixedFormatFileWriter) testHarness.getOneInputOperator(); Assert.assertNotNull(fileWriter.getWriter()); @@ -152,7 +153,8 @@ public void testInsertWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertNull(fileWriter.getWriter()); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); + Assert.assertEquals( + 3, 
testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); checkpointId = checkpointId + 1; @@ -164,9 +166,9 @@ public void testInsertWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value - List completedFiles = testHarness.extractOutputValues(); + List completedFiles = testHarness.extractOutputValues(); Assert.assertEquals(2, completedFiles.size()); - Assert.assertEquals(3, completedFiles.get(1).dataFiles().length); + Assert.assertEquals(3, completedFiles.get(1).writeResult().dataFiles().length); } } @@ -176,7 +178,7 @@ public void testSnapshotMultipleTimes() throws Exception { long timestamp = 1; tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { testHarness.processElement(createRowData(1, "hello", "2020-10-11T10:10:11.0"), timestamp++); testHarness.processElement(createRowData(2, "hello", "2020-10-12T10:10:11.0"), timestamp); @@ -184,7 +186,13 @@ public void testSnapshotMultipleTimes() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = 3; - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder() + .addAll( + testHarness.extractOutputValues().stream() + .map(FlinkWriteResult::writeResult) + .collect(java.util.stream.Collectors.toList())) + .build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); @@ -192,7 +200,13 @@ public void testSnapshotMultipleTimes() throws Exception { for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + result = + 
WriteResult.builder() + .addAll( + testHarness.extractOutputValues().stream() + .map(FlinkWriteResult::writeResult) + .collect(java.util.stream.Collectors.toList())) + .build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); } @@ -204,7 +218,7 @@ public void testInsertWriteWithoutPk() throws Exception { Assume.assumeFalse(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement(createRowData(1, "hello", "2020-10-11T10:10:11.0"), 1); @@ -213,7 +227,8 @@ public void testInsertWriteWithoutPk() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); + Assert.assertEquals( + 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); checkpointId = checkpointId + 1; @@ -224,9 +239,9 @@ public void testInsertWriteWithoutPk() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value - List completedFiles = testHarness.extractOutputValues(); + List completedFiles = testHarness.extractOutputValues(); Assert.assertEquals(2, completedFiles.size()); - Assert.assertEquals(1, completedFiles.get(1).dataFiles().length); + Assert.assertEquals(1, completedFiles.get(1).writeResult().dataFiles().length); } } @@ -235,7 +250,7 @@ public void testDeleteWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = 
+ try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement( @@ -249,7 +264,8 @@ public void testDeleteWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); + Assert.assertEquals( + 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); checkpointId = checkpointId + 1; @@ -264,7 +280,8 @@ public void testDeleteWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // testHarness.extractOutputValues() calculates the cumulative value Assert.assertEquals(2, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(1).dataFiles().length); + Assert.assertEquals( + 3, testHarness.extractOutputValues().get(1).writeResult().dataFiles().length); } } @@ -273,7 +290,7 @@ public void testUpdateWrite() throws Exception { Assume.assumeTrue(isKeyedTable()); tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader)) { // The first checkpoint testHarness.processElement( @@ -287,7 +304,8 @@ public void testUpdateWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); Assert.assertEquals(1, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(0).dataFiles().length); + Assert.assertEquals( + 3, testHarness.extractOutputValues().get(0).writeResult().dataFiles().length); checkpointId = checkpointId + 1; @@ -303,7 +321,8 @@ public void testUpdateWrite() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); // 
testHarness.extractOutputValues() calculates the cumulative value Assert.assertEquals(2, testHarness.extractOutputValues().size()); - Assert.assertEquals(3, testHarness.extractOutputValues().get(1).dataFiles().length); + Assert.assertEquals( + 3, testHarness.extractOutputValues().get(1).writeResult().dataFiles().length); } } @@ -313,7 +332,7 @@ public void testEmitEmptyResults() throws Exception { tableLoader = MixedFormatTableLoader.of(TableTestHelper.TEST_TABLE_ID, catalogBuilder); long checkpointId = 1L; long excepted = submitEmptySnapshots ? 1 : 0; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createMixedFormatStreamWriter(tableLoader, submitEmptySnapshots, null)) { // The first checkpoint From caf3de2d03a5477b526282a96e2d749482a27480 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 9 Jun 2026 21:13:55 +0800 Subject: [PATCH 09/12] Add locality exposure in IcebergClassUtil and implement tests for file committer --- .github/workflows/core-hadoop3-ci.yml | 5 ++ .../amoro/flink/util/IcebergClassUtil.java | 12 +++ .../flink/util/TestIcebergClassUtil.java | 75 +++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index 004f179afe..aebb929f05 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -39,6 +39,11 @@ jobs: matrix: jdk: [ '11', '17' ] spark: [ '3.3','3.4', '3.5' ] + exclude: + - jdk: '17' + spark: '3.3' + - jdk: '17' + spark: '3.4' name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: - uses: actions/checkout@v3 diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java index 6102e9dabd..76a4555e6b 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java @@ -34,11 +34,13 @@ import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkWriteResult; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.ThreadPools; @@ -160,6 +162,7 @@ private static ScanContext buildScanContext( Long startSnapshotId) { ScanContext.Builder contextBuilder = ScanContext.builder().resolveConfig(table, properties, flinkConf); + contextBuilder.exposeLocality(isLocalityEnabled(table, flinkConf)); if (projectedSchema != null) { contextBuilder.project(projectedSchema); } @@ -173,6 +176,15 @@ private static ScanContext buildScanContext( return contextBuilder.build(); } + private static boolean isLocalityEnabled(Table table, ReadableConfig flinkConf) { + Boolean localityEnabled = + flinkConf.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO); + if (localityEnabled != null && !localityEnabled) { + return false; + } + return Util.mayHaveBlockLocations(table.io(), table.location()); + } + private static Class forName(String className) { try { return Class.forName(className); diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java new file mode 100644 index 0000000000..0e654f2a07 --- /dev/null +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.flink.util; + +import org.apache.amoro.io.AuthenticatedFileIO; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.sink.FlinkWriteResult; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestIcebergClassUtil { + + @Test + public void testIcebergFilesCommitterRunsThroughAuthenticatedFileIO() { + CountingAuthenticatedFileIO fileIO = new CountingAuthenticatedFileIO(); + OneInputStreamOperator committer = + IcebergClassUtil.newIcebergFilesCommitter( + null, false, null, PartitionSpec.unpartitioned(), fileIO); + + committer.toString(); + + Assert.assertEquals(1, fileIO.doAsCalls.get()); + } + + private static class CountingAuthenticatedFileIO implements AuthenticatedFileIO { + private final AtomicInteger doAsCalls = new AtomicInteger(); + + @Override + public T doAs(Callable callable) { + doAsCalls.incrementAndGet(); + try { + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public InputFile newInputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } + } +} From 68f833ffd1f27bb374313c3513465b57ed5edd5d Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 16 Jun 2026 17:10:18 +0800 Subject: [PATCH 10/12] Remove Java 17 from CI build matrix and update wrapKrb method to accept nullable startSnapshotId --- .github/workflows/core-hadoop2-ci.yml | 2 +- .../amoro-mixed-flink/amoro-mixed-flink-common/pom.xml | 1 - .../main/java/org/apache/amoro/flink/table/FlinkSource.java | 3 ++- 
pom.xml | 1 - 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml index b292c044fa..ad891366c5 100644 --- a/.github/workflows/core-hadoop2-ci.yml +++ b/.github/workflows/core-hadoop2-ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '11' , '17' ] + jdk: [ '11' ] name: Build Amoro with JDK ${{ matrix.jdk }} steps: - uses: actions/checkout@v3 diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml index 44b7277abc..64b8a5f19d 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml @@ -430,7 +430,6 @@ org.apache.amoro.listener.AmoroRunListener - ${surefire.excludedGroups.jdk} ${amoro.surefire.baseArgLine} diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java index 2d0f5fe44a..b356aa0531 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java @@ -57,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -263,7 +264,7 @@ public DataStream buildUnkeyedTableSource(String scanStartupMode) { } /** extract op from dataStream, and wrap krb support */ - private DataStream wrapKrb(DataStream ds, Long startSnapshotId) { + private DataStream wrapKrb(DataStream ds, @Nullable Long startSnapshotId) { IcebergClassUtil.clean(env); Transformation 
origin = ds.getTransformation(); int scanParallelism = diff --git a/pom.xml b/pom.xml index 02288ccff6..78659d5087 100644 --- a/pom.xml +++ b/pom.xml @@ -1512,7 +1512,6 @@ false ${amoro.surefire.baseArgLine} - ${surefire.excludedGroups.jdk} From 5ccd9285a447a353ba54e8e7f46f8fdec640d8e9 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 16 Jun 2026 18:09:31 +0800 Subject: [PATCH 11/12] spotless --- .../src/main/java/org/apache/amoro/flink/table/FlinkSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java index b356aa0531..2a89f81edd 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java @@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.util.HashMap; import java.util.List; import java.util.Map; From 595d21caf11999c698e7f4489d802dc6f0d0a896 Mon Sep 17 00:00:00 2001 From: xuba Date: Thu, 18 Jun 2026 00:15:32 +0800 Subject: [PATCH 12/12] address --- .../flink/lookup/BasicLookupFunction.java | 19 +++++++++++++------ .../table/UnkeyedStreamingReaderOperator.java | 10 +++++++++- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java index 3acce39b1c..114245de93 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java +++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java @@ -30,6 +30,7 @@ import org.apache.amoro.table.MixedTable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.util.FlinkRuntimeException; @@ -44,10 +45,12 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -241,16 +244,20 @@ private void checkErrorAndRethrow() { } private String generateRocksDBPath(FunctionContext context, String tableName) { - String tmpPath = getTmpDirectory(context); + String tmpPath = getTmpDirectoryFromTMContainer(context); File db = new File(tmpPath, tableName + "-lookup-" + UUID.randomUUID()); return db.toString(); } - private static String getTmpDirectory(FunctionContext context) { - String configuredTmpDir = context.getJobParameter("java.io.tmpdir", null); - if (configuredTmpDir != null && !configuredTmpDir.isEmpty()) { - return configuredTmpDir; + private static String getTmpDirectoryFromTMContainer(FunctionContext context) { + try { + Field field = context.getClass().getDeclaredField("context"); + field.setAccessible(true); + StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context); + String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories(); + return 
tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)]; + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); } - return System.getProperty("java.io.tmpdir"); } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java index 6cca6c776f..7d4310e358 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java @@ -43,7 +43,15 @@ import java.io.IOException; import java.util.Queue; -/** Minimal reader operator to avoid depending on Iceberg's non-public StreamingReaderOperator. */ +/** + * Reader operator for Amoro's unkeyed Iceberg file source. + * + *

<p>This mirrors Iceberg's non-public {@code StreamingReaderOperator} behavior while allowing + * Amoro to create the {@code FlinkInputFormat} through {@link + * org.apache.amoro.flink.interceptor.ProxyFactory} at operator runtime. This keeps the + * Kerberos-aware file IO wrapper without reflectively depending on Iceberg's package-private + * operator factory or private constructor. + */ public class UnkeyedStreamingReaderOperator extends AbstractStreamOperator implements OneInputStreamOperator {