diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java index 39c34a7209..b93050f03b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.YamlParserUtils; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.runtime.messages.Acknowledge; @@ -53,7 +54,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; import java.io.BufferedReader; import java.io.File; @@ -278,8 +278,8 @@ private Map loadFlinkConfig() { Map configDocument; Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML); if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) { - flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML); - configDocument = new Yaml().load(Files.newInputStream(flinkConfPath)); + configDocument = new HashMap<>(); + configDocument.putAll(GlobalConfiguration.loadConfiguration(flinkConfDir).toMap()); } else { configDocument = YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri())); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java index d4e4c590a2..619a00a8a9 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java @@ -28,9 +28,14 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.utils.MemorySize; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -39,6 +44,8 @@ public class TestFlinkOptimizerContainer { FlinkOptimizerContainer container = new FlinkOptimizerContainer(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + Map containerProperties = Maps.newHashMap(); public TestFlinkOptimizerContainer() { @@ -99,6 +106,22 @@ public void testReadFlinkConfigFile() { Assert.assertEquals(flinkConfig.get(JOB_MANAGER_TOTAL_PROCESS_MEMORY), "1600m"); } + @Test + public void testReadLegacyFlinkConfigWithMetricFilterIncludes() throws Exception { + File flinkConf = tempFolder.newFile("flink-conf.yaml"); + Files.write( + flinkConf.toPath(), + String.join( + "\n", + "jobmanager.rpc.address: localhost", + "xxx.metrics.filter.includes: *RSS:*:*;Heap:Used,Max:*") + .getBytes(StandardCharsets.UTF_8)); + + Map flinkConfig = container.loadFlinkConfigForYAML(flinkConf.toURI().toURL()); + Assert.assertEquals("localhost", flinkConfig.get("jobmanager.rpc.address")); + Assert.assertEquals("*RSS:*:*;Heap:Used,Max:*", flinkConfig.get("xxx.metrics.filter.includes")); + } + @Test public void testBuildFlinkOptions() { Map containerProperties = Maps.newHashMap(this.containerProperties);