Skip to content

Commit a8da4c6

Browse files
committed
Update transaction and attempt to repair task data
1 parent 698dcb7 commit a8da4c6

3 files changed

Lines changed: 73 additions & 17 deletions

File tree

SingularityService/src/main/java/com/hubspot/singularity/data/TaskManager.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,15 @@
5050
import java.util.List;
5151
import java.util.Map;
5252
import java.util.Optional;
53+
import java.util.concurrent.atomic.AtomicBoolean;
5354
import java.util.stream.Collectors;
5455
import org.apache.curator.framework.CuratorFramework;
5556
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
57+
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
5658
import org.apache.curator.utils.ZKPaths;
5759
import org.apache.zookeeper.KeeperException;
5860
import org.apache.zookeeper.KeeperException.NodeExistsException;
61+
import org.apache.zookeeper.OpResult.ErrorResult;
5962
import org.apache.zookeeper.data.Stat;
6063
import org.slf4j.Logger;
6164
import org.slf4j.LoggerFactory;
@@ -1304,6 +1307,21 @@ public void createTaskAndDeletePendingTask(SingularityTask task) {
13041307
}
13051308
}
13061309

1310+
public Optional<SingularityTask> tryRepairTask(SingularityTaskId taskId) {
1311+
try {
1312+
Optional<SingularityTask> maybeTask = getTask(taskId); // checks zkCache for task data
1313+
String path = getTaskPath(taskId);
1314+
if (maybeTask.isPresent() && !exists(path)) {
1315+
LOG.info("Found info for task {} from cache not in zk node, rewriting", taskId);
1316+
save(path, maybeTask.map(taskTranscoder::toBytes));
1317+
leaderCache.putActiveTask(taskId);
1318+
}
1319+
return maybeTask;
1320+
} catch (Exception e) {
1321+
return Optional.empty();
1322+
}
1323+
}
1324+
13071325
public Map<SingularityTaskId, SingularityTask> getTasks(
13081326
Iterable<SingularityTaskId> taskIds
13091327
) {
@@ -1379,24 +1397,35 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
13791397
try {
13801398
final String path = getTaskPath(task.getTaskId());
13811399

1382-
CuratorTransactionFinal transaction = curator
1383-
.inTransaction()
1384-
.create()
1385-
.forPath(path, taskTranscoder.toBytes(task))
1386-
.and();
1387-
1388-
transaction
1389-
.create()
1390-
.forPath(
1391-
getLastActiveTaskStatusPath(task.getTaskId()),
1392-
taskStatusTranscoder.toBytes(taskStatusHolder)
1400+
AtomicBoolean hasErr = new AtomicBoolean(false);
1401+
curator
1402+
.transaction()
1403+
.forOperations(
1404+
curator.transactionOp().create().forPath(path, taskTranscoder.toBytes(task)),
1405+
curator
1406+
.transactionOp()
1407+
.create()
1408+
.forPath(
1409+
getLastActiveTaskStatusPath(task.getTaskId()),
1410+
taskStatusTranscoder.toBytes(taskStatusHolder)
1411+
)
13931412
)
1394-
.and()
1395-
.commit();
1413+
.forEach(
1414+
r -> {
1415+
if (r.getError() > 0) {
1416+
LOG.error("Error committing new task {} to zk {}", task.getTaskId(), r);
1417+
hasErr.set(true);
1418+
}
1419+
}
1420+
);
13961421

13971422
// Not checking isActive here, already called within offer check flow
13981423
leaderCache.putActiveTask(task.getTaskId());
13991424
taskCache.set(path, task);
1425+
if (hasErr.get()) {
1426+
// Rare case, but in case the transaction failed to write task data, try again from the cache data
1427+
tryRepairTask(task.getTaskId());
1428+
}
14001429
} catch (KeeperException.NodeExistsException nee) {
14011430
LOG.error("Task or active path already existed for {}", task.getTaskId());
14021431
}

SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
515515
return StatusUpdateResult.IGNORED;
516516
}
517517

518-
final Optional<SingularityTask> task = taskManager.getTask(taskIdObj);
518+
Optional<SingularityTask> task = taskManager.getTask(taskIdObj);
519519

520520
if (status.getState() == TaskState.TASK_LOST) {
521521
boolean isMesosFailure =
@@ -542,6 +542,9 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
542542
}
543543

544544
if (!taskState.isDone()) {
545+
if (!task.isPresent()) {
546+
task = taskManager.tryRepairTask(taskIdObj);
547+
}
545548
if (task.isPresent()) {
546549
final Optional<SingularityPendingDeploy> pendingDeploy = deployManager.getPendingDeploy(
547550
taskIdObj.getRequestId()

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityDeployChecker.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,9 +1495,33 @@ private SingularityDeployResult enqueueAndProcessLbRequest(
14951495
toShutDown = otherActiveTasks;
14961496
}
14971497

1498-
final Map<SingularityTaskId, SingularityTask> tasks = taskManager.getTasks(
1499-
Iterables.concat(deployActiveTasks, toShutDown)
1500-
);
1498+
Iterable<SingularityTaskId> allIds = Iterables.concat(deployActiveTasks, toShutDown);
1499+
final Map<SingularityTaskId, SingularityTask> tasks = taskManager.getTasks(allIds);
1500+
// Handle a case where occasionally a task's data cannot be found in zk, but the task is running
1501+
if (
1502+
!tasks.keySet().containsAll(deployActiveTasks) ||
1503+
!tasks.keySet().containsAll(toShutDown)
1504+
) {
1505+
LOG.warn("Could not find task data for task to shut down, will check taskCache");
1506+
for (SingularityTaskId taskId : deployActiveTasks) {
1507+
if (!tasks.containsKey(taskId)) {
1508+
Optional<SingularityTask> maybeRepaired = taskManager.tryRepairTask(taskId);
1509+
if (maybeRepaired.isPresent()) {
1510+
LOG.info("Repaired and fetched definition for {}", taskId);
1511+
tasks.put(taskId, maybeRepaired.get());
1512+
}
1513+
}
1514+
}
1515+
for (SingularityTaskId taskId : toShutDown) {
1516+
if (!tasks.containsKey(taskId)) {
1517+
Optional<SingularityTask> maybeRepaired = taskManager.tryRepairTask(taskId);
1518+
if (maybeRepaired.isPresent()) {
1519+
LOG.info("Repaired and fetched definition for {}", taskId);
1520+
tasks.put(taskId, maybeRepaired.get());
1521+
}
1522+
}
1523+
}
1524+
}
15011525
final LoadBalancerRequestId lbRequestId = SingularityDeployCheckHelper.getNewLoadBalancerRequestId(
15021526
pendingDeploy
15031527
);

0 commit comments

Comments
 (0)