Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
68a5e89
Add separate workflow instance logs
Apr 2, 2026
1efe619
Merge branch 'apache:dev' into Feature-18125
njnu-seafish Apr 2, 2026
1952489
Merge branch 'dev' into DSIP-107
SbloodyS Apr 3, 2026
5a4d175
Merge remote-tracking branch 'origin/dev' into DSIP-107
Apr 16, 2026
f38aa56
Resolve conflicts
Apr 16, 2026
94708ed
Merge branch 'dev' into DSIP-107
ruanwenjun Apr 25, 2026
2ef11be
Merge branch 'apache:dev' into DSIP-107
njnu-seafish May 13, 2026
c2e475a
set logPath in AbstractCommandHandler
May 14, 2026
e71f415
Merge branch 'dev' into DSIP-107
njnu-seafish May 14, 2026
5689ffe
Merge branch 'dev' into DSIP-107
njnu-seafish May 15, 2026
8df0376
Merge branch 'apache:dev' into DSIP-107
njnu-seafish May 19, 2026
bf4e76e
add query and delete workflow log in backend
May 19, 2026
c81b961
update import
May 19, 2026
ca022f7
add query and delete workflow log in frontend
May 21, 2026
c6ff5c9
remove locates
May 21, 2026
26d09c0
Delete redundant file
May 21, 2026
e7bcda8
Merge branch 'dev' into DSIP-107
njnu-seafish May 21, 2026
4062178
Add blank lines
May 22, 2026
65f9377
Merge branch 'DSIP-107' of github.com:njnu-seafish/dolphinscheduler i…
May 22, 2026
3911cab
Merge branch 'dev' into DSIP-107
njnu-seafish May 25, 2026
0931712
Merge branch 'dev' into DSIP-107
njnu-seafish May 27, 2026
d93f2ba
Merge branch 'apache:dev' into DSIP-107
njnu-seafish Jun 9, 2026
9f06e8b
adapt to 3.5.0
Jun 9, 2026
1064e23
Merge branch 'dev' into DSIP-107
njnu-seafish Jun 12, 2026
cbcffb9
Merge branch 'dev' into DSIP-107
njnu-seafish Jun 22, 2026
0e06a2f
Merge branch 'dev' into DSIP-107
njnu-seafish Jun 23, 2026
9914755
Merge branch 'dev' into DSIP-107
njnu-seafish Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.controller;

import static org.apache.dolphinscheduler.api.enums.Status.DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_INSTANCE_LOG_ERROR;

import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.WorkflowLoggerService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ResponseWorkflowLog;
import org.apache.dolphinscheduler.dao.entity.User;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
* Workflow logger controller
*/
@Tag(name = "WORKFLOW_LOGGER_TAG")
@RestController
@RequestMapping("/workflow-log")
public class WorkflowLoggerController extends BaseController {

@Autowired
private WorkflowLoggerService workflowLoggerService;

/**
* query workflow log
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @param skipNum skip number
* @param limit limit
* @return workflow log content
*/
@Operation(summary = "queryWorkflowLog", description = "QUERY_WORKFLOW_INSTANCE_LOG_NOTES")
@Parameters({
@Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "skipLineNum", description = "SKIP_LINE_NUM", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "limit", description = "LIMIT", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_INSTANCE_LOG_ERROR)
public Result<ResponseWorkflowLog> queryWorkflowLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "workflowInstanceId") int workflowInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return workflowLoggerService.queryWorkflowLog(loginUser, workflowInstanceId, skipNum, limit);
}

