Skip to content

Commit 9295525

Browse files
committed
Fetch the list of task ids to persist behind lock
1 parent 78ee88a commit 9295525

2 files changed

Lines changed: 61 additions & 20 deletions

File tree

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityTaskHistoryPersister.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.hubspot.singularity.config.SingularityConfiguration;
1515
import com.hubspot.singularity.data.DeployManager;
1616
import com.hubspot.singularity.data.TaskManager;
17+
import com.hubspot.singularity.mesos.SingularitySchedulerLock;
1718
import java.util.ArrayList;
1819
import java.util.Comparator;
1920
import java.util.List;
@@ -42,6 +43,7 @@ public class SingularityTaskHistoryPersister
4243
private final DeployManager deployManager;
4344
private final HistoryManager historyManager;
4445
private final int agentReregisterTimeoutSeconds;
46+
private final SingularitySchedulerLock singularitySchedulerLock;
4547

4648
@Inject
4749
public SingularityTaskHistoryPersister(
@@ -50,6 +52,7 @@ public SingularityTaskHistoryPersister(
5052
DeployManager deployManager,
5153
HistoryManager historyManager,
5254
SingularityManagedThreadPoolFactory managedThreadPoolFactory,
55+
SingularitySchedulerLock singularitySchedulerLock,
5356
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
5457
@Named(
5558
SingularityHistoryModule.LAST_TASK_PERSISTER_SUCCESS
@@ -61,6 +64,7 @@ public SingularityTaskHistoryPersister(
6164
this.deployManager = deployManager;
6265
this.agentReregisterTimeoutSeconds =
6366
configuration.getMesosConfiguration().getAgentReregisterTimeoutSeconds();
67+
this.singularitySchedulerLock = singularitySchedulerLock;
6468
}
6569

6670
@Override
@@ -88,22 +92,30 @@ public void runActionOnPoll() {
8892
() -> {
8993
try {
9094
LOG.debug("Checking request {}", requestId);
91-
List<SingularityTaskId> taskIds = taskManager.getTaskIdsForRequest(
92-
requestId
95+
List<SingularityTaskId> taskIds = singularitySchedulerLock.runWithRequestLockAndReturn(
96+
() -> {
97+
List<SingularityTaskId> ids = taskManager.getTaskIdsForRequest(
98+
requestId
99+
);
100+
ids.removeAll(taskManager.getActiveTaskIdsForRequest(requestId));
101+
ids.removeAll(taskManager.getLBCleanupTasks());
102+
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
103+
ids =
104+
ids
105+
.stream()
106+
.filter(
107+
taskId ->
108+
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
109+
!couldReturnWithRecoveredAgent(taskId)
110+
)
111+
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
112+
.collect(Collectors.toList());
113+
return ids;
114+
},
115+
requestId,
116+
"task history persister fetch"
93117
);
94-
taskIds.removeAll(taskManager.getActiveTaskIdsForRequest(requestId));
95-
taskIds.removeAll(taskManager.getLBCleanupTasks());
96-
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
97-
taskIds =
98-
taskIds
99-
.stream()
100-
.filter(
101-
taskId ->
102-
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
103-
!couldReturnWithRecoveredAgent(taskId)
104-
)
105-
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
106-
.collect(Collectors.toList());
118+
107119
int forRequest = 0;
108120
int transferred = 0;
109121
for (SingularityTaskId taskId : taskIds) {

SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,16 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
373373
// If a task is missing data in Singularity there is not much we can do to recover it
374374
Optional<SingularityTask> maybeTask = taskManager.getTask(taskIdObj);
375375
if (!maybeTask.isPresent()) {
376-
maybeTask = tryFindMissingTaskData(taskIdObj, taskId);
376+
maybeTask = tryFindMissingTaskData(taskIdObj, taskId, taskState);
377377
}
378378
if (!maybeTask.isPresent()) {
379-
handledMissingTaskData(taskIdObj, taskId, newTaskStatusHolder, taskState, now);
379+
return handledMissingTaskData(
380+
taskIdObj,
381+
taskId,
382+
newTaskStatusHolder,
383+
taskState,
384+
now
385+
);
380386
}
381387

382388
SingularityTask task = maybeTask.get();
@@ -636,15 +642,38 @@ private StatusUpdateResult tryRecoverTask(
636642

637643
private Optional<SingularityTask> tryFindMissingTaskData(
638644
SingularityTaskId taskIdObj,
639-
String taskId
645+
String taskId,
646+
ExtendedTaskState taskState
640647
) {
641648
LOG.warn("Missing task data for {}, trying to recover", taskId);
649+
// If found in this first step, it was a bad zk write and everything should just work
642650
Optional<SingularityTask> maybeTask = taskManager.tryRepairTask(taskIdObj);
643651
if (!maybeTask.isPresent()) {
644652
// Ensure history manager calls cannot interrupt the status update path
645653
try {
646-
maybeTask =
647-
historyManager.getTaskHistory(taskId).map(SingularityTaskHistory::getTask);
654+
Optional<SingularityTaskHistory> maybeTaskHistory = historyManager.getTaskHistory(
655+
taskId
656+
);
657+
if (maybeTaskHistory.isPresent()) {
658+
maybeTask = maybeTaskHistory.map(SingularityTaskHistory::getTask);
659+
if (
660+
maybeTaskHistory
661+
.get()
662+
.getLastTaskUpdate()
663+
.map(SingularityTaskHistoryUpdate::getTaskState)
664+
.orElse(taskState)
665+
.isDone() &&
666+
!taskState.isDone()
667+
) {
668+
// Don't bother with LB state/etc recovery, let the task get killed and replaced as a cleaner replacement
669+
LOG.info(
670+
"Recovered task {} was previously marked as done. Will not reactivate fully",
671+
taskId
672+
);
673+
taskManager.repairFoundTask(maybeTask.get());
674+
return Optional.empty();
675+
}
676+
}
648677
} catch (Exception e) {
649678
LOG.error("Could not fetch {} from history", taskId, e);
650679
}

0 commit comments

Comments
 (0)