CASSSIDECAR-374: Implement durable operational job tracker#359
CASSSIDECAR-374: Implement durable operational job tracker#359andresbeckruiz wants to merge 2 commits into
Conversation
2f8f987 to
b43cc8a
Compare
b43cc8a to
346a446
Compare
pauloricardomg
left a comment
There was a problem hiding this comment.
I noticed the durable job tracker is never initialized, do we plan to do this in a follow-up PR ? Is the idea that the operator will choose whether to use the durable or the in memory job tracker ?
| @Nullable | ||
| public UUID nodeId() | ||
| { | ||
| return null; |
There was a problem hiding this comment.
Why did we not include this field in the cluster ops schema?
There was a problem hiding this comment.
Also why is this field needed, I don't see it being used anywhere.
| @NotNull | ||
| public List<UUID> nodesExecuting() | ||
| { | ||
| return Collections.emptyList(); |
There was a problem hiding this comment.
Is there any reason why we're not populating these nodes* fields from the cluster_ops_node_state table ?
| * @param job the operational job to convert | ||
| * @return a new record capturing the job's current state | ||
| */ | ||
| public static OperationalJobRecord fromOperationalJob(OperationalJob job) |
There was a problem hiding this comment.
When a job completes, the DurableOperationalJobTracker evicts it from the live map. After that, get(jobId) returns
an OperationalJobRecord from storage. Let's trace what data is available at each stage:
While the job is live (e.g., NodeDecommissionJob):
nodeId() → UUID of the node being decommissioned
startTime() → when execution began
nodesPending() → [nodeId] initially, then [] after execution starts
nodesExecuting()→ [nodeId] while running
nodesSucceeded()→ [nodeId] after success
nodesFailed() → [nodeId] after failure
The factory that persists the job only captures 3 fields:
// OperationalJobRecord.fromOperationalJob(job)
return new OperationalJobRecord(job.jobId(), job.operationType(), job.status());
// ^^^^^^^^ ^^^^^^^^^^^^^^^ ^^^^^^^^^^
// that's it — nodeId, startTime, node lists are all discardedAfter completion, get() returns the record with stubbed implementations:
nodeId() → null (always)
startTime() → null (constructor sets it to null)
nodesPending() → emptyList (always)
nodesExecuting() → emptyList (always)
nodesSucceeded() → emptyList (always)
nodesFailed() → emptyList (always)
So an API client observing a decommission job would see:
DURING execution: AFTER completion (from storage):
───────────────────────────────── ─────────────────────────────────
{ {
"jobId": "abc-123", "jobId": "abc-123",
"operation": "decommission", "operation": "DECOMMISSION",
"jobStatus": "RUNNING", "jobStatus": "SUCCEEDED",
"startTime": "2026-06-12T...", "startTime": null,
"nodesExecuting": ["node-uuid"], "nodesExecuting": [],
"nodesPending": [], "nodesPending": [],
"nodesSucceeded": [], "nodesSucceeded": [],
"nodesFailed": [] "nodesFailed": []
} }
All the provenance — which node was decommissioned, when it started, whether it succeeded per-node — is lost once the job leaves the
local map. The record in Cassandra only has (jobId, operationType, status).
| storageProvider.persistJob(OperationalJobRecord.fromOperationalJob(job)); | ||
|
|
||
| job.asyncResult().onComplete(ar -> { | ||
| liveJobs.remove(job.jobId()); |
There was a problem hiding this comment.
liveJobs removal should come after updateTerminalStatus to avoid RUNNING → CREATED → SUCCEEDED. when polling status between removal and update
| return liveJobs.computeIfAbsent(jobId, id -> { | ||
| OperationalJob job = mappingFunction.apply(id); | ||
|
|
||
| storageProvider.persistJob(OperationalJobRecord.fromOperationalJob(job)); |
There was a problem hiding this comment.
what happens if !storageProvider.isAvailable ?
| TaskExecutorPool executor) | ||
| { | ||
| this.liveJobs = new ConcurrentHashMap<>(serviceConfiguration.operationalJobTrackerSize()); | ||
| this.storageProvider = storageProvider; |
There was a problem hiding this comment.
who is expected to initialize the storage provider ? can this class assume it's already initialized or do we want to initialize here?
| StorageProvider storageProvider, | ||
| TaskExecutorPool executor) | ||
| { | ||
| this.liveJobs = new ConcurrentHashMap<>(serviceConfiguration.operationalJobTrackerSize()); |
There was a problem hiding this comment.
Unlike InMemoryOperationalJobTracker which has removeEldestEntry, the durable tracker has no eviction. If
asyncResult() never completes (e.g., executeBlocking fails to submit due to full pool), the entry stays in the map forever. Is this expected?
| { | ||
| LOGGER.warn("Failed to update terminal status for job {} (attempt {}/{}). error={}", | ||
| job.jobId(), attempt, MAX_STATUS_UPDATE_ATTEMPTS, e.getMessage()); | ||
| if (attempt < MAX_STATUS_UPDATE_ATTEMPTS) |
There was a problem hiding this comment.
- This method is called from above method with attempt=1, if 1st attempt fails, we are doing only one more retry. Should we increase value of MAX_STATUS_UPDATE_ATTEMPTS ?
- Also, if retries also failed to update, we will have stale entries in CREATED stated. We need a periodic thread running once in a while and removing stale entries (this can be deferred to later as not a blocker).
CASSSIDECAR-374
Original PR made against CASSSIDECAR-373 branch with review comments: andresbeckruiz#3.
Changes
DurableOperationalJobTracker: Implementation ofOperationalJobTrackerthat persists job state viaStorageProvider. Live local jobs are cached in a localConcurrentHashMap, completed jobs are served from storageOperationalJobManager: Separate job tracking from execution — the job is registered in the tracker (and persisted) before execution begins, ensuring durable trackers always have a record before the job runsDurableOperationalJobTrackerand persist-before-execute verification inOperationalJobManagerTestDurableOperationalJobTrackerwith Cassandra-backedStorageProviderFuture Work