From 5b15fa3542fcf809a7a3768e56b9a4e228a5bc26 Mon Sep 17 00:00:00 2001 From: luxl Date: Wed, 17 Jun 2026 14:24:13 +0800 Subject: [PATCH 1/6] [Improvement-17330][K8s] Replace job watcher with informer Co-authored-by: Cursor --- .../task/api/k8s/impl/K8sTaskExecutor.java | 95 +++++++----- .../plugin/task/api/utils/K8sUtils.java | 13 +- .../task/api/k8s/K8sTaskExecutorTest.java | 142 +++++++++++++++++- 3 files changed, 200 insertions(+), 50 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f48169c5e150..f360167fdf77 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -73,10 +73,9 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; /** * K8sTaskExecutor used to submit k8s task to K8S @@ -199,46 +198,38 @@ public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { } public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { - CountDownLatch countDownLatch = new CountDownLatch(1); - Watcher watcher = new Watcher() { + final String jobName = job.getMetadata().getName(); + final String namespace = job.getMetadata().getNamespace(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + SharedIndexInformer informer = k8sUtils.createBatchJobInformer(jobName, namespace, + new ResourceEventHandler() { + + @Override + public void onAdd(Job watchedJob) { + // ignore initial add event, same as Watcher.Action.ADDED + } - @Override - public void eventReceived(Action action, Job job) { - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); - if (action == Action.DELETED) { - log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - } else if (action != Action.ADDED) { - int jobStatus = getK8sJobStatus(job); - log.info("job {} status {}", job.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; - } - setTaskStatus(jobStatus, taskInstanceId, taskResponse); - countDownLatch.countDown(); + @Override + public void onUpdate(Job oldJob, Job watchedJob) { + handleBatchJobEvent(watchedJob, taskInstanceId, taskResponse, countDownLatch); } - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - @Override - public void onClose(WatcherException e) { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - }; - try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) { + @Override + public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), + taskRequest.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + log.error("[K8sJobExecutor-{}] fail in k8s", watchedJob.getMetadata().getName()); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + countDownLatch.countDown(); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + }); + try { boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; if (timeoutFlag) { @@ -254,6 +245,30 @@ public void onClose(WatcherException e) { } catch (Exception e) { log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } finally { + informer.stop(); + } + } + + private void handleBatchJobEvent(Job watchedJob, + String taskInstanceId, + TaskResponse taskResponse, + CountDownLatch countDownLatch) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), + taskRequest.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + log.info("event received : job:{}", watchedJob.getMetadata().getName()); + int jobStatus = getK8sJobStatus(watchedJob); + log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + setTaskStatus(jobStatus, taskInstanceId, taskResponse); + countDownLatch.countDown(); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java index a96f3ebb010e..3b19a2bb596c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java @@ -29,8 +29,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @Slf4j public class K8sUtils { @@ -71,15 +71,18 @@ public Boolean jobExist(String jobName, String namespace) { } } - public Watch createBatchJobWatcher(String jobName, Watcher watcher) { + public SharedIndexInformer createBatchJobInformer(String jobName, + String namespace, + ResourceEventHandler handler) { try { return client.batch() .v1() .jobs() + .inNamespace(namespace) .withName(jobName) - .watch(watcher); + .inform(handler); } catch (Exception e) { - throw new TaskException("fail to register batch job watcher", e); + throw new TaskException("fail to register batch job informer", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index d93130caee9a..ba49b5c6ced6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -17,30 +17,43 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; public class K8sTaskExecutorTest { - private static final Logger logger = LoggerFactory.getLogger(K8sTaskExecutorTest.class); - private K8sTaskExecutor k8sTaskExecutor = null; private K8sTaskMainParameters k8sTaskMainParameters = null; private final String image = "ds-dev"; @@ -51,8 +64,10 @@ public class K8sTaskExecutorTest { private final int taskInstanceId = 1000; private final String taskName = "k8s_task_test"; private Job job; + private K8sUtils k8sUtils; + @BeforeEach - public void before() { + public void before() throws Exception { TaskExecutionContext taskRequest = new TaskExecutionContext(); taskRequest.setTaskInstanceId(taskInstanceId); taskRequest.setTaskName(taskName); @@ -64,6 +79,8 @@ public void before() { requirement.setOperator("In"); requirement.setValues(Arrays.asList("1234", "123456")); k8sTaskExecutor = new K8sTaskExecutor(taskRequest); + k8sUtils = mock(K8sUtils.class); + injectK8sUtils(k8sTaskExecutor, k8sUtils); k8sTaskMainParameters = new K8sTaskMainParameters(); k8sTaskMainParameters.setImage(image); k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy); @@ -76,6 +93,121 @@ public void before() { k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters); job = k8sTaskExecutor.getJob(); } + + private void injectK8sUtils(K8sTaskExecutor executor, K8sUtils mockK8sUtils) throws Exception { + Field field = executor.getClass().getSuperclass().getDeclaredField("k8sUtils"); + field.setAccessible(true); + field.set(executor, mockK8sUtils); + } + + private TaskExecutionContext getTaskRequest() throws Exception { + Field field = k8sTaskExecutor.getClass().getSuperclass().getDeclaredField("taskRequest"); + field.setAccessible(true); + return (TaskExecutionContext) field.get(k8sTaskExecutor); + } + + private WatcherHarness startBatchJobWatcher(TaskResponse taskResponse) throws InterruptedException { + WatcherHarness harness = new WatcherHarness(); + harness.informer = mock(SharedIndexInformer.class); + CountDownLatch handlerReady = new CountDownLatch(1); + AtomicReference> handlerRef = new AtomicReference<>(); + when(k8sUtils.createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any())) + .thenAnswer(invocation -> { + handlerRef.set(invocation.getArgument(2)); + handlerReady.countDown(); + return harness.informer; + }); + harness.thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, + String.valueOf(taskInstanceId), taskResponse)); + harness.thread.start(); + Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); + harness.handler = handlerRef.get(); + return harness; + } + + private void finishWatcher(WatcherHarness harness) throws InterruptedException { + harness.thread.join(5000); + verify(harness.informer).stop(); + } + + private Job jobWithStatus(Integer succeeded, Integer failed) { + JobStatus status = new JobStatus(); + status.setSucceeded(succeeded); + status.setFailed(failed); + Job watchedJob = new Job(); + watchedJob.setMetadata(job.getMetadata()); + watchedJob.setStatus(status); + return watchedJob; + } + + @Test + public void testRegisterBatchJobInformerOnUpdateSuccess() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerOnUpdateFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerOnDelete() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onDelete(job, false); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerIgnoreOnAdd() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onAdd(jobWithStatus(1, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerIgnoreRunningUpdate() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerTimeout() throws Exception { + TaskExecutionContext taskRequest = getTaskRequest(); + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.FAILED); + taskRequest.setTaskTimeout(1); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + private static final class WatcherHarness { + + private SharedIndexInformer informer; + private ResourceEventHandler handler; + private Thread thread; + } + @Test public void testGetK8sJobStatusNormal() { JobStatus jobStatus = new JobStatus(); From 583fb89ec2eedd44cc95d8a2031520748566ad9c Mon Sep 17 00:00:00 2001 From: det101 Date: Tue, 23 Jun 2026 11:27:45 +0800 Subject: [PATCH 2/6] [Improvement-17330][K8s] Address informer review feedback on Fabric8 6.0 Use runnableInformer with start().whenComplete() for startup failure handling, check terminal status on onAdd, align event log format, and expand unit tests. Keep Fabric8 at 6.0.0 instead of 6.4.0: the BOM upgrade affects all K8s client call sites (API cluster management, datasource, Spark on K8s, task execution) and tightens kubeconfig validation, which breaks unrelated flows such as cluster update when config is re-validated. The 6.4 stopped() API is not available on 6.0; startup errors are covered by start().whenComplete(), and watch relist handles too old resource version at runtime. --- .../task/api/k8s/impl/K8sTaskExecutor.java | 122 +++++++++++------- .../plugin/task/api/utils/K8sUtils.java | 6 +- .../task/api/k8s/K8sTaskExecutorTest.java | 35 ++++- 3 files changed, 111 insertions(+), 52 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f360167fdf77..479de2e5143e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -201,43 +201,20 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse final String jobName = job.getMetadata().getName(); final String namespace = job.getMetadata().getNamespace(); final CountDownLatch countDownLatch = new CountDownLatch(1); - SharedIndexInformer informer = k8sUtils.createBatchJobInformer(jobName, namespace, - new ResourceEventHandler() { - - @Override - public void onAdd(Job watchedJob) { - // ignore initial add event, same as Watcher.Action.ADDED - } - - @Override - public void onUpdate(Job oldJob, Job watchedJob) { - handleBatchJobEvent(watchedJob, taskInstanceId, taskResponse, countDownLatch); - } - - @Override - public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - log.error("[K8sJobExecutor-{}] fail in k8s", watchedJob.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - }); + SharedIndexInformer informer = null; try { - boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED - || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; - if (timeoutFlag) { - Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)); - waitTimeout(timeout); - } else { - countDownLatch.await(); - } + informer = k8sUtils.createBatchJobInformer(jobName, namespace, + createBatchJobEventHandler(taskInstanceId, taskResponse, countDownLatch)); + informer.start().whenComplete((v, ex) -> { + if (ex != null && countDownLatch.getCount() > 0) { + withTaskLogContext(() -> { + log.error("[K8sJobExecutor-{}] fail in k8s: {}", jobName, ex.getMessage(), ex); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + countDownLatch.countDown(); + }); + } + }); + awaitJobCompletion(countDownLatch); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); Thread.currentThread().interrupt(); @@ -246,32 +223,79 @@ public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } finally { - informer.stop(); + if (informer != null) { + informer.stop(); + } } } - private void handleBatchJobEvent(Job watchedJob, - String taskInstanceId, - TaskResponse taskResponse, - CountDownLatch countDownLatch) { + private ResourceEventHandler createBatchJobEventHandler(String taskInstanceId, TaskResponse taskResponse, + CountDownLatch countDownLatch) { + return new ResourceEventHandler() { + + @Override + public void onAdd(Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: ADD", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskInstanceId, taskResponse, countDownLatch); + }); + } + + @Override + public void onUpdate(Job oldJob, Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: UPDATE", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskInstanceId, taskResponse, countDownLatch); + }); + } + + @Override + public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: DELETE", watchedJob.getMetadata().getName()); + log.error("[K8sJobExecutor-{}] fail in k8s", watchedJob.getMetadata().getName()); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + countDownLatch.countDown(); + }); + } + }; + } + + private void awaitJobCompletion(CountDownLatch countDownLatch) throws InterruptedException { + boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if (timeoutFlag) { + if (!countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)) { + waitTimeout(true); + } + } else { + countDownLatch.await(); + } + } + + private void withTaskLogContext(Runnable action) { try { LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), taskRequest.getTaskInstanceId()); LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - log.info("event received : job:{}", watchedJob.getMetadata().getName()); - int jobStatus = getK8sJobStatus(watchedJob); - log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; - } - setTaskStatus(jobStatus, taskInstanceId, taskResponse); - countDownLatch.countDown(); + action.run(); } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } + private void handleBatchJobTerminalStatus(Job watchedJob, String taskInstanceId, TaskResponse taskResponse, + CountDownLatch countDownLatch) { + int jobStatus = getK8sJobStatus(watchedJob); + log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + setTaskStatus(jobStatus, taskInstanceId, taskResponse); + countDownLatch.countDown(); + } + private void parsePodLogOutput() { ExecutorService collectPodLogExecutorService = ThreadUtils .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java index 3b19a2bb596c..63ef986f0e9f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java @@ -75,12 +75,14 @@ public SharedIndexInformer createBatchJobInformer(String jobName, String namespace, ResourceEventHandler handler) { try { - return client.batch() + SharedIndexInformer informer = client.batch() .v1() .jobs() .inNamespace(namespace) .withName(jobName) - .inform(handler); + .runnableInformer(0L); + informer.addEventHandler(handler); + return informer; } catch (Exception e) { throw new TaskException("fail to register batch job informer", e); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index ba49b5c6ced6..36d57342c70f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -109,6 +111,7 @@ private TaskExecutionContext getTaskRequest() throws Exception { private WatcherHarness startBatchJobWatcher(TaskResponse taskResponse) throws InterruptedException { WatcherHarness harness = new WatcherHarness(); harness.informer = mock(SharedIndexInformer.class); + when(harness.informer.start()).thenReturn(CompletableFuture.completedFuture(null)); CountDownLatch handlerReady = new CountDownLatch(1); AtomicReference> handlerRef = new AtomicReference<>(); when(k8sUtils.createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any())) @@ -168,16 +171,46 @@ public void testRegisterBatchJobInformerOnDelete() throws Exception { } @Test - public void testRegisterBatchJobInformerIgnoreOnAdd() throws Exception { + public void testRegisterBatchJobInformerOnAddSuccess() throws Exception { TaskResponse taskResponse = new TaskResponse(); WatcherHarness harness = startBatchJobWatcher(taskResponse); harness.handler.onAdd(jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerOnAddFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onAdd(jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testRegisterBatchJobInformerOnAddRunning() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(taskResponse); + harness.handler.onAdd(jobWithStatus(null, null)); Assertions.assertTrue(harness.thread.isAlive()); harness.handler.onUpdate(job, jobWithStatus(1, null)); finishWatcher(harness); assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); } + @Test + public void testRegisterBatchJobInformerInformerStartFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + doThrow(new RuntimeException("informer start failed")).when(k8sUtils) + .createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any()); + Thread thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, + String.valueOf(taskInstanceId), taskResponse)); + thread.start(); + thread.join(5000); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + @Test public void testRegisterBatchJobInformerIgnoreRunningUpdate() throws Exception { TaskResponse taskResponse = new TaskResponse(); From a80da0c0a7aa2c47678c72d7e378e07933091459 Mon Sep 17 00:00:00 2001 From: luxiaolong-ct <294671909+luxiaolong-ct@users.noreply.github.com> Date: Wed, 24 Jun 2026 11:40:44 +0800 Subject: [PATCH 3/6] [Improvement-17330][K8s] Add GET job status polling as informer safety net --- .../task/api/k8s/impl/K8sTaskExecutor.java | 70 +++++++++++++++++-- .../plugin/task/api/utils/K8sUtils.java | 16 +++-- .../task/api/k8s/K8sTaskExecutorTest.java | 63 +++++++++++++++-- 3 files changed, 136 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 479de2e5143e..cf1fce05ff97 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -60,6 +60,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -83,6 +84,8 @@ @Slf4j public class K8sTaskExecutor extends AbstractK8sTaskExecutor { + private static final long JOB_STATUS_POLL_INTERVAL_SECONDS = 30L; + private Job job; protected boolean podLogOutputIsFinished = false; protected Future podLogOutputFuture; @@ -202,6 +205,7 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse final String namespace = job.getMetadata().getNamespace(); final CountDownLatch countDownLatch = new CountDownLatch(1); SharedIndexInformer informer = null; + ScheduledExecutorService jobStatusPoller = null; try { informer = k8sUtils.createBatchJobInformer(jobName, namespace, createBatchJobEventHandler(taskInstanceId, taskResponse, countDownLatch)); @@ -214,6 +218,7 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse }); } }); + jobStatusPoller = startJobStatusPoller(jobName, namespace, taskInstanceId, taskResponse, countDownLatch); awaitJobCompletion(countDownLatch); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); @@ -223,12 +228,63 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } finally { + if (jobStatusPoller != null) { + jobStatusPoller.shutdownNow(); + } if (informer != null) { informer.stop(); } } } + /** + * Returns the interval for polling Job status. Protected so unit tests can override with a shorter value. + */ + protected long getJobStatusPollIntervalSeconds() { + return JOB_STATUS_POLL_INTERVAL_SECONDS; + } + + private ScheduledExecutorService startJobStatusPoller(String jobName, + String namespace, + String taskInstanceId, + TaskResponse taskResponse, + CountDownLatch countDownLatch) { + ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( + "K8sJobStatusPoller-" + jobName); + long pollIntervalSeconds = getJobStatusPollIntervalSeconds(); + jobStatusPoller.scheduleAtFixedRate( + () -> pollBatchJobStatus(jobName, namespace, taskInstanceId, taskResponse, countDownLatch), + pollIntervalSeconds, + pollIntervalSeconds, + TimeUnit.SECONDS); + return jobStatusPoller; + } + + private void pollBatchJobStatus(String jobName, + String namespace, + String taskInstanceId, + TaskResponse taskResponse, + CountDownLatch countDownLatch) { + if (countDownLatch.getCount() == 0) { + return; + } + withTaskLogContext(() -> { + try { + log.info("[K8sJobExecutor-{}] polling job status via GET", jobName); + Job polledJob = k8sUtils.getJob(jobName, namespace); + if (polledJob == null) { + log.error("[K8sJobExecutor-{}] job not found during status polling", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + countDownLatch.countDown(); + return; + } + handleBatchJobTerminalStatus(polledJob, taskInstanceId, taskResponse, countDownLatch); + } catch (Exception e) { + log.warn("[K8sJobExecutor-{}] failed to poll job status: {}", jobName, e.getMessage()); + } + }); + } + private ResourceEventHandler createBatchJobEventHandler(String taskInstanceId, TaskResponse taskResponse, CountDownLatch countDownLatch) { return new ResourceEventHandler() { @@ -252,6 +308,9 @@ public void onUpdate(Job oldJob, Job watchedJob) { @Override public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { withTaskLogContext(() -> { + if (countDownLatch.getCount() == 0) { + return; + } log.info("event received, job: {}, action: DELETE", watchedJob.getMetadata().getName()); log.error("[K8sJobExecutor-{}] fail in k8s", watchedJob.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); @@ -287,12 +346,15 @@ private void withTaskLogContext(Runnable action) { private void handleBatchJobTerminalStatus(Job watchedJob, String taskInstanceId, TaskResponse taskResponse, CountDownLatch countDownLatch) { + if (countDownLatch.getCount() == 0) { + return; + } int jobStatus = getK8sJobStatus(watchedJob); log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); if (jobStatus == TaskConstants.RUNNING_CODE) { return; } - setTaskStatus(jobStatus, taskInstanceId, taskResponse); + setTaskStatus(jobStatus, watchedJob.getMetadata().getName(), taskResponse); countDownLatch.countDown(); } @@ -417,13 +479,13 @@ public int getK8sJobStatus(Job job) { } } - public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) { + public void setTaskStatus(int jobStatus, String jobName, TaskResponse taskResponse) { if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { if (jobStatus == EXIT_CODE_SUCCESS) { - log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); + log.info("[K8sJobExecutor-{}] succeed in k8s", jobName); taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); } else { - log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); + log.error("[K8sJobExecutor-{}] fail in k8s", jobName); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java index 63ef986f0e9f..c8933a1a05b6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java @@ -62,15 +62,23 @@ public void deleteJob(String jobName, String namespace) { } } - public Boolean jobExist(String jobName, String namespace) { + public Job getJob(String jobName, String namespace) { try { - Job job = client.batch().v1().jobs().inNamespace(namespace).withName(jobName).get(); - return job != null; + return client.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(jobName) + .get(); } catch (Exception e) { - throw new TaskException("fail to check job: ", e); + throw new TaskException("fail to get job: " + jobName, e); } } + public Boolean jobExist(String jobName, String namespace) { + return getJob(jobName, namespace) != null; + } + public SharedIndexInformer createBatchJobInformer(String jobName, String namespace, ResourceEventHandler handler) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 36d57342c70f..6e0ebb60198e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -97,9 +97,18 @@ public void before() throws Exception { } private void injectK8sUtils(K8sTaskExecutor executor, K8sUtils mockK8sUtils) throws Exception { - Field field = executor.getClass().getSuperclass().getDeclaredField("k8sUtils"); - field.setAccessible(true); - field.set(executor, mockK8sUtils); + Class type = executor.getClass(); + while (type != null) { + try { + Field field = type.getDeclaredField("k8sUtils"); + field.setAccessible(true); + field.set(executor, mockK8sUtils); + return; + } catch (NoSuchFieldException ignored) { + type = type.getSuperclass(); + } + } + throw new NoSuchFieldException("k8sUtils"); } private TaskExecutionContext getTaskRequest() throws Exception { @@ -109,6 +118,11 @@ private TaskExecutionContext getTaskRequest() throws Exception { } private WatcherHarness startBatchJobWatcher(TaskResponse taskResponse) throws InterruptedException { + return startBatchJobWatcher(k8sTaskExecutor, taskResponse); + } + + private WatcherHarness startBatchJobWatcher(K8sTaskExecutor executor, + TaskResponse taskResponse) throws InterruptedException { WatcherHarness harness = new WatcherHarness(); harness.informer = mock(SharedIndexInformer.class); when(harness.informer.start()).thenReturn(CompletableFuture.completedFuture(null)); @@ -120,7 +134,7 @@ private WatcherHarness startBatchJobWatcher(TaskResponse taskResponse) throws In handlerReady.countDown(); return harness.informer; }); - harness.thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, + harness.thread = new Thread(() -> executor.registerBatchJobWatcher(job, String.valueOf(taskInstanceId), taskResponse)); harness.thread.start(); Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); @@ -234,6 +248,45 @@ public void testRegisterBatchJobInformerTimeout() throws Exception { assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); } + @Test + public void testRegisterBatchJobInformerJobStatusPollingSuccess() throws Exception { + K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(getTaskRequest()) { + + @Override + protected long getJobStatusPollIntervalSeconds() { + return 1L; + } + }; + injectK8sUtils(pollingExecutor, k8sUtils); + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenReturn(jobWithStatus(1, null)); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + @Test + public void testRegisterBatchJobInformerJobStatusPollingDeleted() throws Exception { + K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(getTaskRequest()) { + + @Override + protected long getJobStatusPollIntervalSeconds() { + return 1L; + } + }; + injectK8sUtils(pollingExecutor, k8sUtils); + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))).thenReturn(null); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + private static final class WatcherHarness { private SharedIndexInformer informer; @@ -253,7 +306,7 @@ public void testSetTaskStatusNormal() { int jobStatus = 0; TaskResponse taskResponse = new TaskResponse(); k8sTaskExecutor.setJob(job); - k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse); + k8sTaskExecutor.setTaskStatus(jobStatus, job.getMetadata().getName(), taskResponse); Assertions.assertEquals(0, taskResponse.getExitStatusCode()); } @Test From 7f8bb62064ee748cb68f23951cffff1983fdd83d Mon Sep 17 00:00:00 2001 From: luxiaolong-ct <294671909+luxiaolong-ct@users.noreply.github.com> Date: Wed, 24 Jun 2026 13:47:06 +0800 Subject: [PATCH 4/6] Remove unused taskInstanceId from job watcher path --- .../task/api/k8s/impl/K8sTaskExecutor.java | 23 ++++++++----------- .../task/api/k8s/K8sTaskExecutorTest.java | 6 ++--- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index cf1fce05ff97..b1f0021d43ef 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -200,7 +200,7 @@ public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { } - public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { + public void registerBatchJobWatcher(Job job, TaskResponse taskResponse) { final String jobName = job.getMetadata().getName(); final String namespace = job.getMetadata().getNamespace(); final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -208,7 +208,7 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse ScheduledExecutorService jobStatusPoller = null; try { informer = k8sUtils.createBatchJobInformer(jobName, namespace, - createBatchJobEventHandler(taskInstanceId, taskResponse, countDownLatch)); + createBatchJobEventHandler(taskResponse, countDownLatch)); informer.start().whenComplete((v, ex) -> { if (ex != null && countDownLatch.getCount() > 0) { withTaskLogContext(() -> { @@ -218,7 +218,7 @@ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse }); } }); - jobStatusPoller = startJobStatusPoller(jobName, namespace, taskInstanceId, taskResponse, countDownLatch); + jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch); awaitJobCompletion(countDownLatch); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); @@ -246,14 +246,13 @@ protected long getJobStatusPollIntervalSeconds() { private ScheduledExecutorService startJobStatusPoller(String jobName, String namespace, - String taskInstanceId, TaskResponse taskResponse, CountDownLatch countDownLatch) { ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( "K8sJobStatusPoller-" + jobName); long pollIntervalSeconds = getJobStatusPollIntervalSeconds(); jobStatusPoller.scheduleAtFixedRate( - () -> pollBatchJobStatus(jobName, namespace, taskInstanceId, taskResponse, countDownLatch), + () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch), pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS); @@ -262,7 +261,6 @@ private ScheduledExecutorService startJobStatusPoller(String jobName, private void pollBatchJobStatus(String jobName, String namespace, - String taskInstanceId, TaskResponse taskResponse, CountDownLatch countDownLatch) { if (countDownLatch.getCount() == 0) { @@ -278,14 +276,14 @@ private void pollBatchJobStatus(String jobName, countDownLatch.countDown(); return; } - handleBatchJobTerminalStatus(polledJob, taskInstanceId, taskResponse, countDownLatch); + handleBatchJobTerminalStatus(polledJob, taskResponse, countDownLatch); } catch (Exception e) { log.warn("[K8sJobExecutor-{}] failed to poll job status: {}", jobName, e.getMessage()); } }); } - private ResourceEventHandler createBatchJobEventHandler(String taskInstanceId, TaskResponse taskResponse, + private ResourceEventHandler createBatchJobEventHandler(TaskResponse taskResponse, CountDownLatch countDownLatch) { return new ResourceEventHandler() { @@ -293,7 +291,7 @@ private ResourceEventHandler createBatchJobEventHandler(String taskInstance public void onAdd(Job watchedJob) { withTaskLogContext(() -> { log.info("event received, job: {}, action: ADD", watchedJob.getMetadata().getName()); - handleBatchJobTerminalStatus(watchedJob, taskInstanceId, taskResponse, countDownLatch); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch); }); } @@ -301,7 +299,7 @@ public void onAdd(Job watchedJob) { public void onUpdate(Job oldJob, Job watchedJob) { withTaskLogContext(() -> { log.info("event received, job: {}, action: UPDATE", watchedJob.getMetadata().getName()); - handleBatchJobTerminalStatus(watchedJob, taskInstanceId, taskResponse, countDownLatch); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch); }); } @@ -344,7 +342,7 @@ private void withTaskLogContext(Runnable action) { } } - private void handleBatchJobTerminalStatus(Job watchedJob, String taskInstanceId, TaskResponse taskResponse, + private void handleBatchJobTerminalStatus(Job watchedJob, TaskResponse taskResponse, CountDownLatch countDownLatch) { if (countDownLatch.getCount() == 0) { return; @@ -395,7 +393,6 @@ private void parsePodLogOutput() { @Override public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); - int taskInstanceId = taskRequest.getTaskInstanceId(); try { if (StringUtils.isEmpty(k8sParameterStr)) { return result; @@ -405,7 +402,7 @@ public TaskResponse run(String k8sParameterStr) throws Exception { k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); parsePodLogOutput(); - registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result); + registerBatchJobWatcher(job, result); if (podLogOutputFuture != null) { try { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 6e0ebb60198e..ab476af675dd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -134,8 +134,7 @@ private WatcherHarness startBatchJobWatcher(K8sTaskExecutor executor, handlerReady.countDown(); return harness.informer; }); - harness.thread = new Thread(() -> executor.registerBatchJobWatcher(job, - String.valueOf(taskInstanceId), taskResponse)); + harness.thread = new Thread(() -> executor.registerBatchJobWatcher(job, taskResponse)); harness.thread.start(); Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); harness.handler = handlerRef.get(); @@ -218,8 +217,7 @@ public void testRegisterBatchJobInformerInformerStartFailed() throws Exception { TaskResponse taskResponse = new TaskResponse(); doThrow(new RuntimeException("informer start failed")).when(k8sUtils) .createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any()); - Thread thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, - String.valueOf(taskInstanceId), taskResponse)); + Thread thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, taskResponse)); thread.start(); thread.join(5000); assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); From 9d67b032c0230275e2677d23c20fe27196216e99 Mon Sep 17 00:00:00 2001 From: luxiaolong-ct <294671909+luxiaolong-ct@users.noreply.github.com> Date: Mon, 29 Jun 2026 09:30:47 +0800 Subject: [PATCH 5/6] [Improvement-17330][K8s] Fail task after consecutive GET poll failures Co-authored-by: Cursor --- .../task/api/k8s/impl/K8sTaskExecutor.java | 35 +++++++++++++++---- .../task/api/k8s/K8sTaskExecutorTest.java | 29 +++++++++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index b1f0021d43ef..7e36a95627b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -62,6 +62,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.api.model.Affinity; @@ -85,6 +86,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { private static final long JOB_STATUS_POLL_INTERVAL_SECONDS = 30L; + private static final int MAX_CONSECUTIVE_POLL_FAILURES = 3; private Job job; protected boolean podLogOutputIsFinished = false; @@ -218,7 +220,9 @@ public void registerBatchJobWatcher(Job job, TaskResponse taskResponse) { }); } }); - jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch); + AtomicInteger consecutivePollFailures = new AtomicInteger(0); + jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch, + consecutivePollFailures); awaitJobCompletion(countDownLatch); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); @@ -238,21 +242,31 @@ public void registerBatchJobWatcher(Job job, TaskResponse taskResponse) { } /** - * Returns the interval for polling Job status. Protected so unit tests can override with a shorter value. + * Returns the interval for polling Job status. + * Protected so unit tests can override with a shorter value. */ protected long getJobStatusPollIntervalSeconds() { return JOB_STATUS_POLL_INTERVAL_SECONDS; } + /** + * Returns the max consecutive GET poll failures before failing the task. + * Protected so unit tests can override with a lower value. + */ + protected int getMaxConsecutivePollFailures() { + return MAX_CONSECUTIVE_POLL_FAILURES; + } + private ScheduledExecutorService startJobStatusPoller(String jobName, String namespace, TaskResponse taskResponse, - CountDownLatch countDownLatch) { + CountDownLatch countDownLatch, + AtomicInteger consecutivePollFailures) { ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( "K8sJobStatusPoller-" + jobName); long pollIntervalSeconds = getJobStatusPollIntervalSeconds(); jobStatusPoller.scheduleAtFixedRate( - () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch), + () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch, consecutivePollFailures), pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS); @@ -262,7 +276,8 @@ private ScheduledExecutorService startJobStatusPoller(String jobName, private void pollBatchJobStatus(String jobName, String namespace, TaskResponse taskResponse, - CountDownLatch countDownLatch) { + CountDownLatch countDownLatch, + AtomicInteger consecutivePollFailures) { if (countDownLatch.getCount() == 0) { return; } @@ -270,6 +285,7 @@ private void pollBatchJobStatus(String jobName, try { log.info("[K8sJobExecutor-{}] polling job status via GET", jobName); Job polledJob = k8sUtils.getJob(jobName, namespace); + consecutivePollFailures.set(0); if (polledJob == null) { log.error("[K8sJobExecutor-{}] job not found during status polling", jobName); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); @@ -278,7 +294,14 @@ private void pollBatchJobStatus(String jobName, } handleBatchJobTerminalStatus(polledJob, taskResponse, countDownLatch); } catch (Exception e) { - log.warn("[K8sJobExecutor-{}] failed to poll job status: {}", jobName, e.getMessage()); + int failures = consecutivePollFailures.incrementAndGet(); + log.warn("[K8sJobExecutor-{}] failed to poll job status ({}/{}): {}", + jobName, failures, getMaxConsecutivePollFailures(), e.getMessage()); + if (failures >= getMaxConsecutivePollFailures()) { + log.error("[K8sJobExecutor-{}] fail in k8s: exceeded consecutive poll failure limit", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + countDownLatch.countDown(); + } } }); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index ab476af675dd..cadeae3b80f6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -285,6 +285,35 @@ protected long getJobStatusPollIntervalSeconds() { verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); } + @Test + public void testRegisterBatchJobInformerConsecutivePollFailuresWithoutTimeoutStrategy() throws Exception { + TaskExecutionContext taskRequest = getTaskRequest(); + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.WARN); + + K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(taskRequest) { + + @Override + protected long getJobStatusPollIntervalSeconds() { + return 1L; + } + + @Override + protected int getMaxConsecutivePollFailures() { + return 2; + } + }; + injectK8sUtils(pollingExecutor, k8sUtils); + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenThrow(new RuntimeException("apiserver unreachable")); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils, org.mockito.Mockito.atLeast(2)) + .getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + private static final class WatcherHarness { private SharedIndexInformer informer; From 6649af518d2f36c16714513cd9ac4f32621181da Mon Sep 17 00:00:00 2001 From: luxiaolong-ct <294671909+luxiaolong-ct@users.noreply.github.com> Date: Thu, 2 Jul 2026 17:14:51 +0800 Subject: [PATCH 6/6] [Improvement-17330][K8s] Extract job monitor for k8s task --- .../task/api/k8s/impl/K8sJobMonitor.java | 268 ++++++++++++++++++ .../task/api/k8s/impl/K8sTaskExecutor.java | 215 +------------- .../task/api/k8s/K8sTaskExecutorTest.java | 265 ----------------- .../task/api/k8s/impl/K8sJobMonitorTest.java | 262 +++++++++++++++++ 4 files changed, 535 insertions(+), 475 deletions(-) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java new file mode 100644 index 000000000000..7145affbf499 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +/** + * Monitors a submitted Kubernetes Batch Job until it reaches a terminal state. + * The GET polling path is a bounded safety net when informer events are missed or unavailable. + */ +@Slf4j +public class K8sJobMonitor { + + static final long DEFAULT_POLL_INTERVAL_SECONDS = 30L; + static final int DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES = 3; + + private final K8sUtils k8sUtils; + private final TaskExecutionContext taskRequest; + private final long pollIntervalSeconds; + private final int maxConsecutivePollFailures; + + public K8sJobMonitor(K8sUtils k8sUtils, TaskExecutionContext taskRequest) { + this(k8sUtils, taskRequest, DEFAULT_POLL_INTERVAL_SECONDS, DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES); + } + + K8sJobMonitor(K8sUtils k8sUtils, + TaskExecutionContext taskRequest, + long pollIntervalSeconds, + int maxConsecutivePollFailures) { + this.k8sUtils = k8sUtils; + this.taskRequest = taskRequest; + this.pollIntervalSeconds = pollIntervalSeconds; + this.maxConsecutivePollFailures = maxConsecutivePollFailures; + } + + public void monitorUntilTerminal(Job job, TaskResponse taskResponse) { + final String jobName = job.getMetadata().getName(); + final String namespace = job.getMetadata().getNamespace(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicBoolean completed = new AtomicBoolean(false); + SharedIndexInformer informer = null; + ScheduledExecutorService jobStatusPoller = null; + try { + informer = k8sUtils.createBatchJobInformer(jobName, namespace, + createBatchJobEventHandler(taskResponse, countDownLatch, completed)); + informer.start().whenComplete((v, ex) -> { + if (ex != null && !completed.get()) { + withTaskLogContext(() -> { + log.error("[K8sJobMonitor-{}] fail in k8s: {}", jobName, ex.getMessage(), ex); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + }); + } + }); + AtomicInteger consecutivePollFailures = new AtomicInteger(0); + jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch, completed, + consecutivePollFailures); + awaitJobCompletion(countDownLatch); + } catch (InterruptedException e) { + log.error("job failed in k8s: {}", e.getMessage(), e); + Thread.currentThread().interrupt(); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } catch (Exception e) { + log.error("job failed in k8s: {}", e.getMessage(), e); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } finally { + if (jobStatusPoller != null) { + jobStatusPoller.shutdownNow(); + } + if (informer != null) { + informer.stop(); + } + } + } + + private ScheduledExecutorService startJobStatusPoller(String jobName, + String namespace, + TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed, + AtomicInteger consecutivePollFailures) { + ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( + "K8sJobStatusPoller-" + jobName); + jobStatusPoller.scheduleAtFixedRate( + () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch, completed, + consecutivePollFailures), + pollIntervalSeconds, + pollIntervalSeconds, + TimeUnit.SECONDS); + return jobStatusPoller; + } + + private void pollBatchJobStatus(String jobName, + String namespace, + TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed, + AtomicInteger consecutivePollFailures) { + if (completed.get()) { + return; + } + withTaskLogContext(() -> { + try { + log.info("[K8sJobMonitor-{}] polling job status via GET", jobName); + Job polledJob = k8sUtils.getJob(jobName, namespace); + consecutivePollFailures.set(0); + if (polledJob == null) { + log.error("[K8sJobMonitor-{}] job not found during status polling", jobName); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + return; + } + handleBatchJobTerminalStatus(polledJob, taskResponse, countDownLatch, completed); + } catch (Exception e) { + int failures = consecutivePollFailures.incrementAndGet(); + log.warn("[K8sJobMonitor-{}] failed to poll job status ({}/{}): {}", + jobName, failures, maxConsecutivePollFailures, e.getMessage()); + if (failures >= maxConsecutivePollFailures) { + log.error("[K8sJobMonitor-{}] fail in k8s: exceeded consecutive poll failure limit", jobName); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + } + } + }); + } + + private ResourceEventHandler createBatchJobEventHandler(TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed) { + return new ResourceEventHandler() { + + @Override + public void onAdd(Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: ADD", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch, completed); + }); + } + + @Override + public void onUpdate(Job oldJob, Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: UPDATE", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch, completed); + }); + } + + @Override + public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { + withTaskLogContext(() -> { + if (completed.get()) { + return; + } + log.info("event received, job: {}, action: DELETE", watchedJob.getMetadata().getName()); + log.error("[K8sJobMonitor-{}] fail in k8s", watchedJob.getMetadata().getName()); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, + watchedJob.getMetadata().getName()); + }); + } + }; + } + + private void awaitJobCompletion(CountDownLatch countDownLatch) throws InterruptedException, TaskException { + boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if (timeoutFlag) { + if (!countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)) { + throw new TaskException("K8sTask is timeout"); + } + } else { + countDownLatch.await(); + } + } + + private void withTaskLogContext(Runnable action) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), + taskRequest.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + action.run(); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + + private void handleBatchJobTerminalStatus(Job watchedJob, TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed) { + if (completed.get()) { + return; + } + int jobStatus = getK8sJobStatus(watchedJob); + log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + completeOnce(completed, countDownLatch, taskResponse, jobStatus, watchedJob.getMetadata().getName()); + } + + private static int getK8sJobStatus(Job job) { + JobStatus jobStatus = job.getStatus(); + if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) { + return EXIT_CODE_SUCCESS; + } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) { + return EXIT_CODE_FAILURE; + } else { + return TaskConstants.RUNNING_CODE; + } + } + + private static void completeOnce(AtomicBoolean completed, + CountDownLatch countDownLatch, + TaskResponse taskResponse, + int jobStatus, + String jobName) { + if (completed.compareAndSet(false, true)) { + setTaskStatus(jobStatus, jobName, taskResponse); + countDownLatch.countDown(); + } + } + + private static void setTaskStatus(int jobStatus, String jobName, TaskResponse taskResponse) { + if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { + if (jobStatus == EXIT_CODE_SUCCESS) { + log.info("[K8sJobMonitor-{}] succeed in k8s", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); + } else { + log.error("[K8sJobMonitor-{}] fail in k8s", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 7e36a95627b5..29be736f8b5e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE; @@ -35,10 +34,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -56,13 +53,9 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.api.model.Affinity; @@ -74,10 +67,7 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; -import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; import io.fabric8.kubernetes.client.dsl.LogWatch; -import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; /** * K8sTaskExecutor used to submit k8s task to K8S @@ -85,9 +75,6 @@ @Slf4j public class K8sTaskExecutor extends AbstractK8sTaskExecutor { - private static final long JOB_STATUS_POLL_INTERVAL_SECONDS = 30L; - private static final int MAX_CONSECUTIVE_POLL_FAILURES = 3; - private Job job; protected boolean podLogOutputIsFinished = false; protected Future podLogOutputFuture; @@ -202,181 +189,12 @@ public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { } - public void registerBatchJobWatcher(Job job, TaskResponse taskResponse) { - final String jobName = job.getMetadata().getName(); - final String namespace = job.getMetadata().getNamespace(); - final CountDownLatch countDownLatch = new CountDownLatch(1); - SharedIndexInformer informer = null; - ScheduledExecutorService jobStatusPoller = null; - try { - informer = k8sUtils.createBatchJobInformer(jobName, namespace, - createBatchJobEventHandler(taskResponse, countDownLatch)); - informer.start().whenComplete((v, ex) -> { - if (ex != null && countDownLatch.getCount() > 0) { - withTaskLogContext(() -> { - log.error("[K8sJobExecutor-{}] fail in k8s: {}", jobName, ex.getMessage(), ex); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - }); - } - }); - AtomicInteger consecutivePollFailures = new AtomicInteger(0); - jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch, - consecutivePollFailures); - awaitJobCompletion(countDownLatch); - } catch (InterruptedException e) { - log.error("job failed in k8s: {}", e.getMessage(), e); - Thread.currentThread().interrupt(); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } catch (Exception e) { - log.error("job failed in k8s: {}", e.getMessage(), e); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } finally { - if (jobStatusPoller != null) { - jobStatusPoller.shutdownNow(); - } - if (informer != null) { - informer.stop(); - } - } - } - - /** - * Returns the interval for polling Job status. - * Protected so unit tests can override with a shorter value. - */ - protected long getJobStatusPollIntervalSeconds() { - return JOB_STATUS_POLL_INTERVAL_SECONDS; - } - - /** - * Returns the max consecutive GET poll failures before failing the task. - * Protected so unit tests can override with a lower value. - */ - protected int getMaxConsecutivePollFailures() { - return MAX_CONSECUTIVE_POLL_FAILURES; + protected K8sJobMonitor createJobMonitor() { + return new K8sJobMonitor(k8sUtils, taskRequest); } - private ScheduledExecutorService startJobStatusPoller(String jobName, - String namespace, - TaskResponse taskResponse, - CountDownLatch countDownLatch, - AtomicInteger consecutivePollFailures) { - ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( - "K8sJobStatusPoller-" + jobName); - long pollIntervalSeconds = getJobStatusPollIntervalSeconds(); - jobStatusPoller.scheduleAtFixedRate( - () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch, consecutivePollFailures), - pollIntervalSeconds, - pollIntervalSeconds, - TimeUnit.SECONDS); - return jobStatusPoller; - } - - private void pollBatchJobStatus(String jobName, - String namespace, - TaskResponse taskResponse, - CountDownLatch countDownLatch, - AtomicInteger consecutivePollFailures) { - if (countDownLatch.getCount() == 0) { - return; - } - withTaskLogContext(() -> { - try { - log.info("[K8sJobExecutor-{}] polling job status via GET", jobName); - Job polledJob = k8sUtils.getJob(jobName, namespace); - consecutivePollFailures.set(0); - if (polledJob == null) { - log.error("[K8sJobExecutor-{}] job not found during status polling", jobName); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - return; - } - handleBatchJobTerminalStatus(polledJob, taskResponse, countDownLatch); - } catch (Exception e) { - int failures = consecutivePollFailures.incrementAndGet(); - log.warn("[K8sJobExecutor-{}] failed to poll job status ({}/{}): {}", - jobName, failures, getMaxConsecutivePollFailures(), e.getMessage()); - if (failures >= getMaxConsecutivePollFailures()) { - log.error("[K8sJobExecutor-{}] fail in k8s: exceeded consecutive poll failure limit", jobName); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - } - } - }); - } - - private ResourceEventHandler createBatchJobEventHandler(TaskResponse taskResponse, - CountDownLatch countDownLatch) { - return new ResourceEventHandler() { - - @Override - public void onAdd(Job watchedJob) { - withTaskLogContext(() -> { - log.info("event received, job: {}, action: ADD", watchedJob.getMetadata().getName()); - handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch); - }); - } - - @Override - public void onUpdate(Job oldJob, Job watchedJob) { - withTaskLogContext(() -> { - log.info("event received, job: {}, action: UPDATE", watchedJob.getMetadata().getName()); - handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch); - }); - } - - @Override - public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { - withTaskLogContext(() -> { - if (countDownLatch.getCount() == 0) { - return; - } - log.info("event received, job: {}, action: DELETE", watchedJob.getMetadata().getName()); - log.error("[K8sJobExecutor-{}] fail in k8s", watchedJob.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - }); - } - }; - } - - private void awaitJobCompletion(CountDownLatch countDownLatch) throws InterruptedException { - boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED - || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; - if (timeoutFlag) { - if (!countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)) { - waitTimeout(true); - } - } else { - countDownLatch.await(); - } - } - - private void withTaskLogContext(Runnable action) { - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - action.run(); - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - - private void handleBatchJobTerminalStatus(Job watchedJob, TaskResponse taskResponse, - CountDownLatch countDownLatch) { - if (countDownLatch.getCount() == 0) { - return; - } - int jobStatus = getK8sJobStatus(watchedJob); - log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; - } - setTaskStatus(jobStatus, watchedJob.getMetadata().getName(), taskResponse); - countDownLatch.countDown(); + public void monitorBatchJob(Job job, TaskResponse taskResponse) { + createJobMonitor().monitorUntilTerminal(job, taskResponse); } private void parsePodLogOutput() { @@ -425,7 +243,7 @@ public TaskResponse run(String k8sParameterStr) throws Exception { k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); parsePodLogOutput(); - registerBatchJobWatcher(job, result); + monitorBatchJob(job, result); if (podLogOutputFuture != null) { try { @@ -488,29 +306,6 @@ public void stopJobOnK8s(String k8sParameterStr) { } } - public int getK8sJobStatus(Job job) { - JobStatus jobStatus = job.getStatus(); - if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) { - return EXIT_CODE_SUCCESS; - } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) { - return EXIT_CODE_FAILURE; - } else { - return TaskConstants.RUNNING_CODE; - } - } - - public void setTaskStatus(int jobStatus, String jobName, TaskResponse taskResponse) { - if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { - if (jobStatus == EXIT_CODE_SUCCESS) { - log.info("[K8sJobExecutor-{}] succeed in k8s", jobName); - taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); - } else { - log.error("[K8sJobExecutor-{}] fail in k8s", jobName); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } - } - } - public Job getJob() { return job; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index cadeae3b80f6..252fccbd735b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -17,32 +17,14 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; -import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; -import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -50,9 +32,6 @@ import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.batch.v1.Job; -import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; -import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; public class K8sTaskExecutorTest { @@ -66,7 +45,6 @@ public class K8sTaskExecutorTest { private final int taskInstanceId = 1000; private final String taskName = "k8s_task_test"; private Job job; - private K8sUtils k8sUtils; @BeforeEach public void before() throws Exception { @@ -81,8 +59,6 @@ public void before() throws Exception { requirement.setOperator("In"); requirement.setValues(Arrays.asList("1234", "123456")); k8sTaskExecutor = new K8sTaskExecutor(taskRequest); - k8sUtils = mock(K8sUtils.class); - injectK8sUtils(k8sTaskExecutor, k8sUtils); k8sTaskMainParameters = new K8sTaskMainParameters(); k8sTaskMainParameters.setImage(image); k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy); @@ -96,246 +72,6 @@ public void before() throws Exception { job = k8sTaskExecutor.getJob(); } - private void injectK8sUtils(K8sTaskExecutor executor, K8sUtils mockK8sUtils) throws Exception { - Class type = executor.getClass(); - while (type != null) { - try { - Field field = type.getDeclaredField("k8sUtils"); - field.setAccessible(true); - field.set(executor, mockK8sUtils); - return; - } catch (NoSuchFieldException ignored) { - type = type.getSuperclass(); - } - } - throw new NoSuchFieldException("k8sUtils"); - } - - private TaskExecutionContext getTaskRequest() throws Exception { - Field field = k8sTaskExecutor.getClass().getSuperclass().getDeclaredField("taskRequest"); - field.setAccessible(true); - return (TaskExecutionContext) field.get(k8sTaskExecutor); - } - - private WatcherHarness startBatchJobWatcher(TaskResponse taskResponse) throws InterruptedException { - return startBatchJobWatcher(k8sTaskExecutor, taskResponse); - } - - private WatcherHarness startBatchJobWatcher(K8sTaskExecutor executor, - TaskResponse taskResponse) throws InterruptedException { - WatcherHarness harness = new WatcherHarness(); - harness.informer = mock(SharedIndexInformer.class); - when(harness.informer.start()).thenReturn(CompletableFuture.completedFuture(null)); - CountDownLatch handlerReady = new CountDownLatch(1); - AtomicReference> handlerRef = new AtomicReference<>(); - when(k8sUtils.createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any())) - .thenAnswer(invocation -> { - handlerRef.set(invocation.getArgument(2)); - handlerReady.countDown(); - return harness.informer; - }); - harness.thread = new Thread(() -> executor.registerBatchJobWatcher(job, taskResponse)); - harness.thread.start(); - Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); - harness.handler = handlerRef.get(); - return harness; - } - - private void finishWatcher(WatcherHarness harness) throws InterruptedException { - harness.thread.join(5000); - verify(harness.informer).stop(); - } - - private Job jobWithStatus(Integer succeeded, Integer failed) { - JobStatus status = new JobStatus(); - status.setSucceeded(succeeded); - status.setFailed(failed); - Job watchedJob = new Job(); - watchedJob.setMetadata(job.getMetadata()); - watchedJob.setStatus(status); - return watchedJob; - } - - @Test - public void testRegisterBatchJobInformerOnUpdateSuccess() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onUpdate(job, jobWithStatus(1, null)); - finishWatcher(harness); - assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerOnUpdateFailed() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onUpdate(job, jobWithStatus(null, 1)); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerOnDelete() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onDelete(job, false); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerOnAddSuccess() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onAdd(jobWithStatus(1, null)); - finishWatcher(harness); - assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerOnAddFailed() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onAdd(jobWithStatus(null, 1)); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerOnAddRunning() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onAdd(jobWithStatus(null, null)); - Assertions.assertTrue(harness.thread.isAlive()); - harness.handler.onUpdate(job, jobWithStatus(1, null)); - finishWatcher(harness); - assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerInformerStartFailed() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - doThrow(new RuntimeException("informer start failed")).when(k8sUtils) - .createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any()); - Thread thread = new Thread(() -> k8sTaskExecutor.registerBatchJobWatcher(job, taskResponse)); - thread.start(); - thread.join(5000); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerIgnoreRunningUpdate() throws Exception { - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - harness.handler.onUpdate(job, jobWithStatus(null, null)); - Assertions.assertTrue(harness.thread.isAlive()); - harness.handler.onUpdate(job, jobWithStatus(1, null)); - finishWatcher(harness); - assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerTimeout() throws Exception { - TaskExecutionContext taskRequest = getTaskRequest(); - taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.FAILED); - taskRequest.setTaskTimeout(1); - - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(taskResponse); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - } - - @Test - public void testRegisterBatchJobInformerJobStatusPollingSuccess() throws Exception { - K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(getTaskRequest()) { - - @Override - protected long getJobStatusPollIntervalSeconds() { - return 1L; - } - }; - injectK8sUtils(pollingExecutor, k8sUtils); - when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) - .thenReturn(jobWithStatus(1, null)); - - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); - finishWatcher(harness); - assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); - verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); - } - - @Test - public void testRegisterBatchJobInformerJobStatusPollingDeleted() throws Exception { - K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(getTaskRequest()) { - - @Override - protected long getJobStatusPollIntervalSeconds() { - return 1L; - } - }; - injectK8sUtils(pollingExecutor, k8sUtils); - when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))).thenReturn(null); - - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); - } - - @Test - public void testRegisterBatchJobInformerConsecutivePollFailuresWithoutTimeoutStrategy() throws Exception { - TaskExecutionContext taskRequest = getTaskRequest(); - taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.WARN); - - K8sTaskExecutor pollingExecutor = new K8sTaskExecutor(taskRequest) { - - @Override - protected long getJobStatusPollIntervalSeconds() { - return 1L; - } - - @Override - protected int getMaxConsecutivePollFailures() { - return 2; - } - }; - injectK8sUtils(pollingExecutor, k8sUtils); - when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) - .thenThrow(new RuntimeException("apiserver unreachable")); - - TaskResponse taskResponse = new TaskResponse(); - WatcherHarness harness = startBatchJobWatcher(pollingExecutor, taskResponse); - finishWatcher(harness); - assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); - verify(k8sUtils, org.mockito.Mockito.atLeast(2)) - .getJob(eq(job.getMetadata().getName()), eq(namespace)); - } - - private static final class WatcherHarness { - - private SharedIndexInformer informer; - private ResourceEventHandler handler; - private Thread thread; - } - - @Test - public void testGetK8sJobStatusNormal() { - JobStatus jobStatus = new JobStatus(); - jobStatus.setSucceeded(1); - job.setStatus(jobStatus); - Assertions.assertEquals(0, Integer.compare(0, k8sTaskExecutor.getK8sJobStatus(job))); - } - @Test - public void testSetTaskStatusNormal() { - int jobStatus = 0; - TaskResponse taskResponse = new TaskResponse(); - k8sTaskExecutor.setJob(job); - k8sTaskExecutor.setTaskStatus(jobStatus, job.getMetadata().getName(), taskResponse); - Assertions.assertEquals(0, taskResponse.getExitStatusCode()); - } @Test public void testWaitTimeoutNormal() { try { @@ -352,5 +88,4 @@ public void testLoadYamlCorrectly() { k8sTaskExecutor.getJob().getSpec().getTemplate().getSpec().getContainers().get(0).getCommand(); Assertions.assertEquals(expectedCommands, actualCommands); } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java new file mode 100644 index 000000000000..9cc7285ada43 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +public class K8sJobMonitorTest { + + private final String namespace = "namespace"; + private final int taskInstanceId = 1000; + private final String taskName = "k8s_task_test"; + private Job job; + private K8sUtils k8sUtils; + private TaskExecutionContext taskRequest; + + @BeforeEach + public void before() throws Exception { + taskRequest = new TaskExecutionContext(); + taskRequest.setTaskInstanceId(taskInstanceId); + taskRequest.setTaskName(taskName); + k8sUtils = mock(K8sUtils.class); + job = new JobBuilder() + .withNewMetadata() + .withName(String.format("%s-%s", taskName, taskInstanceId)) + .withNamespace(namespace) + .endMetadata() + .build(); + } + + private K8sJobMonitor createMonitor(TaskExecutionContext taskExecutionContext) { + return createMonitor(taskExecutionContext, K8sJobMonitor.DEFAULT_POLL_INTERVAL_SECONDS, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES); + } + + private K8sJobMonitor createMonitor(TaskExecutionContext taskExecutionContext, + long pollIntervalSeconds, + int maxConsecutivePollFailures) { + return new K8sJobMonitor(k8sUtils, taskExecutionContext, pollIntervalSeconds, maxConsecutivePollFailures); + } + + private WatcherHarness startMonitor(K8sJobMonitor monitor, TaskResponse taskResponse) throws InterruptedException { + WatcherHarness harness = new WatcherHarness(); + harness.informer = mock(SharedIndexInformer.class); + when(harness.informer.start()).thenReturn(CompletableFuture.completedFuture(null)); + CountDownLatch handlerReady = new CountDownLatch(1); + AtomicReference> handlerRef = new AtomicReference<>(); + when(k8sUtils.createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any())) + .thenAnswer(invocation -> { + handlerRef.set(invocation.getArgument(2)); + handlerReady.countDown(); + return harness.informer; + }); + harness.thread = new Thread(() -> monitor.monitorUntilTerminal(job, taskResponse)); + harness.thread.start(); + Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); + harness.handler = handlerRef.get(); + return harness; + } + + private void finishWatcher(WatcherHarness harness) throws InterruptedException { + harness.thread.join(5000); + verify(harness.informer).stop(); + } + + private Job jobWithStatus(Integer succeeded, Integer failed) { + JobStatus status = new JobStatus(); + status.setSucceeded(succeeded); + status.setFailed(failed); + Job watchedJob = new Job(); + watchedJob.setMetadata(job.getMetadata()); + watchedJob.setStatus(status); + return watchedJob; + } + + @Test + public void testMonitorOnUpdateSuccess() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnUpdateFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnDelete() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onDelete(job, false); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddSuccess() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddRunning() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(null, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorInformerStartFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + doThrow(new RuntimeException("informer start failed")).when(k8sUtils) + .createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any()); + Thread thread = new Thread(() -> createMonitor(taskRequest).monitorUntilTerminal(job, taskResponse)); + thread.start(); + thread.join(5000); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorIgnoreRunningUpdate() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorDoesNotOverwriteFirstTerminalStatus() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + harness.handler.onDelete(job, false); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorTimeout() throws Exception { + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.FAILED); + taskRequest.setTaskTimeout(1); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorJobStatusPollingSuccess() throws Exception { + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenReturn(jobWithStatus(1, null)); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + @Test + public void testMonitorJobStatusPollingDeleted() throws Exception { + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))).thenReturn(null); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + @Test + public void testMonitorConsecutivePollFailuresWithoutTimeoutStrategy() throws Exception { + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.WARN); + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenThrow(new RuntimeException("apiserver unreachable")); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, 2), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils, atLeast(2)).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + private static final class WatcherHarness { + + private SharedIndexInformer informer; + private ResourceEventHandler handler; + private Thread thread; + } +}