@@ -308,7 +308,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
308308 long now = System .currentTimeMillis ();
309309 long delta = now - timestamp ;
310310
311- LOG .debug (
311+ LOG .info (
312312 "Update: task {} is now {} ({}) at {} (delta: {})" ,
313313 taskId ,
314314 status .getState (),
@@ -507,16 +507,39 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
507507 } else {
508508 return StatusUpdateResult .KILL_TASK ;
509509 }
510- } else if (
510+ }
511+
512+ // If a task is missing data in Singularity there is not much we can do to recover it
513+ Optional <SingularityTask > maybeTask = taskManager .getTask (taskIdObj );
514+ if (!maybeTask .isPresent ()) {
515+ LOG .warn ("Missing task data for {}, trying to recover" , taskId );
516+ maybeTask = taskManager .tryRepairTask (taskIdObj );
517+ }
518+ if (!maybeTask .isPresent ()) {
519+ if (taskState .isDone ()) {
520+ saveNewTaskStatusHolder (taskIdObj , newTaskStatusHolder , taskState );
521+ return StatusUpdateResult .DONE ;
522+ } else {
523+ final String message = String .format (
524+ "Task %s is active but is missing task data, killing task" ,
525+ taskId
526+ );
527+ exceptionNotifier .notify (message );
528+ LOG .error (message );
529+ return StatusUpdateResult .KILL_TASK ;
530+ }
531+ }
532+
533+ SingularityTask task = maybeTask .get ();
534+
535+ if (
511536 isDuplicateOrIgnorableStatusUpdate (previousTaskStatusHolder , newTaskStatusHolder )
512537 ) {
513538 LOG .trace ("Ignoring status update {} to {}" , taskState , taskIdObj );
514539 saveNewTaskStatusHolder (taskIdObj , newTaskStatusHolder , taskState );
515540 return StatusUpdateResult .IGNORED ;
516541 }
517542
518- Optional <SingularityTask > task = taskManager .getTask (taskIdObj );
519-
520543 if (status .getState () == TaskState .TASK_LOST ) {
521544 boolean isMesosFailure =
522545 status .getReason () == Reason .REASON_INVALID_OFFERS ||
@@ -526,14 +549,12 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
526549 status .getReason () == Reason .REASON_MASTER_DISCONNECTED ||
527550 status .getReason () == Reason .REASON_AGENT_DISCONNECTED ;
528551
529- RequestType requestType = task .isPresent ()
530- ? task .get ().getTaskRequest ().getRequest ().getRequestType ()
531- : null ;
552+ RequestType requestType = task .getTaskRequest ().getRequest ().getRequestType ();
532553 boolean isRelaunchable = requestType != null && !requestType .isLongRunning ();
533554
534555 if (isMesosFailure && isRelaunchable ) {
535556 LOG .info ("Relaunching lost task {}" , task );
536- relaunchTask (task . get () );
557+ relaunchTask (task );
537558 }
538559 lostTasksMeter .mark ();
539560 if (configuration .getDisasterDetection ().isEnabled ()) {
@@ -542,67 +563,40 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
542563 }
543564
544565 if (!taskState .isDone ()) {
545- if (!task .isPresent ()) {
546- task = taskManager .tryRepairTask (taskIdObj );
547- }
548- if (task .isPresent ()) {
549- final Optional <SingularityPendingDeploy > pendingDeploy = deployManager .getPendingDeploy (
550- taskIdObj .getRequestId ()
551- );
566+ final Optional <SingularityPendingDeploy > pendingDeploy = deployManager .getPendingDeploy (
567+ taskIdObj .getRequestId ()
568+ );
552569
553- Optional <SingularityRequestWithState > requestWithState = Optional .empty ();
570+ Optional <SingularityRequestWithState > requestWithState = Optional .empty ();
554571
555- if (taskState == ExtendedTaskState .TASK_RUNNING ) {
556- requestWithState = requestManager .getRequest (taskIdObj .getRequestId ());
557- healthchecker .enqueueHealthcheck (task . get () , pendingDeploy , requestWithState );
558- }
572+ if (taskState == ExtendedTaskState .TASK_RUNNING ) {
573+ requestWithState = requestManager .getRequest (taskIdObj .getRequestId ());
574+ healthchecker .enqueueHealthcheck (task , pendingDeploy , requestWithState );
575+ }
559576
560- if (
561- !pendingDeploy .isPresent () ||
562- !pendingDeploy
563- .get ()
564- .getDeployMarker ()
565- .getDeployId ()
566- .equals (taskIdObj .getDeployId ())
567- ) {
568- if (!requestWithState .isPresent ()) {
569- requestWithState = requestManager .getRequest (taskIdObj .getRequestId ());
570- }
571- newTaskChecker .enqueueNewTaskCheck (task .get (), requestWithState , healthchecker );
577+ if (
578+ !pendingDeploy .isPresent () ||
579+ !pendingDeploy
580+ .get ()
581+ .getDeployMarker ()
582+ .getDeployId ()
583+ .equals (taskIdObj .getDeployId ())
584+ ) {
585+ if (!requestWithState .isPresent ()) {
586+ requestWithState = requestManager .getRequest (taskIdObj .getRequestId ());
572587 }
573- } else {
574- final String message = String .format (
575- "Task %s is active but is missing task data" ,
576- taskId
577- );
578- taskManager .createTaskCleanup (
579- new SingularityTaskCleanup (
580- Optional .empty (),
581- TaskCleanupType .UNHEALTHY_NEW_TASK ,
582- System .currentTimeMillis (),
583- taskIdObj ,
584- Optional .of (
585- "Task is active but had no task data. Unable to continue running"
586- ),
587- Optional .empty (),
588- Optional .empty ()
589- )
590- );
591- exceptionNotifier .notify (message );
592- LOG .error (message );
588+ newTaskChecker .enqueueNewTaskCheck (task , requestWithState , healthchecker );
593589 }
594590 }
595591
596- final Optional <String > statusMessage = getStatusMessage (status , task );
592+ final Optional <String > statusMessage = getStatusMessage (status , Optional . of ( task ) );
597593
598594 final SingularityTaskHistoryUpdate taskUpdate = new SingularityTaskHistoryUpdate (
599595 taskIdObj ,
600596 timestamp ,
601597 taskState ,
602598 statusMessage ,
603- status .hasReason ()
604- ? Optional .of (status .getReason ().name ())
605- : Optional .<String >empty ()
599+ status .hasReason () ? Optional .of (status .getReason ().name ()) : Optional .empty ()
606600 );
607601 final SingularityCreateResult taskHistoryUpdateCreateResult = taskManager .saveTaskHistoryUpdate (
608602 taskUpdate
@@ -621,7 +615,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
621615 taskIdObj ,
622616 taskState ,
623617 taskHistoryUpdateCreateResult ,
624- task ,
618+ Optional . of ( task ) ,
625619 timestamp
626620 );
627621 }
@@ -665,8 +659,11 @@ public boolean hasRoomForMoreUpdates() {
665659 public CompletableFuture <StatusUpdateResult > processStatusUpdateAsync (
666660 Protos .TaskStatus status
667661 ) {
662+ LOG .info ("Creating status update -- task: " + status .getTaskId ());
668663 return CompletableFuture .supplyAsync (
669664 () -> {
665+ LOG .info ("Starting status update -- task: " + status .getTaskId ());
666+
670667 final String taskId = status .getTaskId ().getValue ();
671668 final Optional <SingularityTaskId > maybeTaskId = getTaskId (taskId );
672669
0 commit comments