/**
* download workflow log file
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return log file content
*/
@Operation(summary = "downloadWorkflowLog", description = "DOWNLOAD_WORKFLOW_INSTANCE_LOG_NOTES")
@Parameters({
@Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@GetMapping(value = "/download-log")
@ResponseBody
@ApiException(DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR)
public ResponseEntity downloadWorkflowLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "workflowInstanceId") int workflowInstanceId) {
byte[] logBytes = workflowLoggerService.getWorkflowLogBytes(loginUser, workflowInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + "workflow-" + workflowInstanceId + ".log" + "\"")
.body(logBytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public enum Status {
UPDATE_PROJECT_PREFERENCE_STATE_ERROR(10303, "Failed to update the state of the project preference", "更新项目偏好设置错误"),
VERSION_INFO_STATE_ERROR(10304, "Failed to obtain project version and address", "获取版本信息错误"),

QUERY_WORKFLOW_INSTANCE_LOG_ERROR(10305, "view workflow instance log error: {0}", "查询工作流实例日志错误: {0}"),
DOWNLOAD_WORKFLOW_INSTANCE_LOG_FILE_ERROR(10306, "download workflow instance log file error", "下载工作流日志文件错误"),
WORKFLOW_INSTANCE_NOT_FOUND(10307, "workflow instance not found", "工作流实例不存在"),
WORKFLOW_INSTANCE_HOST_IS_NULL(10308, "workflow instance host is null", "工作流实例host为空"),

OIDC_TOKEN_EXCHANGE_FAILED(15000, "OIDC token exchange failed", "OIDC令牌交换失败"),
OIDC_ID_TOKEN_ISSUER_INVALID(15001, "Invalid issuer in OIDC ID token", "OIDC ID令牌的颁发者无效"),
OIDC_ID_TOKEN_AUDIENCE_INVALID(15002, "Invalid audience in OIDC ID token", "OIDC ID令牌的受众无效"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
package org.apache.dolphinscheduler.api.executor.logging;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -85,4 +90,40 @@ private ILogService getProxyLogService(TaskInstance taskInstance) {
log.debug("Created log service for host: {}", taskInstance.getHost());
return logService;
}

public WorkflowInstanceLogPageQueryResponse getWorkflowPartLog(WorkflowInstance workflowInstance, int skipLineNum,
int limit) {
return getLocalWorkflowPartLog(workflowInstance, skipLineNum, limit);
}

public WorkflowInstanceLogFileDownloadResponse getWorkflowWholeLog(WorkflowInstance workflowInstance) {
return getLocalWorkflowWholeLog(workflowInstance);
}

private WorkflowInstanceLogFileDownloadResponse getLocalWorkflowWholeLog(WorkflowInstance workflowInstance) {
WorkflowInstanceLogFileDownloadRequest request = new WorkflowInstanceLogFileDownloadRequest(
workflowInstance.getId(),
workflowInstance.getLogPath());
return getProxyWorkflowLogService(workflowInstance).getWorkflowInstanceWholeLogFileBytes(request);
}

private WorkflowInstanceLogPageQueryResponse getLocalWorkflowPartLog(WorkflowInstance workflowInstance,
int skipLineNum, int limit) {
WorkflowInstanceLogPageQueryRequest request = WorkflowInstanceLogPageQueryRequest
.builder()
.workflowInstanceId(workflowInstance.getId())
.workflowInstanceLogAbsolutePath(workflowInstance.getLogPath())
.skipLineNum(skipLineNum)
.limit(limit)
.build();
return getProxyWorkflowLogService(workflowInstance).pageQueryWorkflowInstanceLog(request);
}

private ILogService getProxyWorkflowLogService(WorkflowInstance workflowInstance) {
ILogService logService = Clients
.withService(ILogService.class)
.withHost(workflowInstance.getHost());
log.debug("Created log service for host: {}", workflowInstance.getHost());
return logService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.dolphinscheduler.api.executor.logging;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.common.transportor.LogResponseStatus;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.WorkflowInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
Expand Down Expand Up @@ -109,4 +112,72 @@ private boolean checkNodeExists(TaskInstance taskInstance) {
return exists;
}

/**
* Retrieves a portion of the log string for a given workflow instance.
* This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage.
*
* @param workflowInstance The workflow instance object, containing information needed for log retrieval.
* @param skipLineNum The number of log lines to skip from the beginning.
* @param limit The maximum number of log lines to retrieve.
* @return A string containing the specified portion of the log.
*/
public String getWorkflowPartLogString(WorkflowInstance workflowInstance, int skipLineNum, int limit) {
checkWorkflowArgs(workflowInstance);
if (checkWorkflowNodeExists(workflowInstance)) {
WorkflowInstanceLogPageQueryResponse response =
localLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit);
if (response.getCode() == LogResponseStatus.SUCCESS) {
return response.getLogContent();
} else {
log.warn("get part log string is not success for workflow instance {}; reason :{}",
workflowInstance.getId(), response.getMessage());
return remoteLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit);
}
} else {
return remoteLogClient.getWorkflowPartLog(workflowInstance, skipLineNum, limit);
}
}

/**
* Retrieves the complete log content for a given workflow instance as a byte array.
* This method first attempts to fetch the log from local storage; if unsuccessful, it tries to obtain the log from remote storage.
*
* @param workflowInstance The workflow instance object, containing information needed for log retrieval.
* @return A byte array containing the complete log content.
*/
public byte[] getWorkflowWholeLogBytes(WorkflowInstance workflowInstance) {
checkWorkflowArgs(workflowInstance);
if (checkWorkflowNodeExists(workflowInstance)) {
WorkflowInstanceLogFileDownloadResponse response = localLogClient.getWorkflowWholeLog(workflowInstance);
if (response.getCode() == LogResponseStatus.SUCCESS) {
return response.getLogBytes();
} else {
log.warn("get whole log bytes is not success for workflow instance {}; reason :{}",
workflowInstance.getId(), response.getMessage());
return remoteLogClient.getWorkflowWholeLog(workflowInstance);
}
} else {
return remoteLogClient.getWorkflowWholeLog(workflowInstance);
}
}

private static void checkWorkflowArgs(WorkflowInstance workflowInstance) {
if (workflowInstance == null) {
throw new IllegalArgumentException("workflow instance is null");
}
}

private boolean checkWorkflowNodeExists(WorkflowInstance workflowInstance) {
String host = workflowInstance.getHost();
if (host == null || host.isEmpty()) {
log.warn("Host is null or empty for workflow instance {}", workflowInstance.getId());
return false;
}
boolean exists = registryClient.checkNodeExists(host, RegistryNodeType.MASTER);
if (!exists) {
log.warn("Node {} does not exist for workflow instance {}", host, workflowInstance.getId());
}
return exists;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;

import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -51,4 +52,13 @@ public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit)
LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit));
}

