Skip to content

CASSSIDECAR-374: Implement durable operational job tracker#359

Open
andresbeckruiz wants to merge 2 commits into
apache:trunkfrom
andresbeckruiz:CASSSIDECAR-374
Open

CASSSIDECAR-374: Implement durable operational job tracker#359
andresbeckruiz wants to merge 2 commits into
apache:trunkfrom
andresbeckruiz:CASSSIDECAR-374

Conversation

@andresbeckruiz

@andresbeckruiz andresbeckruiz commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

CASSSIDECAR-374

Original PR made against CASSSIDECAR-373 branch with review comments: andresbeckruiz#3.

Changes

  • DurableOperationalJobTracker: Implementation of OperationalJobTracker that persists job state via StorageProvider. Live local jobs are cached in a local ConcurrentHashMap, completed jobs are served from storage
  • OperationalJobManager: Separate job tracking from execution — the job is registered in the tracker (and persisted) before execution begins, ensuring durable trackers always have a record before the job runs
  • Unit tests for DurableOperationalJobTracker and persist-before-execute verification in OperationalJobManagerTest
  • Integration test for DurableOperationalJobTracker with Cassandra-backed StorageProvider

Future Work

  • CASSSIDECAR-378: Add support for creation and coordination of local Sidecar Jobs

@pauloricardomg pauloricardomg left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed the durable job tracker is never initialized, do we plan to do this in a follow-up PR ? Is the idea that the operator will choose whether to use the durable or the in memory job tracker ?

@Nullable
public UUID nodeId()
{
return null;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we not include this field in the cluster ops schema?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also why is this field needed, I don't see it being used anywhere.

@NotNull
public List<UUID> nodesExecuting()
{
return Collections.emptyList();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason why we're not populating these nodes* fields from the cluster_ops_node_state table ?

* @param job the operational job to convert
* @return a new record capturing the job's current state
*/
public static OperationalJobRecord fromOperationalJob(OperationalJob job)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a job completes, the DurableOperationalJobTracker evicts it from the live map. After that, get(jobId) returns
an OperationalJobRecord from storage. Let's trace what data is available at each stage:

  While the job is live (e.g., NodeDecommissionJob):
  nodeId()        → UUID of the node being decommissioned
  startTime()     → when execution began
  nodesPending()  → [nodeId] initially, then [] after execution starts
  nodesExecuting()→ [nodeId] while running
  nodesSucceeded()→ [nodeId] after success
  nodesFailed()   → [nodeId] after failure

The factory that persists the job only captures 3 fields:

  // OperationalJobRecord.fromOperationalJob(job)
  return new OperationalJobRecord(job.jobId(), job.operationType(), job.status());
  //                               ^^^^^^^^     ^^^^^^^^^^^^^^^     ^^^^^^^^^^
  //                               that's it — nodeId, startTime, node lists are all discarded

After completion, get() returns the record with stubbed implementations:

  nodeId()         → null            (always)
  startTime()      → null            (constructor sets it to null)
  nodesPending()   → emptyList       (always)
  nodesExecuting() → emptyList       (always)
  nodesSucceeded() → emptyList       (always)
  nodesFailed()    → emptyList       (always)

So an API client observing a decommission job would see:

  DURING execution:                    AFTER completion (from storage):
  ─────────────────────────────────    ─────────────────────────────────
  {                                    {
    "jobId": "abc-123",                  "jobId": "abc-123",
    "operation": "decommission",         "operation": "DECOMMISSION",
    "jobStatus": "RUNNING",              "jobStatus": "SUCCEEDED",
    "startTime": "2026-06-12T...",       "startTime": null,
    "nodesExecuting": ["node-uuid"],     "nodesExecuting": [],
    "nodesPending": [],                  "nodesPending": [],
    "nodesSucceeded": [],                "nodesSucceeded": [],
    "nodesFailed": []                    "nodesFailed": []
  }                                    }

All the provenance — which node was decommissioned, when it started, whether it succeeded per-node — is lost once the job leaves the
local map. The record in Cassandra only has (jobId, operationType, status).

storageProvider.persistJob(OperationalJobRecord.fromOperationalJob(job));

job.asyncResult().onComplete(ar -> {
liveJobs.remove(job.jobId());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

liveJobs removal should come after updateTerminalStatus to avoid RUNNING → CREATED → SUCCEEDED. when polling status between removal and update

return liveJobs.computeIfAbsent(jobId, id -> {
OperationalJob job = mappingFunction.apply(id);

storageProvider.persistJob(OperationalJobRecord.fromOperationalJob(job));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if !storageProvider.isAvailable ?

TaskExecutorPool executor)
{
this.liveJobs = new ConcurrentHashMap<>(serviceConfiguration.operationalJobTrackerSize());
this.storageProvider = storageProvider;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who is expected to initialize the storage provider ? can this class assume it's already initialized or do we want to initialize here?

StorageProvider storageProvider,
TaskExecutorPool executor)
{
this.liveJobs = new ConcurrentHashMap<>(serviceConfiguration.operationalJobTrackerSize());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike InMemoryOperationalJobTracker which has removeEldestEntry, the durable tracker has no eviction. If
asyncResult() never completes (e.g., executeBlocking fails to submit due to full pool), the entry stays in the map forever. Is this expected?

{
LOGGER.warn("Failed to update terminal status for job {} (attempt {}/{}). error={}",
job.jobId(), attempt, MAX_STATUS_UPDATE_ATTEMPTS, e.getMessage());
if (attempt < MAX_STATUS_UPDATE_ATTEMPTS)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This method is called from above method with attempt=1, if 1st attempt fails, we are doing only one more retry. Should we increase value of MAX_STATUS_UPDATE_ATTEMPTS ?
  • Also, if retries also failed to update, we will have stale entries in CREATED stated. We need a periodic thread running once in a while and removing stale entries (this can be deferred to later as not a blocker).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants