Skip to content

Commit 5fe3011

Browse files
committed
Handle tasks that no longer have data in ZooKeeper (zk)
1 parent eadde5c commit 5fe3011

1 file changed

Lines changed: 54 additions & 57 deletions

File tree

SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java

Lines changed: 54 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -308,7 +308,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
308308
long now = System.currentTimeMillis();
309309
long delta = now - timestamp;
310310

311-
LOG.debug(
311+
LOG.info(
312312
"Update: task {} is now {} ({}) at {} (delta: {})",
313313
taskId,
314314
status.getState(),
@@ -507,16 +507,39 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
507507
} else {
508508
return StatusUpdateResult.KILL_TASK;
509509
}
510-
} else if (
510+
}
511+
512+
// If a task is missing data in Singularity there is not much we can do to recover it
513+
Optional<SingularityTask> maybeTask = taskManager.getTask(taskIdObj);
514+
if (!maybeTask.isPresent()) {
515+
LOG.warn("Missing task data for {}, trying to recover", taskId);
516+
maybeTask = taskManager.tryRepairTask(taskIdObj);
517+
}
518+
if (!maybeTask.isPresent()) {
519+
if (taskState.isDone()) {
520+
saveNewTaskStatusHolder(taskIdObj, newTaskStatusHolder, taskState);
521+
return StatusUpdateResult.DONE;
522+
} else {
523+
final String message = String.format(
524+
"Task %s is active but is missing task data, killing task",
525+
taskId
526+
);
527+
exceptionNotifier.notify(message);
528+
LOG.error(message);
529+
return StatusUpdateResult.KILL_TASK;
530+
}
531+
}
532+
533+
SingularityTask task = maybeTask.get();
534+
535+
if (
511536
isDuplicateOrIgnorableStatusUpdate(previousTaskStatusHolder, newTaskStatusHolder)
512537
) {
513538
LOG.trace("Ignoring status update {} to {}", taskState, taskIdObj);
514539
saveNewTaskStatusHolder(taskIdObj, newTaskStatusHolder, taskState);
515540
return StatusUpdateResult.IGNORED;
516541
}
517542

518-
Optional<SingularityTask> task = taskManager.getTask(taskIdObj);
519-
520543
if (status.getState() == TaskState.TASK_LOST) {
521544
boolean isMesosFailure =
522545
status.getReason() == Reason.REASON_INVALID_OFFERS ||
@@ -526,14 +549,12 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
526549
status.getReason() == Reason.REASON_MASTER_DISCONNECTED ||
527550
status.getReason() == Reason.REASON_AGENT_DISCONNECTED;
528551

529-
RequestType requestType = task.isPresent()
530-
? task.get().getTaskRequest().getRequest().getRequestType()
531-
: null;
552+
RequestType requestType = task.getTaskRequest().getRequest().getRequestType();
532553
boolean isRelaunchable = requestType != null && !requestType.isLongRunning();
533554

534555
if (isMesosFailure && isRelaunchable) {
535556
LOG.info("Relaunching lost task {}", task);
536-
relaunchTask(task.get());
557+
relaunchTask(task);
537558
}
538559
lostTasksMeter.mark();
539560
if (configuration.getDisasterDetection().isEnabled()) {
@@ -542,67 +563,40 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
542563
}
543564

544565
if (!taskState.isDone()) {
545-
if (!task.isPresent()) {
546-
task = taskManager.tryRepairTask(taskIdObj);
547-
}
548-
if (task.isPresent()) {
549-
final Optional<SingularityPendingDeploy> pendingDeploy = deployManager.getPendingDeploy(
550-
taskIdObj.getRequestId()
551-
);
566+
final Optional<SingularityPendingDeploy> pendingDeploy = deployManager.getPendingDeploy(
567+
taskIdObj.getRequestId()
568+
);
552569

553-
Optional<SingularityRequestWithState> requestWithState = Optional.empty();
570+
Optional<SingularityRequestWithState> requestWithState = Optional.empty();
554571

555-
if (taskState == ExtendedTaskState.TASK_RUNNING) {
556-
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
557-
healthchecker.enqueueHealthcheck(task.get(), pendingDeploy, requestWithState);
558-
}
572+
if (taskState == ExtendedTaskState.TASK_RUNNING) {
573+
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
574+
healthchecker.enqueueHealthcheck(task, pendingDeploy, requestWithState);
575+
}
559576

560-
if (
561-
!pendingDeploy.isPresent() ||
562-
!pendingDeploy
563-
.get()
564-
.getDeployMarker()
565-
.getDeployId()
566-
.equals(taskIdObj.getDeployId())
567-
) {
568-
if (!requestWithState.isPresent()) {
569-
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
570-
}
571-
newTaskChecker.enqueueNewTaskCheck(task.get(), requestWithState, healthchecker);
577+
if (
578+
!pendingDeploy.isPresent() ||
579+
!pendingDeploy
580+
.get()
581+
.getDeployMarker()
582+
.getDeployId()
583+
.equals(taskIdObj.getDeployId())
584+
) {
585+
if (!requestWithState.isPresent()) {
586+
requestWithState = requestManager.getRequest(taskIdObj.getRequestId());
572587
}
573-
} else {
574-
final String message = String.format(
575-
"Task %s is active but is missing task data",
576-
taskId
577-
);
578-
taskManager.createTaskCleanup(
579-
new SingularityTaskCleanup(
580-
Optional.empty(),
581-
TaskCleanupType.UNHEALTHY_NEW_TASK,
582-
System.currentTimeMillis(),
583-
taskIdObj,
584-
Optional.of(
585-
"Task is active but had no task data. Unable to continue running"
586-
),
587-
Optional.empty(),
588-
Optional.empty()
589-
)
590-
);
591-
exceptionNotifier.notify(message);
592-
LOG.error(message);
588+
newTaskChecker.enqueueNewTaskCheck(task, requestWithState, healthchecker);
593589
}
594590
}
595591

596-
final Optional<String> statusMessage = getStatusMessage(status, task);
592+
final Optional<String> statusMessage = getStatusMessage(status, Optional.of(task));
597593

598594
final SingularityTaskHistoryUpdate taskUpdate = new SingularityTaskHistoryUpdate(
599595
taskIdObj,
600596
timestamp,
601597
taskState,
602598
statusMessage,
603-
status.hasReason()
604-
? Optional.of(status.getReason().name())
605-
: Optional.<String>empty()
599+
status.hasReason() ? Optional.of(status.getReason().name()) : Optional.empty()
606600
);
607601
final SingularityCreateResult taskHistoryUpdateCreateResult = taskManager.saveTaskHistoryUpdate(
608602
taskUpdate
@@ -621,7 +615,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
621615
taskIdObj,
622616
taskState,
623617
taskHistoryUpdateCreateResult,
624-
task,
618+
Optional.of(task),
625619
timestamp
626620
);
627621
}
@@ -665,8 +659,11 @@ public boolean hasRoomForMoreUpdates() {
665659
public CompletableFuture<StatusUpdateResult> processStatusUpdateAsync(
666660
Protos.TaskStatus status
667661
) {
662+
LOG.info("Creating status update -- task: " + status.getTaskId());
668663
return CompletableFuture.supplyAsync(
669664
() -> {
665+
LOG.info("Starting status update -- task: " + status.getTaskId());
666+
670667
final String taskId = status.getTaskId().getValue();
671668
final Optional<SingularityTaskId> maybeTaskId = getTaskId(taskId);
672669

0 commit comments

Comments
 (0)