diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowLoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowLoggerController.java new file mode 100644 index 000000000000..7b545ed88005 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowLoggerController.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_INSTANCE_LOG_ERROR; + +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.WorkflowLoggerService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.ResponseWorkflowLog; +import org.apache.dolphinscheduler.dao.entity.User; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.Parameters; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.tags.Tag; + +/** + * Workflow logger controller + */ +@Tag(name = "WORKFLOW_LOGGER_TAG") +@RestController +@RequestMapping("/workflow-log") +public class WorkflowLoggerController extends BaseController { + + @Autowired + private WorkflowLoggerService workflowLoggerService; + + /** + * query workflow log + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @param skipNum skip number + * @param limit limit + * @return workflow log content + */ + @Operation(summary = "queryWorkflowLog", description = "QUERY_WORKFLOW_INSTANCE_LOG_NOTES") + @Parameters({ + @Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")), + @Parameter(name = "skipLineNum", description = "SKIP_LINE_NUM", required = true, schema = @Schema(implementation = int.class, example = "100")), + @Parameter(name = "limit", description = "LIMIT", required = true, schema = @Schema(implementation = int.class, example = "100")) + }) + @GetMapping(value = "/detail") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_WORKFLOW_INSTANCE_LOG_ERROR) + public Result queryWorkflowLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "workflowInstanceId") int workflowInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit) { + return workflowLoggerService.queryWorkflowLog(loginUser, workflowInstanceId, skipNum, limit); + } + + /** + * download workflow log file + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @return log file content + */ + @Operation(summary = "downloadWorkflowLog", description = "DOWNLOAD_WORKFLOW_INSTANCE_LOG_NOTES") + @Parameters({ + @Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")) + }) + @GetMapping(value = "/download-log") + @ResponseBody + @ApiException(DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR) + public ResponseEntity downloadWorkflowLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "workflowInstanceId") int workflowInstanceId) { + byte[] logBytes = workflowLoggerService.getWorkflowLogBytes(loginUser, workflowInstanceId); + return ResponseEntity + .ok() + .header(HttpHeaders.CONTENT_DISPOSITION, + "attachment; filename=\"" + "workflow-" + workflowInstanceId + ".log" + "\"") + .body(logBytes); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index b47a9b108921..7308055b9349 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -292,6 +292,11 @@ public enum Status { UPDATE_PROJECT_PREFERENCE_STATE_ERROR(10303, "Failed to update the state of the project preference", "更新项目偏好设置错误"), VERSION_INFO_STATE_ERROR(10304, "Failed to obtain project version and address", "获取版本信息错误"), + QUERY_WORKFLOW_INSTANCE_LOG_ERROR(10305, "view workflow instance log error: {0}", "查询工作流实例日志错误: {0}"), + DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR(10306, "download workflow instance log file error", "下载工作流日志文件错误"), + WORKFLOW_INSTANCE_NOT_FOUND(10307, "workflow instance not found", "工作流实例不存在"), + WORKFLOW_INSTANCE_HOST_IS_NULL(10308, "workflow instance host is null", "工作流实例host为空"), + OIDC_TOKEN_EXCHANGE_FAILED(15000, "OIDC token exchange failed", "OIDC令牌交换失败"), OIDC_ID_TOKEN_ISSUER_INVALID(15001, "Invalid issuer in OIDC ID token", "OIDC ID令牌的颁发者无效"), OIDC_ID_TOKEN_AUDIENCE_INVALID(15002, "Invalid audience in OIDC ID token", "OIDC ID令牌的受众无效"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java index ebe8b2e9fea2..994a7418d6dd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java @@ -18,12 +18,17 @@ package org.apache.dolphinscheduler.api.executor.logging; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse; import lombok.extern.slf4j.Slf4j; @@ -85,4 +90,40 @@ private ILogService getProxyLogService(TaskInstance taskInstance) { log.debug("Created log service for host: {}", taskInstance.getHost()); return logService; } + + public WorkflowInstanceLogPageQueryResponse getWorkflowPartLog(WorkflowInstance workflowInstance, int skipLineNum, + int limit) { + return getLocalWorkflowPartLog(workflowInstance, skipLineNum, limit); + } + + public WorkflowInstanceLogFileDownloadResponse getWorkflowWholeLog(WorkflowInstance workflowInstance) { + return getLocalWorkflowWholeLog(workflowInstance); + } + + private WorkflowInstanceLogFileDownloadResponse getLocalWorkflowWholeLog(WorkflowInstance workflowInstance) { + WorkflowInstanceLogFileDownloadRequest request = new WorkflowInstanceLogFileDownloadRequest( + workflowInstance.getId(), + workflowInstance.getLogPath()); + return getProxyWorkflowLogService(workflowInstance).getWorkflowInstanceWholeLogFileBytes(request); + } + + private WorkflowInstanceLogPageQueryResponse getLocalWorkflowPartLog(WorkflowInstance workflowInstance, + int skipLineNum, int limit) { + WorkflowInstanceLogPageQueryRequest request = WorkflowInstanceLogPageQueryRequest + .builder() + .workflowInstanceId(workflowInstance.getId()) + .workflowInstanceLogAbsolutePath(workflowInstance.getLogPath()) + .skipLineNum(skipLineNum) + .limit(limit) + .build(); + return getProxyWorkflowLogService(workflowInstance).pageQueryWorkflowInstanceLog(request); + } + + private ILogService getProxyWorkflowLogService(WorkflowInstance workflowInstance) { + ILogService logService = Clients + .withService(ILogService.class) + .withHost(workflowInstance.getHost()); + log.debug("Created log service for host: {}", workflowInstance.getHost()); + return logService; + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java index ac5b6ecb2ba4..1fca8115a316 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java @@ -18,9 +18,12 @@ package org.apache.dolphinscheduler.api.executor.logging; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.common.transportor.LogResponseStatus; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -109,4 +112,72 @@ private boolean checkNodeExists(TaskInstance taskInstance) { return exists; } + /** + * Retrieves a portion of the log string for a given workflow instance. + * This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage. + * + * @param workflowInstance The workflow instance object, containing information needed for log retrieval. + * @param skipLineNum The number of log lines to skip from the beginning. + * @param limit The maximum number of log lines to retrieve. + * @return A string containing the specified portion of the log. + */ + public String getWorkflowPartLogString(WorkflowInstance workflowInstance, int skipLineNum, int limit) { + checkWorkflowArgs(workflowInstance); + if (checkWorkflowNodeExists(workflowInstance)) { + WorkflowInstanceLogPageQueryResponse response = + localLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit); + if (response.getCode() == LogResponseStatus.SUCCESS) { + return response.getLogContent(); + } else { + log.warn("get part log string is not success for workflow instance {}; reason :{}", + workflowInstance.getId(), response.getMessage()); + return remoteLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit); + } + } else { + return remoteLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit); + } + } + + /** + * Retrieves the complete log content for a given workflow instance as a byte array. + * This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage. + * + * @param workflowInstance The workflow instance object, containing information needed for log retrieval. + * @return A byte array containing the complete log content. + */ + public byte[] getWorkflowWholeLogBytes(WorkflowInstance workflowInstance) { + checkWorkflowArgs(workflowInstance); + if (checkWorkflowNodeExists(workflowInstance)) { + WorkflowInstanceLogFileDownloadResponse response = localLogClient.getWorkflowWholeLog(workflowInstance); + if (response.getCode() == LogResponseStatus.SUCCESS) { + return response.getLogBytes(); + } else { + log.warn("get whole log bytes is not success for workflow instance {}; reason :{}", + workflowInstance.getId(), response.getMessage()); + return remoteLogClient.getWorkflowWholeLog(workflowInstance); + } + } else { + return remoteLogClient.getWorkflowWholeLog(workflowInstance); + } + } + + private static void checkWorkflowArgs(WorkflowInstance workflowInstance) { + if (workflowInstance == null) { + throw new IllegalArgumentException("workflow instance is null"); + } + } + + private boolean checkWorkflowNodeExists(WorkflowInstance workflowInstance) { + String host = workflowInstance.getHost(); + if (host == null || host.isEmpty()) { + log.warn("Host is null or empty for workflow instance {}", workflowInstance.getId()); + return false; + } + boolean exists = registryClient.checkNodeExists(host, RegistryNodeType.MASTER); + if (!exists) { + log.warn("Node {} does not exist for workflow instance {}", host, workflowInstance.getId()); + } + return exists; + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java index 1b3542e96209..318abd02b405 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java @@ -19,6 +19,7 @@ import org.apache.dolphinscheduler.common.utils.LogUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.springframework.stereotype.Component; @@ -51,4 +52,13 @@ public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit)); } + public byte[] getWorkflowWholeLog(WorkflowInstance workflowInstance) { + return LogUtils.getFileContentBytesFromRemote(workflowInstance.getLogPath()); + } + + public String getWorkflowPartLog(WorkflowInstance workflowInstance, int skipLineNum, int limit) { + return LogUtils.rollViewLogLines( + LogUtils.readPartFileContentFromRemote(workflowInstance.getLogPath(), skipLineNum, limit)); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLoggerService.java new file mode 100644 index 000000000000..02395246cbe6 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLoggerService.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.ResponseWorkflowLog; +import org.apache.dolphinscheduler.dao.entity.User; + +/** + * Workflow logger service + */ +public interface WorkflowLoggerService { + + /** + * query workflow log + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @param skipLineNum skip line number + * @param limit limit + * @return workflow log content + */ + Result queryWorkflowLog(User loginUser, int workflowInstanceId, int skipLineNum, int limit); + + /** + * get workflow log bytes + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @return log byte array + */ + byte[] getWorkflowLogBytes(User loginUser, int workflowInstanceId); + + /** + * query workflow log in specified project + * + * @param loginUser login user + * @param projectCode project code + * @param workflowInstanceId workflow instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + String queryWorkflowLog(User loginUser, long projectCode, int workflowInstanceId, int skipLineNum, int limit); + + /** + * get workflow log bytes in specified project + * + * @param loginUser login user + * @param projectCode project code + * @param workflowInstanceId workflow instance id + * @return log byte array + */ + byte[] getWorkflowLogBytes(User loginUser, long projectCode, int workflowInstanceId); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java index 5d024bbb5520..9b6de953d5b4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java @@ -75,6 +75,8 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao; import org.apache.dolphinscheduler.dao.utils.WorkflowUtils; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.master.command.ICommandParam; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils; @@ -818,6 +820,18 @@ public void deleteWorkflowInstanceById(int workflowInstanceId) { deleteSubWorkflowInstanceIfNeeded(workflowInstanceId); // delete alert alertDao.deleteByWorkflowInstanceId(workflowInstanceId); + // delete workflow instance log + WorkflowInstance workflowInstance = processService.findWorkflowInstanceById(workflowInstanceId); + if (workflowInstance != null && StringUtils.isNotBlank(workflowInstance.getLogPath())) { + // Remove workflow instance log failed will not affect the deletion of workflow instance + try { + Clients.withService(ILogService.class) + .withHost(workflowInstance.getHost()) + .removeWorkflowInstanceLog(workflowInstance.getLogPath()); + } catch (Exception ex) { + log.error("Remove workflow instance log error", ex); + } + } // delete workflow instance workflowInstanceDao.deleteById(workflowInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLoggerServiceImpl.java new file mode 100644 index 000000000000..6399a094d9cf --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLoggerServiceImpl.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.WorkflowLoggerService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.ResponseWorkflowLog; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.StandardCharsets; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.google.common.primitives.Bytes; + +/** + * Workflow logger service impl + */ +@Service +@Slf4j +public class WorkflowLoggerServiceImpl extends BaseServiceImpl implements WorkflowLoggerService { + + private static final String LOG_HEAD_FORMAT = "[LOG-PATH]: %s, [HOST]: %s%s"; + + @Autowired + private WorkflowInstanceDao workflowInstanceDao; + + @Autowired + private ProjectService projectService; + + @Autowired + private LogClientDelegate logClientDelegate; + + @Override + public Result queryWorkflowLog(User loginUser, int workflowInstanceId, int skipLineNum, + int limit) { + WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId); + if (workflowInstance == null) { + log.error("Workflow instance does not exist, workflowInstanceId:{}.", workflowInstanceId); + return Result.error(Status.WORKFLOW_INSTANCE_NOT_FOUND); + } + if (StringUtils.isBlank(workflowInstance.getHost())) { + log.error("Host of workflow instance is null, workflowInstanceId:{}.", workflowInstanceId); + return Result.error(Status.WORKFLOW_INSTANCE_HOST_IS_NULL); + } + projectService.checkProjectAndAuthThrowException(loginUser, workflowInstance.getProjectCode(), VIEW_LOG); + String log = queryWorkflowLog(workflowInstance, skipLineNum, limit); + int lineNum = log.split("\\r\\n").length; + return Result.success(new ResponseWorkflowLog(lineNum, log)); + } + + @Override + public byte[] getWorkflowLogBytes(User loginUser, int workflowInstanceId) { + WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId); + if (workflowInstance == null || StringUtils.isBlank(workflowInstance.getHost())) { + throw new RuntimeException("workflow instance is null or host is null"); + } + projectService.checkProjectAndAuthThrowException(loginUser, workflowInstance.getProjectCode(), DOWNLOAD_LOG); + return getWorkflowLogBytes(workflowInstance); + } + + @Override + public String queryWorkflowLog(User loginUser, long projectCode, int workflowInstanceId, int skipLineNum, + int limit) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, VIEW_LOG); + WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId); + if (workflowInstance == null || StringUtils.isBlank(workflowInstance.getHost())) { + throw new RuntimeException("Workflow instance not found or host is null"); + } + if (projectCode != workflowInstance.getProjectCode()) { + throw new RuntimeException("Workflow instance does not exist in project"); + } + return queryWorkflowLog(workflowInstance, skipLineNum, limit); + } + + @Override + public byte[] getWorkflowLogBytes(User loginUser, long projectCode, int workflowInstanceId) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, DOWNLOAD_LOG); + WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId); + if (workflowInstance == null || StringUtils.isBlank(workflowInstance.getHost())) { + throw new RuntimeException("Workflow instance not found or host is null"); + } + if (projectCode != workflowInstance.getProjectCode()) { + throw new RuntimeException("Workflow instance does not exist in project"); + } + return getWorkflowLogBytes(workflowInstance); + } + + private String queryWorkflowLog(WorkflowInstance workflowInstance, int skipLineNum, int limit) { + final String logPath = workflowInstance.getLogPath(); + log.info("Query workflow instance log, workflowInstanceId:{}, workflowInstanceName:{}, host: {}, logPath:{}", + workflowInstance.getId(), workflowInstance.getName(), workflowInstance.getHost(), logPath); + if (StringUtils.isBlank(logPath)) { + throw new RuntimeException("WorkflowInstanceLogPath is empty"); + } + + StringBuilder sb = new StringBuilder(); + if (skipLineNum == 0) { + String head = String.format(LOG_HEAD_FORMAT, + logPath, + workflowInstance.getHost(), + Constants.SYSTEM_LINE_SEPARATOR); + sb.append(head); + } + + try { + String logContent = logClientDelegate.getWorkflowPartLogString(workflowInstance, skipLineNum, limit); + if (logContent != null) { + sb.append(logContent); + } + return sb.toString(); + } catch (Throwable ex) { + log.error("Query workflow instance log error", ex); + throw new RuntimeException("Query workflow instance log error: " + ex.getMessage(), ex); + } + } + + private byte[] getWorkflowLogBytes(WorkflowInstance workflowInstance) { + String host = workflowInstance.getHost(); + String logPath = workflowInstance.getLogPath(); + + byte[] head = String.format(LOG_HEAD_FORMAT, + logPath, + host, + Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); + + byte[] logBytes; + + try { + logBytes = logClientDelegate.getWorkflowWholeLogBytes(workflowInstance); + return Bytes.concat(head, logBytes); + } catch (Exception ex) { + log.error("Download WorkflowInstance: {} Log Error", workflowInstance.getName(), ex); + throw new RuntimeException("Download workflow instance log error", ex); + } + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java index 38ed86900790..a3f7363e355d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; import org.apache.dolphinscheduler.extract.common.ILogService; @@ -31,6 +32,10 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse; import java.io.IOException; import java.net.ServerSocket; @@ -98,6 +103,39 @@ public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLog public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { } + + @Override + public WorkflowInstanceLogFileDownloadResponse getWorkflowInstanceWholeLogFileBytes(WorkflowInstanceLogFileDownloadRequest workflowInstanceLogFileDownloadRequest) { + if (workflowInstanceLogFileDownloadRequest.getWorkflowInstanceId() == 1) { + return new WorkflowInstanceLogFileDownloadResponse(new byte[0], LogResponseStatus.SUCCESS, ""); + } else if (workflowInstanceLogFileDownloadRequest.getWorkflowInstanceId() == 10) { + return new WorkflowInstanceLogFileDownloadResponse("log content".getBytes(), + LogResponseStatus.SUCCESS, + ""); + } + + throw new ServiceException("download error"); + } + + @Override + public WorkflowInstanceLogPageQueryResponse pageQueryWorkflowInstanceLog(WorkflowInstanceLogPageQueryRequest workflowInstanceLogPageQueryRequest) { + if (workflowInstanceLogPageQueryRequest.getWorkflowInstanceId() != null) { + if (workflowInstanceLogPageQueryRequest.getWorkflowInstanceId() == 100) { + throw new ServiceException("query log error"); + } else if (workflowInstanceLogPageQueryRequest.getWorkflowInstanceId() == 10) { + return new WorkflowInstanceLogPageQueryResponse("Partial log content", + LogResponseStatus.SUCCESS, + ""); + } + } + + return new WorkflowInstanceLogPageQueryResponse(); + } + + @Override + public void removeWorkflowInstanceLog(String workflowInstanceLogAbsolutePath) { + + } }); springServerMethodInvokerDiscovery.start(); } @@ -134,4 +172,31 @@ public void testGetPartLogSuccess() { assertNotNull(actualResponse); assertEquals("Partial log content", actualResponse.getLogContent()); } + + @Test + public void testGetWorkflowWholeLogSuccess() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setHost("127.0.0.1:" + nettyServerPort); + workflowInstance.setId(1); + workflowInstance.setLogPath("/path/to/log"); + + WorkflowInstanceLogFileDownloadResponse actualResponse = localLogClient.getWorkflowWholeLog(workflowInstance); + + assertNotNull(actualResponse); + assertArrayEquals("".getBytes(), actualResponse.getLogBytes()); + } + + @Test + public void testGetWorkflowPartLogSuccess() { + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setHost("127.0.0.1:" + nettyServerPort); + workflowInstance.setId(10); + workflowInstance.setLogPath("/path/to/log"); + + WorkflowInstanceLogPageQueryResponse actualResponse = + localLogClient.getWorkflowPartLog(workflowInstance, 0, 10); + + assertNotNull(actualResponse); + assertEquals("Partial log content", actualResponse.getLogContent()); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResponseWorkflowLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResponseWorkflowLog.java new file mode 100644 index 000000000000..4dd7b753b90a --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResponseWorkflowLog.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * log of the logger service response + */ +@Data +@AllArgsConstructor +public class ResponseWorkflowLog { + + private int lineNum; + private String message; +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java index 8abf6a46666d..31caeb7adf1d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java @@ -158,6 +158,8 @@ public class WorkflowInstance { private Date restartTime; + private String logPath; + /** * set the process name with process define version and timestamp * diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 996fce0732b9..d4896e74a68b 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -636,6 +636,7 @@ CREATE TABLE t_ds_workflow_instance var_pool longtext, dry_run int NULL DEFAULT 0, restart_time datetime DEFAULT NULL, + log_path longtext DEFAULT NULL, PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 7067cd7429aa..d3fb8a6fea77 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -644,6 +644,7 @@ CREATE TABLE `t_ds_workflow_instance` ( `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run', `next_workflow_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next workflowInstanceId', `restart_time` datetime DEFAULT NULL COMMENT 'workflow instance restart time', + `log_path` longtext DEFAULT NULL COMMENT 'workflow instance log path', PRIMARY KEY (`id`), KEY `workflow_instance_index` (`workflow_definition_code`,`id`) USING BTREE, KEY `start_time_index` (`start_time`,`end_time`) USING BTREE diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 36261d7a8c2d..f8d2f0b22519 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -584,6 +584,7 @@ CREATE TABLE t_ds_workflow_instance ( dry_run int DEFAULT '0' , next_workflow_instance_id int DEFAULT '0', restart_time timestamp DEFAULT NULL , + log_path text DEFAULT NULL , PRIMARY KEY (id) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..55d6005df575 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE `t_ds_workflow_instance` +ADD COLUMN `log_path` longtext NULL COMMENT 'workflow instance log path'; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..4a14f326b985 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..57459d10e067 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE t_ds_workflow_instance ADD COLUMN IF NOT EXISTS "log_path" text DEFAULT NULL; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..4a14f326b985 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java index 4484e519421b..58f9e4f6ab67 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java @@ -23,6 +23,10 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse; @RpcService public interface ILogService { @@ -36,4 +40,13 @@ public interface ILogService { @RpcMethod void removeTaskInstanceLog(String taskInstanceLogAbsolutePath); + @RpcMethod + WorkflowInstanceLogFileDownloadResponse getWorkflowInstanceWholeLogFileBytes(WorkflowInstanceLogFileDownloadRequest workflowInstanceLogFileDownloadRequest); + + @RpcMethod + WorkflowInstanceLogPageQueryResponse pageQueryWorkflowInstanceLog(WorkflowInstanceLogPageQueryRequest workflowInstanceLogPageQueryRequest); + + @RpcMethod + void removeWorkflowInstanceLog(String workflowInstanceLogAbsolutePath); + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/service/impl/LogServiceImpl.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/service/impl/LogServiceImpl.java index bbe2c09d9291..f8175ff763ff 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/service/impl/LogServiceImpl.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/service/impl/LogServiceImpl.java @@ -25,6 +25,10 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryRequest; +import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -82,4 +86,43 @@ public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) { FileUtils.deleteFile(taskInstanceLogAbsolutePath); } + @Override + public WorkflowInstanceLogFileDownloadResponse getWorkflowInstanceWholeLogFileBytes(WorkflowInstanceLogFileDownloadRequest workflowInstanceLogFileDownloadRequest) { + final WorkflowInstanceLogFileDownloadResponse workflowInstanceLogFileDownloadResponse = + new WorkflowInstanceLogFileDownloadResponse(); + try { + byte[] bytes = LogUtils + .getFileContentBytesFromLocal( + workflowInstanceLogFileDownloadRequest.getWorkflowInstanceLogAbsolutePath()); + workflowInstanceLogFileDownloadResponse.setLogBytes(bytes); + } catch (Exception e) { + workflowInstanceLogFileDownloadResponse.setCode(LogResponseStatus.ERROR); + workflowInstanceLogFileDownloadResponse.setMessage(ExceptionUtils.getRootCauseMessage(e)); + } + return workflowInstanceLogFileDownloadResponse; + } + + @Override + public WorkflowInstanceLogPageQueryResponse pageQueryWorkflowInstanceLog(WorkflowInstanceLogPageQueryRequest workflowInstanceLogPageQueryRequest) { + final WorkflowInstanceLogPageQueryResponse workflowInstanceLogPageQueryResponse = + new WorkflowInstanceLogPageQueryResponse(); + List lines; + try { + lines = LogUtils.readPartFileContentFromLocal( + workflowInstanceLogPageQueryRequest.getWorkflowInstanceLogAbsolutePath(), + workflowInstanceLogPageQueryRequest.getSkipLineNum(), + workflowInstanceLogPageQueryRequest.getLimit()); + workflowInstanceLogPageQueryResponse.setLogContent(LogUtils.rollViewLogLines(lines)); + } catch (Exception e) { + workflowInstanceLogPageQueryResponse.setCode(LogResponseStatus.ERROR); + workflowInstanceLogPageQueryResponse.setMessage(ExceptionUtils.getMessage(e)); + } + return workflowInstanceLogPageQueryResponse; + } + + @Override + public void removeWorkflowInstanceLog(String workflowInstanceLogAbsolutePath) { + FileUtils.deleteFile(workflowInstanceLogAbsolutePath); + } + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadRequest.java new file mode 100644 index 000000000000..fca7833151bf --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.common.transportor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceLogFileDownloadRequest { + + private Integer workflowInstanceId; + + private String workflowInstanceLogAbsolutePath; +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadResponse.java new file mode 100644 index 000000000000..18a8f7c3cade --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogFileDownloadResponse.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.common.transportor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceLogFileDownloadResponse { + + private byte[] logBytes; + + private LogResponseStatus code = LogResponseStatus.SUCCESS; + + private String message; + +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryRequest.java new file mode 100644 index 000000000000..15f6ea1c6e31 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.common.transportor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceLogPageQueryRequest { + + private Integer workflowInstanceId; + + private String workflowInstanceLogAbsolutePath; + + private Integer skipLineNum; + + private Integer limit; +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryResponse.java new file mode 100644 index 000000000000..dce7cced7e73 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/WorkflowInstanceLogPageQueryResponse.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.common.transportor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceLogPageQueryResponse { + + private String logContent; + + private LogResponseStatus code = LogResponseStatus.SUCCESS; + + private String message; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java index 46c5cb32b02c..0b914381c35a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution; import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -86,11 +87,14 @@ public void fireAllRegisteredEvent() { final String workflowInstanceName = workflowExecution.getName(); try { LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC( + workflowExecution.getWorkflowExecuteContext().getWorkflowInstance().getLogPath()); doFireSingleWorkflowEventBus(workflowExecution); } catch (Exception ex) { log.error("Fire event failed for WorkflowExecuteRunnable: {}", workflowInstanceName, ex); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java index 6af21609948c..4a55488753fc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.repository.ProjectDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.master.command.ICommandParam; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler; @@ -39,10 +40,12 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.commons.collections4.CollectionUtils; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -69,6 +72,9 @@ public abstract class AbstractCommandHandler implements ICommandHandler { @Autowired protected ProjectDao projectDao; + @Autowired + protected WorkflowInstanceDao workflowInstanceDao; + @Override public WorkflowExecution handleCommand(final Command command) { final WorkflowExecuteContextBuilder workflowExecuteContextBuilder = WorkflowExecuteContext.builder() @@ -81,6 +87,7 @@ public WorkflowExecution handleCommand(final Command command) { assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder); assembleWorkflowEventBus(workflowExecuteContextBuilder); assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); + assembleWorkflowInstanceLogPath(workflowExecuteContextBuilder); final WorkflowExecutionBuilder workflowExecutionBuilder = WorkflowExecutionBuilder .builder() @@ -159,4 +166,20 @@ protected void assembleProject( workflowExecuteContextBuilder.setProject(project); } + protected void assembleWorkflowInstanceLogPath(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); + final Date logTime = workflowInstance.getRestartTime() != null + ? workflowInstance.getRestartTime() + : workflowInstance.getStartTime(); + final String logPath = WorkflowLogUtils.getWorkflowInstanceLogFullPath( + logTime, + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId()); + workflowInstance.setLogPath(logPath); + workflowInstanceDao.updateById(workflowInstance); + + workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java index cecdbd4bd488..fcfce8d60ed2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent; import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils; import java.util.Date; @@ -100,9 +101,12 @@ public void run() { TaskExecutorMDCUtils.MDCAutoClosable ignore = TaskExecutorMDCUtils.logWithMDC(taskExecution.getId())) { LogUtils.setWorkflowInstanceIdMDC(taskExecution.getTaskInstance().getWorkflowInstanceId()); + WorkflowLogUtils + .setWorkflowInstanceLogFullPathMDC(taskExecution.getWorkflowInstance().getLogPath()); doDispatchTask(taskExecution); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 48b329bc771c..df644e09ad06 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy; import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; import org.apache.commons.collections4.CollectionUtils; @@ -116,12 +117,15 @@ protected void triggerTasks(final IWorkflowExecution workflowExecution, protected void pauseActiveTask(final IWorkflowExecution workflowExecution) { try { LogUtils.setWorkflowInstanceIdMDC(workflowExecution.getId()); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC( + workflowExecution.getWorkflowExecuteContext().getWorkflowInstance().getLogPath()); workflowExecution .getWorkflowExecutionGraph() .getActiveTaskExecution() .forEach(ITaskExecution::pause); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java index 5a2f37596990..13b80f4f62bd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.springframework.stereotype.Component; @@ -30,6 +31,7 @@ public void failoverTask(final ITaskExecution taskExecution) { LogUtils.setWorkflowInstanceIdMDC(taskExecution.getWorkflowInstance().getId()); taskExecution.getWorkflowEventBus().publish(TaskFailoverLifecycleEvent.of(taskExecution)); LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java new file mode 100644 index 000000000000..d8baaea1577f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.MDC; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.sift.AbstractDiscriminator; + +/** + * Workflow Log Discriminator + */ +@Slf4j +@Getter +@Setter +public class WorkflowLogDiscriminator extends AbstractDiscriminator { + + private String key; + + private String logBase; + + @Override + public String getDiscriminatingValue(ILoggingEvent event) { + String workflowInstanceLogPath = MDC.get(WorkflowLogUtils.WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + if (workflowInstanceLogPath == null) { + log.error("The workflow instance log path is null, please check the logback configuration, log: {}", event); + } + return workflowInstanceLogPath; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java new file mode 100644 index 000000000000..c9c42fa2d9a3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.MDC; +import org.slf4j.Marker; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.spi.FilterReply; + +/** + * This class is used to filter the log of the workflow instance. + */ +@Slf4j +public class WorkflowLogFilter extends Filter { + + @Override + public FilterReply decide(ILoggingEvent event) { + String workflowInstanceLogPath = MDC.get(WorkflowLogUtils.WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + // If the workflowInstanceLogPath is empty, it means that the log is not related to a workflow instance. + if (StringUtils.isEmpty(workflowInstanceLogPath)) { + return FilterReply.DENY; + } + + final Marker marker = event.getMarker(); + if (marker == null) { + return FilterReply.ACCEPT; + } + if (marker.contains(WorkflowLogMarkers.includeInWorkflowLog())) { + return FilterReply.ACCEPT; + } + if (marker.contains(WorkflowLogMarkers.excludeInWorkflowLog())) { + return FilterReply.DENY; + } + return FilterReply.ACCEPT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java new file mode 100644 index 000000000000..0653e555d2ed --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +public class WorkflowLogMarkers { + + private static final Marker WORKFLOW_LOGGER_EXCLUDE_MARKER = MarkerFactory.getMarker("WORKFLOW_LOGGER_EXCLUDE"); + + private static final Marker WORKFLOW_LOGGER_INCLUDE_MARKER = MarkerFactory.getMarker("WORKFLOW_LOGGER_INCLUDE"); + + /** + * The marker used to exclude logs from the workflow instance log file. + */ + public static Marker excludeInWorkflowLog() { + return WORKFLOW_LOGGER_EXCLUDE_MARKER; + } + + /** + * The marker used to include logs from the workflow instance log file. + */ + public static Marker includeInWorkflowLog() { + return WORKFLOW_LOGGER_INCLUDE_MARKER; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java index 278f5160dc80..42b9f42261a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.rpc; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRuntimeContextChangedEvent; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorDispatchedLifecycleEvent; import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent; @@ -54,7 +56,10 @@ public class TaskExecutorEventListenerImpl implements ITaskExecutorEventListener @Override public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorDispatchedLifecycleEvent); @@ -66,12 +71,16 @@ public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecution.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskExecutorStartedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorStartedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorStartedLifecycleEvent); @@ -84,31 +93,39 @@ public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskEx taskExecution.getWorkflowEventBus().publish(taskRunningEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override - public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEventr) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId()); + public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEvent) { + int workflowInstanceId = taskExecutorRuntimeContextChangedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = - getTaskExecution(taskExecutorRuntimeContextChangedLifecycleEventr); + getTaskExecution(taskExecutorRuntimeContextChangedLifecycleEvent); final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent = TaskRuntimeContextChangedEvent.builder() .taskExecution(taskExecution) - .runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds()) + .runtimeContext(taskExecutorRuntimeContextChangedLifecycleEvent.getAppIds()) .build(); taskExecution.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskExecutorSuccessLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorSuccessLifecycleEvent); @@ -120,12 +137,16 @@ public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskEx taskExecution.getWorkflowEventBus().publish(taskSuccessEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExecutorFailedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorFailedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorFailedLifecycleEvent); @@ -136,12 +157,16 @@ public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExec taskExecution.getWorkflowEventBus().publish(taskFailedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExecutorKilledLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorKilledLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorKilledLifecycleEvent); @@ -152,12 +177,16 @@ public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExec taskExecution.getWorkflowEventBus().publish(taskKilledEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExecutorPausedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorPausedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecution taskExecution = getTaskExecution(taskExecutorPausedLifecycleEvent); @@ -165,6 +194,7 @@ public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExec taskExecution.getWorkflowEventBus().publish(taskPausedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java index 40a06f91585f..579d5ecbd7a2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.rpc; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.master.ITaskInstanceController; import org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyRequest; import org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyResponse; @@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.execution.ITaskExecution; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.execution.IWorkflowExecution; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import lombok.extern.slf4j.Slf4j; @@ -46,6 +48,9 @@ public TaskGroupSlotAcquireSuccessNotifyResponse notifyTaskGroupSlotAcquireSucce final int workflowInstanceId = taskGroupSlotAcquireSuccessNotifyRequest.getWorkflowInstanceId(); final int taskInstanceId = taskGroupSlotAcquireSuccessNotifyRequest.getTaskInstanceId(); LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + WorkflowInstance workflowInstance = + workflowExecutionMemoryRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); final IWorkflowExecution workflowExecution = workflowExecutionMemoryRepository.get(workflowInstanceId); if (workflowExecution == null) { @@ -68,6 +73,7 @@ public TaskGroupSlotAcquireSuccessNotifyResponse notifyTaskGroupSlotAcquireSucce return TaskGroupSlotAcquireSuccessNotifyResponse.success(); } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java new file mode 100644 index 000000000000..db1ea5256669 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.utils; + +import org.apache.dolphinscheduler.common.constants.DateConstants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.server.master.log.WorkflowLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Date; +import java.util.Optional; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import ch.qos.logback.classic.sift.SiftingAppender; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.AppenderAttachable; + +@Slf4j +@UtilityClass +public class WorkflowLogUtils { + + private static final Path WORKFLOW_INSTANCE_LOG_BASE_PATH = getWorkflowInstanceLogBasePath(); + public static final String WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY = "workflowInstanceLogFullPath"; + + public static String getWorkflowInstanceLogFullPath(Date workflowStartTime, + Long workflowDefinitionCode, + int workflowDefinitionVersion, + int workflowInstanceId) { + if (WORKFLOW_INSTANCE_LOG_BASE_PATH == null) { + throw new IllegalArgumentException( + "Cannot find the workflow instance log base path, please check your logback.xml file"); + } + final String workflowLogFileName = Paths.get( + String.valueOf(workflowDefinitionCode), + String.valueOf(workflowDefinitionVersion), + String.format("%s.log", workflowInstanceId)).toString(); + return WORKFLOW_INSTANCE_LOG_BASE_PATH + .resolve(DateUtils.format(workflowStartTime, DateConstants.YYYYMMDD, null)) + .resolve(workflowLogFileName) + .toString(); + } + + /** + * Get workflow instance log base absolute path, this is defined in logback.xml + */ + public static Path getWorkflowInstanceLogBasePath() { + return Optional.of(LoggerFactory.getILoggerFactory()) + .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) + .map(e -> (SiftingAppender) (e.getAppender("WORKFLOWLOGFILE"))) + .map(e -> ((WorkflowLogDiscriminator) (e.getDiscriminator()))) + .map(WorkflowLogDiscriminator::getLogBase) + .map(e -> Paths.get(e).toAbsolutePath()) + .orElse(null); + } + + public static String getWorkflowInstanceLogFullPathMDC() { + return MDC.get(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + } + + public static void setWorkflowInstanceLogFullPathMDC(String workflowInstanceLogFullPath) { + if (workflowInstanceLogFullPath == null) { + log.warn("workflowInstanceLogFullPath is null"); + return; + } + MDC.put(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY, workflowInstanceLogFullPath); + } + + public static void removeWorkflowInstanceLogFullPathMDC() { + MDC.remove(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + } +} diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index a4c3a4f22dfe..b16c201335d4 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -51,6 +51,27 @@ + + + + + workflowInstanceLogFullPath + ${log.base}/workflows + + + + ${workflowInstanceLogFullPath} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{10}:[%line] - %message%n + + UTF-8 + + true + + + + ${log.base}/dolphinscheduler-master.log @@ -75,6 +96,7 @@ + diff --git a/dolphinscheduler-ui/src/service/modules/workflow-instances/types.ts b/dolphinscheduler-ui/src/service/modules/workflow-instances/types.ts index 52bc977fa237..1ee7aa1eaac4 100644 --- a/dolphinscheduler-ui/src/service/modules/workflow-instances/types.ts +++ b/dolphinscheduler-ui/src/service/modules/workflow-instances/types.ts @@ -97,6 +97,7 @@ interface IWorkflowInstance { dryRun: number executorName: string host: string + logPath?: string count?: number disabled?: boolean buttonType?: string diff --git a/dolphinscheduler-ui/src/service/modules/workflow-log/index.ts b/dolphinscheduler-ui/src/service/modules/workflow-log/index.ts new file mode 100644 index 000000000000..994d7d251e76 --- /dev/null +++ b/dolphinscheduler-ui/src/service/modules/workflow-log/index.ts @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { axios } from '@/service/service' +import utils from '@/utils' +import { WorkflowLogIdReq, WorkflowLogReq } from './types' + +export function queryWorkflowLogDetail(params: WorkflowLogReq): any { + return axios({ + url: '/workflow-log/detail', + method: 'get', + params + }) +} + +export function downloadWorkflowLog(params: WorkflowLogIdReq): void { + utils.downloadFile('workflow-log/download-log', params) +} diff --git a/dolphinscheduler-ui/src/service/modules/workflow-log/types.ts b/dolphinscheduler-ui/src/service/modules/workflow-log/types.ts new file mode 100644 index 000000000000..831971cfc75c --- /dev/null +++ b/dolphinscheduler-ui/src/service/modules/workflow-log/types.ts @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +interface WorkflowLogIdReq { + workflowInstanceId: number +} + +interface WorkflowLogReq extends WorkflowLogIdReq { + limit: number + skipLineNum: number +} + +export { WorkflowLogIdReq, WorkflowLogReq } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/instance/components/table-action.tsx b/dolphinscheduler-ui/src/views/projects/workflow/instance/components/table-action.tsx index fffcaddf0e53..02dadd17295e 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/instance/components/table-action.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/instance/components/table-action.tsx @@ -25,7 +25,9 @@ import { CloseCircleOutlined, PauseCircleOutlined, ControlOutlined, - PlayCircleOutlined + PlayCircleOutlined, + AlignLeftOutlined, + DownloadOutlined } from '@vicons/antd' import { useI18n } from 'vue-i18n' import { useRouter } from 'vue-router' @@ -48,7 +50,9 @@ export default defineComponent({ 'reStore', 'stop', 'suspend', - 'deleteInstance' + 'deleteInstance', + 'viewLog', + 'downloadLog' ], setup(props, ctx) { const router: Router = useRouter() @@ -89,6 +93,14 @@ export default defineComponent({ ctx.emit('deleteInstance') } + const handleViewLog = () => { + ctx.emit('viewLog', props.row) + } + + const handleDownloadLog = () => { + ctx.emit('downloadLog', props.row) + } + return { handleEdit, handleReRun, @@ -97,6 +109,8 @@ export default defineComponent({ handleSuspend, handleDeleteInstance, handleGantt, + handleViewLog, + handleDownloadLog, ...toRefs(props) } }, @@ -295,6 +309,44 @@ export default defineComponent({ ) }} + + {{ + default: () => t('project.workflow.view_log'), + trigger: () => ( + + + + + + ) + }} + + + {{ + default: () => t('project.workflow.download_log'), + trigger: () => ( + + + + + + ) + }} + ) } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx index c4de57aabaa1..53e45f5389b6 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx @@ -27,6 +27,7 @@ import { } from 'naive-ui' import { useTable } from './use-table' import Card from '@/components/card' +import LogModal from '@/components/log-modal' import WorkflowInstanceCondition from './components/workflow-instance-condition' import type { IWorkflowInstanceSearch } from './types' import totalCount from '@/utils/tableTotalCount' @@ -35,8 +36,15 @@ export default defineComponent({ name: 'WorkflowInstanceList', setup() { let setIntervalP: number - const { variables, createColumns, getTableData, batchDeleteInstance } = - useTable() + const { + variables, + createColumns, + getTableData, + batchDeleteInstance, + viewLog, + downloadLog, + fetchLog + } = useTable() const requestData = () => { getTableData() @@ -64,6 +72,17 @@ export default defineComponent({ batchDeleteInstance() } + const onConfirmModal = () => { + variables.showLogModalRef = false + } + + const refreshLogs = () => { + variables.logRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + fetchLog(variables.currentLogRow) + } + onMounted(() => { createColumns(variables) requestData() @@ -82,11 +101,33 @@ export default defineComponent({ clearInterval(setIntervalP) }) + watch( + () => variables.showLogModalRef, + () => { + if (variables.showLogModalRef) { + variables.logRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + fetchLog(variables.currentLogRow) + } else { + variables.logRef = '' + variables.logLoadingRef = true + variables.currentLogRow = null + variables.skipLineNum = 0 + variables.limit = 1000 + } + } + ) + return { requestData, handleSearch, handleChangePageSize, handleBatchDelete, + onConfirmModal, + refreshLogs, + viewLog, + downloadLog, ...toRefs(variables) } }, @@ -150,6 +191,16 @@ export default defineComponent({ }} + ) } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts b/dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts index 02b7ba8a7479..611de39934da 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts @@ -16,8 +16,9 @@ */ import _ from 'lodash' -import { reactive, h, ref } from 'vue' +import { reactive, h, ref, watch } from 'vue' import { useI18n } from 'vue-i18n' +import { useAsyncState } from '@vueuse/core' import { useRouter } from 'vue-router' import ButtonLink from '@/components/button-link' import { RowKey } from 'naive-ui/lib/data-table/src/interface' @@ -27,6 +28,7 @@ import { deleteWorkflowInstanceById, batchDeleteWorkflowInstanceByIds } from '@/service/modules/workflow-instances' +import { queryWorkflowLogDetail, downloadWorkflowLog } from '@/service/modules/workflow-log' import { execute } from '@/service/modules/executors' import TableAction from './components/table-action' import { @@ -69,7 +71,14 @@ export function useTable() { .workflowDefinitionCode ? ref(Number(router.currentRoute.value.query.workflowDefinitionCode)) : ref(), - loadingRef: ref(false) + loadingRef: ref(false), + // Log related variables + showLogModalRef: ref(false), + logRef: ref(''), + logLoadingRef: ref(true), + currentLogRow: ref(null), + skipLineNum: ref(0), + limit: ref(1000) }) const createColumns = (variables: any) => { @@ -239,7 +248,9 @@ export function useTable() { }) } }, - onDeleteInstance: () => deleteInstance(_row.id) + onDeleteInstance: () => deleteInstance(_row.id), + onViewLog: () => viewLog(_row), + onDownloadLog: () => downloadLog(_row) }) } ] @@ -353,14 +364,67 @@ export function useTable() { }) } + /** + * View workflow log + */ + const viewLog = (row: IWorkflowInstance) => { + variables.currentLogRow = row + variables.showLogModalRef = true + } + + /** + * Fetch workflow log + */ + const fetchLog = (row: any) => { + const { state } = useAsyncState( + queryWorkflowLogDetail({ + workflowInstanceId: Number(row.id), + limit: variables.limit, + skipLineNum: variables.skipLineNum + }).then((res: any) => { + variables.logRef += res?.message || '' + if (res?.message && res.message !== '') { + variables.skipLineNum += res.lineNum + fetchLog(row) + } else { + variables.logLoadingRef = false + } + }), + {} + ) + + return state + } + + /** + * Download workflow log + */ + const downloadLog = (row: IWorkflowInstance) => { + downloadWorkflowLog({ workflowInstanceId: row.id }) + } + + /** + * Close log modal + */ + const closeLogModal = () => { + variables.showLogModalRef = false + variables.logRef = '' + variables.currentLogRow = null + } + return { variables, createColumns, getTableData, - batchDeleteInstance + batchDeleteInstance, + viewLog, + downloadLog, + closeLogModal, + fetchLog } } + export function renderWorkflowStateCell( state: IWorkflowExecutionState, t: Function