public byte[] getWorkflowWholeLog(WorkflowInstance workflowInstance) {
return LogUtils.getFileContentBytesFromRemote(workflowInstance.getLogPath());
}

public String getWorkflowPartLog(WorkflowInstance workflowInstance, int skipLineNum, int limit) {
return LogUtils.rollViewLogLines(
LogUtils.readPartFileContentFromRemote(workflowInstance.getLogPath(), skipLineNum, limit));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.ResponseWorkflowLog;
import org.apache.dolphinscheduler.dao.entity.User;

/**
* Workflow logger service
*/
public interface WorkflowLoggerService {

/**
* query workflow log
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @param skipLineNum skip line number
* @param limit limit
* @return workflow log content
*/
Result<ResponseWorkflowLog> queryWorkflowLog(User loginUser, int workflowInstanceId, int skipLineNum, int limit);

/**
* get workflow log bytes
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return log byte array
*/
byte[] getWorkflowLogBytes(User loginUser, int workflowInstanceId);

/**
* query workflow log in specified project
*
* @param loginUser login user
* @param projectCode project code
* @param workflowInstanceId workflow instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
String queryWorkflowLog(User loginUser, long projectCode, int workflowInstanceId, int skipLineNum, int limit);

/**
* get workflow log bytes in specified project
*
* @param loginUser login user
* @param projectCode project code
* @param workflowInstanceId workflow instance id
* @return log byte array
*/
byte[] getWorkflowLogBytes(User loginUser, long projectCode, int workflowInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils;
Expand Down Expand Up @@ -818,6 +820,18 @@ public void deleteWorkflowInstanceById(int workflowInstanceId) {
deleteSubWorkflowInstanceIfNeeded(workflowInstanceId);
// delete alert
alertDao.deleteByWorkflowInstanceId(workflowInstanceId);
// delete workflow instance log
WorkflowInstance workflowInstance = processService.findWorkflowInstanceById(workflowInstanceId);
if (workflowInstance != null && StringUtils.isNotBlank(workflowInstance.getLogPath())) {
// Remove workflow instance log failed will not affect the deletion of workflow instance
try {
Clients.withService(ILogService.class)
.withHost(workflowInstance.getHost())
.removeWorkflowInstanceLog(workflowInstance.getLogPath());
} catch (Exception ex) {
log.error("Remove workflow instance log error", ex);
}
}
// delete workflow instance
workflowInstanceDao.deleteById(workflowInstanceId);
}
Expand Down
Loading
Loading