@@ -1315,15 +1315,29 @@ public void createTaskAndDeletePendingTask(SingularityTask task) {
13151315 public Optional <SingularityTask > tryRepairTask (SingularityTaskId taskId ) {
13161316 try {
13171317 Optional <SingularityTask > maybeTask = getTask (taskId ); // checks zkCache for task data
1318- String path = getTaskPath (taskId );
1319- if (maybeTask .isPresent () && !exists (path )) {
1320- LOG .info ("Found info for task {} from cache not in zk node, rewriting" , taskId );
1321- save (path , maybeTask .map (taskTranscoder ::toBytes ));
1322- leaderCache .putActiveTask (taskId );
1318+ if (maybeTask .isPresent () && repairFoundTask (maybeTask .get ())) {
1319+ return maybeTask ;
13231320 }
1324- return maybeTask ;
13251321 } catch (Exception e ) {
1326- return Optional .empty ();
1322+ LOG .error ("Could not find or repair task data for {}" , taskId , e );
1323+ }
1324+ return Optional .empty ();
1325+ }
1326+
1327+ public boolean repairFoundTask (SingularityTask task ) {
1328+ try {
1329+ String path = getTaskPath (task .getTaskId ());
1330+ LOG .info (
1331+ "Found info for task {} from cache not in zk node, rewriting" ,
1332+ task .getTaskId ()
1333+ );
1334+ save (path , Optional .of (taskTranscoder .toBytes (task )));
1335+ leaderCache .putActiveTask (task .getTaskId ());
1336+ taskCache .set (path , task );
1337+ return true ;
1338+ } catch (Exception e ) {
1339+ LOG .error ("Could not repair task data for {}" , task .getTaskId (), e );
1340+ return false ;
13271341 }
13281342 }
13291343
0 commit comments