|
50 | 50 | import java.util.List; |
51 | 51 | import java.util.Map; |
52 | 52 | import java.util.Optional; |
| 53 | +import java.util.concurrent.atomic.AtomicBoolean; |
53 | 54 | import java.util.stream.Collectors; |
54 | 55 | import org.apache.curator.framework.CuratorFramework; |
55 | 56 | import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; |
| 57 | +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; |
56 | 58 | import org.apache.curator.utils.ZKPaths; |
57 | 59 | import org.apache.zookeeper.KeeperException; |
58 | 60 | import org.apache.zookeeper.KeeperException.NodeExistsException; |
| 61 | +import org.apache.zookeeper.OpResult.ErrorResult; |
59 | 62 | import org.apache.zookeeper.data.Stat; |
60 | 63 | import org.slf4j.Logger; |
61 | 64 | import org.slf4j.LoggerFactory; |
@@ -1304,6 +1307,21 @@ public void createTaskAndDeletePendingTask(SingularityTask task) { |
1304 | 1307 | } |
1305 | 1308 | } |
1306 | 1309 |
|
| 1310 | + public Optional<SingularityTask> tryRepairTask(SingularityTaskId taskId) { |
| 1311 | + try { |
| 1312 | + Optional<SingularityTask> maybeTask = getTask(taskId); // checks zkCache for task data |
| 1313 | + String path = getTaskPath(taskId); |
| 1314 | + if (maybeTask.isPresent() && !exists(path)) { |
| 1315 | + LOG.info("Found info for task {} from cache not in zk node, rewriting", taskId); |
| 1316 | + save(path, maybeTask.map(taskTranscoder::toBytes)); |
| 1317 | + leaderCache.putActiveTask(taskId); |
| 1318 | + } |
| 1319 | + return maybeTask; |
| 1320 | + } catch (Exception e) { |
| 1321 | + return Optional.empty(); |
| 1322 | + } |
| 1323 | + } |
| 1324 | + |
1307 | 1325 | public Map<SingularityTaskId, SingularityTask> getTasks( |
1308 | 1326 | Iterable<SingularityTaskId> taskIds |
1309 | 1327 | ) { |
@@ -1376,29 +1394,40 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) |
1376 | 1394 | } |
1377 | 1395 | } |
1378 | 1396 |
|
| 1397 | + AtomicBoolean hasErr = new AtomicBoolean(false); |
1379 | 1398 | try { |
1380 | 1399 | final String path = getTaskPath(task.getTaskId()); |
1381 | | - |
1382 | | - CuratorTransactionFinal transaction = curator |
1383 | | - .inTransaction() |
1384 | | - .create() |
1385 | | - .forPath(path, taskTranscoder.toBytes(task)) |
1386 | | - .and(); |
1387 | | - |
1388 | | - transaction |
1389 | | - .create() |
1390 | | - .forPath( |
1391 | | - getLastActiveTaskStatusPath(task.getTaskId()), |
1392 | | - taskStatusTranscoder.toBytes(taskStatusHolder) |
| 1400 | + curator |
| 1401 | + .transaction() |
| 1402 | + .forOperations( |
| 1403 | + curator.transactionOp().create().forPath(path, taskTranscoder.toBytes(task)), |
| 1404 | + curator |
| 1405 | + .transactionOp() |
| 1406 | + .create() |
| 1407 | + .forPath( |
| 1408 | + getLastActiveTaskStatusPath(task.getTaskId()), |
| 1409 | + taskStatusTranscoder.toBytes(taskStatusHolder) |
| 1410 | + ) |
1393 | 1411 | ) |
1394 | | - .and() |
1395 | | - .commit(); |
| 1412 | + .forEach( |
| 1413 | + r -> { |
| 1414 | + if (r.getError() > 0) { |
| 1415 | + LOG.error("Error committing new task {} to zk {}", task.getTaskId(), r); |
| 1416 | + hasErr.set(true); |
| 1417 | + } |
| 1418 | + } |
| 1419 | + ); |
1396 | 1420 |
|
1397 | 1421 | // Not checking isActive here, already called within offer check flow |
1398 | 1422 | leaderCache.putActiveTask(task.getTaskId()); |
1399 | 1423 | taskCache.set(path, task); |
1400 | 1424 | } catch (KeeperException.NodeExistsException nee) { |
1401 | 1425 | LOG.error("Task or active path already existed for {}", task.getTaskId()); |
| 1426 | + } finally { |
| 1427 | + if (hasErr.get()) { |
| 1428 | + // Rare case, but in case the transaction failed to write task data, try again from the cache data |
| 1429 | + tryRepairTask(task.getTaskId()); |
| 1430 | + } |
1402 | 1431 | } |
1403 | 1432 | } |
1404 | 1433 |
|
|
0 commit comments