diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java new file mode 100644 index 000000000000..7145affbf499 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitor.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +/** + * Monitors a submitted Kubernetes Batch Job until it reaches a terminal state. + * The GET polling path is a bounded safety net when informer events are missed or unavailable. + */ +@Slf4j +public class K8sJobMonitor { + + static final long DEFAULT_POLL_INTERVAL_SECONDS = 30L; + static final int DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES = 3; + + private final K8sUtils k8sUtils; + private final TaskExecutionContext taskRequest; + private final long pollIntervalSeconds; + private final int maxConsecutivePollFailures; + + public K8sJobMonitor(K8sUtils k8sUtils, TaskExecutionContext taskRequest) { + this(k8sUtils, taskRequest, DEFAULT_POLL_INTERVAL_SECONDS, DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES); + } + + K8sJobMonitor(K8sUtils k8sUtils, + TaskExecutionContext taskRequest, + long pollIntervalSeconds, + int maxConsecutivePollFailures) { + this.k8sUtils = k8sUtils; + this.taskRequest = taskRequest; + this.pollIntervalSeconds = pollIntervalSeconds; + this.maxConsecutivePollFailures = maxConsecutivePollFailures; + } + + public void monitorUntilTerminal(Job job, TaskResponse taskResponse) { + final String jobName = job.getMetadata().getName(); + final String namespace = job.getMetadata().getNamespace(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicBoolean completed = new AtomicBoolean(false); + SharedIndexInformer informer = null; + ScheduledExecutorService jobStatusPoller = null; + try { + informer = k8sUtils.createBatchJobInformer(jobName, namespace, + createBatchJobEventHandler(taskResponse, countDownLatch, completed)); + informer.start().whenComplete((v, ex) -> { + if (ex != null && !completed.get()) { + withTaskLogContext(() -> { + log.error("[K8sJobMonitor-{}] fail in k8s: {}", jobName, ex.getMessage(), ex); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + }); + } + }); + AtomicInteger consecutivePollFailures = new AtomicInteger(0); + jobStatusPoller = startJobStatusPoller(jobName, namespace, taskResponse, countDownLatch, completed, + consecutivePollFailures); + awaitJobCompletion(countDownLatch); + } catch (InterruptedException e) { + log.error("job failed in k8s: {}", e.getMessage(), e); + Thread.currentThread().interrupt(); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } catch (Exception e) { + log.error("job failed in k8s: {}", e.getMessage(), e); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } finally { + if (jobStatusPoller != null) { + jobStatusPoller.shutdownNow(); + } + if (informer != null) { + informer.stop(); + } + } + } + + private ScheduledExecutorService startJobStatusPoller(String jobName, + String namespace, + TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed, + AtomicInteger consecutivePollFailures) { + ScheduledExecutorService jobStatusPoller = ThreadUtils.newSingleDaemonScheduledExecutorService( + "K8sJobStatusPoller-" + jobName); + jobStatusPoller.scheduleAtFixedRate( + () -> pollBatchJobStatus(jobName, namespace, taskResponse, countDownLatch, completed, + consecutivePollFailures), + pollIntervalSeconds, + pollIntervalSeconds, + TimeUnit.SECONDS); + return jobStatusPoller; + } + + private void pollBatchJobStatus(String jobName, + String namespace, + TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed, + AtomicInteger consecutivePollFailures) { + if (completed.get()) { + return; + } + withTaskLogContext(() -> { + try { + log.info("[K8sJobMonitor-{}] polling job status via GET", jobName); + Job polledJob = k8sUtils.getJob(jobName, namespace); + consecutivePollFailures.set(0); + if (polledJob == null) { + log.error("[K8sJobMonitor-{}] job not found during status polling", jobName); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + return; + } + handleBatchJobTerminalStatus(polledJob, taskResponse, countDownLatch, completed); + } catch (Exception e) { + int failures = consecutivePollFailures.incrementAndGet(); + log.warn("[K8sJobMonitor-{}] failed to poll job status ({}/{}): {}", + jobName, failures, maxConsecutivePollFailures, e.getMessage()); + if (failures >= maxConsecutivePollFailures) { + log.error("[K8sJobMonitor-{}] fail in k8s: exceeded consecutive poll failure limit", jobName); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, jobName); + } + } + }); + } + + private ResourceEventHandler createBatchJobEventHandler(TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed) { + return new ResourceEventHandler() { + + @Override + public void onAdd(Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: ADD", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch, completed); + }); + } + + @Override + public void onUpdate(Job oldJob, Job watchedJob) { + withTaskLogContext(() -> { + log.info("event received, job: {}, action: UPDATE", watchedJob.getMetadata().getName()); + handleBatchJobTerminalStatus(watchedJob, taskResponse, countDownLatch, completed); + }); + } + + @Override + public void onDelete(Job watchedJob, boolean deletedFinalStateUnknown) { + withTaskLogContext(() -> { + if (completed.get()) { + return; + } + log.info("event received, job: {}, action: DELETE", watchedJob.getMetadata().getName()); + log.error("[K8sJobMonitor-{}] fail in k8s", watchedJob.getMetadata().getName()); + completeOnce(completed, countDownLatch, taskResponse, EXIT_CODE_FAILURE, + watchedJob.getMetadata().getName()); + }); + } + }; + } + + private void awaitJobCompletion(CountDownLatch countDownLatch) throws InterruptedException, TaskException { + boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if (timeoutFlag) { + if (!countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)) { + throw new TaskException("K8sTask is timeout"); + } + } else { + countDownLatch.await(); + } + } + + private void withTaskLogContext(Runnable action) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), + taskRequest.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + action.run(); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + + private void handleBatchJobTerminalStatus(Job watchedJob, TaskResponse taskResponse, + CountDownLatch countDownLatch, + AtomicBoolean completed) { + if (completed.get()) { + return; + } + int jobStatus = getK8sJobStatus(watchedJob); + log.info("job {} status {}", watchedJob.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + completeOnce(completed, countDownLatch, taskResponse, jobStatus, watchedJob.getMetadata().getName()); + } + + private static int getK8sJobStatus(Job job) { + JobStatus jobStatus = job.getStatus(); + if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) { + return EXIT_CODE_SUCCESS; + } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) { + return EXIT_CODE_FAILURE; + } else { + return TaskConstants.RUNNING_CODE; + } + } + + private static void completeOnce(AtomicBoolean completed, + CountDownLatch countDownLatch, + TaskResponse taskResponse, + int jobStatus, + String jobName) { + if (completed.compareAndSet(false, true)) { + setTaskStatus(jobStatus, jobName, taskResponse); + countDownLatch.countDown(); + } + } + + private static void setTaskStatus(int jobStatus, String jobName, TaskResponse taskResponse) { + if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { + if (jobStatus == EXIT_CODE_SUCCESS) { + log.info("[K8sJobMonitor-{}] succeed in k8s", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); + } else { + log.error("[K8sJobMonitor-{}] fail in k8s", jobName); + taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + } + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f48169c5e150..29be736f8b5e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE; @@ -35,10 +34,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -56,11 +53,9 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.api.model.Affinity; @@ -72,10 +67,6 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; -import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.LogWatch; /** @@ -198,63 +189,12 @@ public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { } - public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { - CountDownLatch countDownLatch = new CountDownLatch(1); - Watcher watcher = new Watcher() { - - @Override - public void eventReceived(Action action, Job job) { - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); - if (action == Action.DELETED) { - log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - } else if (action != Action.ADDED) { - int jobStatus = getK8sJobStatus(job); - log.info("job {} status {}", job.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; - } - setTaskStatus(jobStatus, taskInstanceId, taskResponse); - countDownLatch.countDown(); - } - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } + protected K8sJobMonitor createJobMonitor() { + return new K8sJobMonitor(k8sUtils, taskRequest); + } - @Override - public void onClose(WatcherException e) { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getWorkflowInstanceId(), - taskRequest.getTaskInstanceId()); - log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - countDownLatch.countDown(); - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - }; - try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) { - boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED - || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; - if (timeoutFlag) { - Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS)); - waitTimeout(timeout); - } else { - countDownLatch.await(); - } - } catch (InterruptedException e) { - log.error("job failed in k8s: {}", e.getMessage(), e); - Thread.currentThread().interrupt(); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } catch (Exception e) { - log.error("job failed in k8s: {}", e.getMessage(), e); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } + public void monitorBatchJob(Job job, TaskResponse taskResponse) { + createJobMonitor().monitorUntilTerminal(job, taskResponse); } private void parsePodLogOutput() { @@ -294,7 +234,6 @@ private void parsePodLogOutput() { @Override public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); - int taskInstanceId = taskRequest.getTaskInstanceId(); try { if (StringUtils.isEmpty(k8sParameterStr)) { return result; @@ -304,7 +243,7 @@ public TaskResponse run(String k8sParameterStr) throws Exception { k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); parsePodLogOutput(); - registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result); + monitorBatchJob(job, result); if (podLogOutputFuture != null) { try { @@ -367,29 +306,6 @@ public void stopJobOnK8s(String k8sParameterStr) { } } - public int getK8sJobStatus(Job job) { - JobStatus jobStatus = job.getStatus(); - if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) { - return EXIT_CODE_SUCCESS; - } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) { - return EXIT_CODE_FAILURE; - } else { - return TaskConstants.RUNNING_CODE; - } - } - - public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) { - if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { - if (jobStatus == EXIT_CODE_SUCCESS) { - log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); - } else { - log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } - } - } - public Job getJob() { return job; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java index a96f3ebb010e..c8933a1a05b6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java @@ -29,8 +29,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @Slf4j public class K8sUtils { @@ -62,24 +62,37 @@ public void deleteJob(String jobName, String namespace) { } } - public Boolean jobExist(String jobName, String namespace) { + public Job getJob(String jobName, String namespace) { try { - Job job = client.batch().v1().jobs().inNamespace(namespace).withName(jobName).get(); - return job != null; + return client.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(jobName) + .get(); } catch (Exception e) { - throw new TaskException("fail to check job: ", e); + throw new TaskException("fail to get job: " + jobName, e); } } - public Watch createBatchJobWatcher(String jobName, Watcher watcher) { + public Boolean jobExist(String jobName, String namespace) { + return getJob(jobName, namespace) != null; + } + + public SharedIndexInformer createBatchJobInformer(String jobName, + String namespace, + ResourceEventHandler handler) { try { - return client.batch() + SharedIndexInformer informer = client.batch() .v1() .jobs() + .inNamespace(namespace) .withName(jobName) - .watch(watcher); + .runnableInformer(0L); + informer.addEventHandler(handler); + return informer; } catch (Exception e) { - throw new TaskException("fail to register batch job watcher", e); + throw new TaskException("fail to register batch job informer", e); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index d93130caee9a..252fccbd735b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -20,7 +20,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; -import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import java.util.Arrays; import java.util.HashMap; @@ -30,17 +29,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.batch.v1.Job; -import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; public class K8sTaskExecutorTest { - private static final Logger logger = LoggerFactory.getLogger(K8sTaskExecutorTest.class); - private K8sTaskExecutor k8sTaskExecutor = null; private K8sTaskMainParameters k8sTaskMainParameters = null; private final String image = "ds-dev"; @@ -51,8 +45,9 @@ public class K8sTaskExecutorTest { private final int taskInstanceId = 1000; private final String taskName = "k8s_task_test"; private Job job; + @BeforeEach - public void before() { + public void before() throws Exception { TaskExecutionContext taskRequest = new TaskExecutionContext(); taskRequest.setTaskInstanceId(taskInstanceId); taskRequest.setTaskName(taskName); @@ -76,21 +71,7 @@ public void before() { k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters); job = k8sTaskExecutor.getJob(); } - @Test - public void testGetK8sJobStatusNormal() { - JobStatus jobStatus = new JobStatus(); - jobStatus.setSucceeded(1); - job.setStatus(jobStatus); - Assertions.assertEquals(0, Integer.compare(0, k8sTaskExecutor.getK8sJobStatus(job))); - } - @Test - public void testSetTaskStatusNormal() { - int jobStatus = 0; - TaskResponse taskResponse = new TaskResponse(); - k8sTaskExecutor.setJob(job); - k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse); - Assertions.assertEquals(0, taskResponse.getExitStatusCode()); - } + @Test public void testWaitTimeoutNormal() { try { @@ -107,5 +88,4 @@ public void testLoadYamlCorrectly() { k8sTaskExecutor.getJob().getSpec().getTemplate().getSpec().getContainers().get(0).getCommand(); Assertions.assertEquals(expectedCommands, actualCommands); } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java new file mode 100644 index 000000000000..9cc7285ada43 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sJobMonitorTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +public class K8sJobMonitorTest { + + private final String namespace = "namespace"; + private final int taskInstanceId = 1000; + private final String taskName = "k8s_task_test"; + private Job job; + private K8sUtils k8sUtils; + private TaskExecutionContext taskRequest; + + @BeforeEach + public void before() throws Exception { + taskRequest = new TaskExecutionContext(); + taskRequest.setTaskInstanceId(taskInstanceId); + taskRequest.setTaskName(taskName); + k8sUtils = mock(K8sUtils.class); + job = new JobBuilder() + .withNewMetadata() + .withName(String.format("%s-%s", taskName, taskInstanceId)) + .withNamespace(namespace) + .endMetadata() + .build(); + } + + private K8sJobMonitor createMonitor(TaskExecutionContext taskExecutionContext) { + return createMonitor(taskExecutionContext, K8sJobMonitor.DEFAULT_POLL_INTERVAL_SECONDS, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES); + } + + private K8sJobMonitor createMonitor(TaskExecutionContext taskExecutionContext, + long pollIntervalSeconds, + int maxConsecutivePollFailures) { + return new K8sJobMonitor(k8sUtils, taskExecutionContext, pollIntervalSeconds, maxConsecutivePollFailures); + } + + private WatcherHarness startMonitor(K8sJobMonitor monitor, TaskResponse taskResponse) throws InterruptedException { + WatcherHarness harness = new WatcherHarness(); + harness.informer = mock(SharedIndexInformer.class); + when(harness.informer.start()).thenReturn(CompletableFuture.completedFuture(null)); + CountDownLatch handlerReady = new CountDownLatch(1); + AtomicReference> handlerRef = new AtomicReference<>(); + when(k8sUtils.createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any())) + .thenAnswer(invocation -> { + handlerRef.set(invocation.getArgument(2)); + handlerReady.countDown(); + return harness.informer; + }); + harness.thread = new Thread(() -> monitor.monitorUntilTerminal(job, taskResponse)); + harness.thread.start(); + Assertions.assertTrue(handlerReady.await(5, TimeUnit.SECONDS)); + harness.handler = handlerRef.get(); + return harness; + } + + private void finishWatcher(WatcherHarness harness) throws InterruptedException { + harness.thread.join(5000); + verify(harness.informer).stop(); + } + + private Job jobWithStatus(Integer succeeded, Integer failed) { + JobStatus status = new JobStatus(); + status.setSucceeded(succeeded); + status.setFailed(failed); + Job watchedJob = new Job(); + watchedJob.setMetadata(job.getMetadata()); + watchedJob.setStatus(status); + return watchedJob; + } + + @Test + public void testMonitorOnUpdateSuccess() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnUpdateFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnDelete() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onDelete(job, false); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddSuccess() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(null, 1)); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorOnAddRunning() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onAdd(jobWithStatus(null, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorInformerStartFailed() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + doThrow(new RuntimeException("informer start failed")).when(k8sUtils) + .createBatchJobInformer(eq(job.getMetadata().getName()), eq(namespace), any()); + Thread thread = new Thread(() -> createMonitor(taskRequest).monitorUntilTerminal(job, taskResponse)); + thread.start(); + thread.join(5000); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorIgnoreRunningUpdate() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(null, null)); + Assertions.assertTrue(harness.thread.isAlive()); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorDoesNotOverwriteFirstTerminalStatus() throws Exception { + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + harness.handler.onUpdate(job, jobWithStatus(1, null)); + harness.handler.onDelete(job, false); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorTimeout() throws Exception { + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.FAILED); + taskRequest.setTaskTimeout(1); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + } + + @Test + public void testMonitorJobStatusPollingSuccess() throws Exception { + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenReturn(jobWithStatus(1, null)); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_SUCCESS, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + @Test + public void testMonitorJobStatusPollingDeleted() throws Exception { + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))).thenReturn(null); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, + K8sJobMonitor.DEFAULT_MAX_CONSECUTIVE_POLL_FAILURES), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + @Test + public void testMonitorConsecutivePollFailuresWithoutTimeoutStrategy() throws Exception { + taskRequest.setTaskTimeoutStrategy(TaskTimeoutStrategy.WARN); + when(k8sUtils.getJob(eq(job.getMetadata().getName()), eq(namespace))) + .thenThrow(new RuntimeException("apiserver unreachable")); + + TaskResponse taskResponse = new TaskResponse(); + WatcherHarness harness = startMonitor(createMonitor(taskRequest, 1L, 2), taskResponse); + finishWatcher(harness); + assertEquals(EXIT_CODE_FAILURE, taskResponse.getExitStatusCode()); + verify(k8sUtils, atLeast(2)).getJob(eq(job.getMetadata().getName()), eq(namespace)); + } + + private static final class WatcherHarness { + + private SharedIndexInformer informer; + private ResourceEventHandler handler; + private Thread thread; + } +}