@@ -1402,32 +1402,22 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
14021402 AtomicBoolean hasErr = new AtomicBoolean (false );
14031403 try {
14041404 final String path = getTaskPath (task .getTaskId ());
1405- curator
1406- .transaction ()
1407- .forOperations (
1408- curator .transactionOp ().create ().forPath (path , taskTranscoder .toBytes (task )),
1409- curator
1410- .transactionOp ()
1411- .create ()
1412- .forPath (
1413- getLastActiveTaskStatusPath (task .getTaskId ()),
1414- taskStatusTranscoder .toBytes (taskStatusHolder )
1415- )
1416- )
1417- .forEach (
1418- r -> {
1419- if (r .getError () > 0 ) {
1420- LOG .error ("Error committing new task {} to zk {}" , task .getTaskId (), r );
1421- hasErr .set (true );
1422- }
1423- }
1424- );
1425-
1405+ saveTaskDeletePendingInTransaction (hasErr , path , task , taskStatusHolder );
14261406 // Not checking isActive here, already called within offer check flow
14271407 leaderCache .putActiveTask (task .getTaskId ());
14281408 taskCache .set (path , task );
1409+ if (configuration .isVerifyTaskDataWrites ()) {
1410+ Optional <SingularityTask > maybeTask = getTaskCheckCache (task .getTaskId (), true );
1411+ if (!maybeTask .isPresent ()) {
1412+ LOG .error ("Found empty task after write for {}" , task .getTaskId ());
1413+ saveTaskDeletePendingInTransaction (hasErr , path , task , taskStatusHolder );
1414+ }
1415+ }
14291416 } catch (KeeperException .NodeExistsException nee ) {
14301417 LOG .error ("Task or active path already existed for {}" , task .getTaskId ());
1418+ } catch (Exception e ) {
1419+ LOG .error ("Could not save task data for {}" , task .getTaskId (), e );
1420+ throw new RuntimeException (e );
14311421 } finally {
14321422 if (hasErr .get ()) {
14331423 // Rare case, but in case the transaction failed to write task data, try again from the cache data
@@ -1436,6 +1426,39 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
14361426 }
14371427 }
14381428
1429+ private void saveTaskDeletePendingInTransaction (
1430+ AtomicBoolean hasErr ,
1431+ String taskPath ,
1432+ SingularityTask task ,
1433+ SingularityTaskStatusHolder taskStatusHolder
1434+ )
1435+ throws Exception {
1436+ byte [] taskBytes = taskTranscoder .toBytes (task );
1437+ if (taskBytes == null || taskBytes .length == 0 ) {
1438+ LOG .error ("Encountered null or empty task bytes for {}" , task .getTaskId ());
1439+ }
1440+ curator
1441+ .transaction ()
1442+ .forOperations (
1443+ curator .transactionOp ().create ().forPath (taskPath , taskBytes ),
1444+ curator
1445+ .transactionOp ()
1446+ .create ()
1447+ .forPath (
1448+ getLastActiveTaskStatusPath (task .getTaskId ()),
1449+ taskStatusTranscoder .toBytes (taskStatusHolder )
1450+ )
1451+ )
1452+ .forEach (
1453+ r -> {
1454+ if (r .getError () > 0 ) {
1455+ LOG .error ("Error committing new task {} to zk {}" , task .getTaskId (), r );
1456+ hasErr .set (true );
1457+ }
1458+ }
1459+ );
1460+ }
1461+
14391462 public List <SingularityTaskId > getLBCleanupTasks () {
14401463 return getChildrenAsIds (LB_CLEANUP_PATH_ROOT , taskIdTranscoder );
14411464 }
0 commit comments