feat(lance): support checkpointed append commits #16

Open
everySympathy wants to merge 1 commit into daft-engine:main from everySympathy:daft-lance-checkpoint

everySympathy commented May 31, 2026

Collaborator

Related issue: Eventual-Inc/Daft#6967

Summary

This PR adds checkpoint-aware append commit support for write_lance.

The goal is to make Lance append writes recoverable when Daft source checkpointing is enabled.

The key recovery problem is that, on retry, Daft may skip source rows that were already checkpointed, so the retry run may not produce the full set of write_results again. For Lance, those write_results carry the fragment metadata that still needs to be appended to the Lance table.

This PR makes daft_lance recover staged write_results from the checkpoint store during finalize(), then commit the recovered fragments through a Lance append transaction.

Daft source checkpoint
        |
        v
worker writes Lance fragments
        |
        v
Daft core stages write_results
        |
        v
checkpoint store
        |
        v
daft_lance finalize()
        |
        v
Lance append transaction
        |
        v
mark checkpoint ids committed

Implementation

This PR adds:

  • A checkpoint parameter to LanceDataSink.
  • Append-only validation for checkpointed Lance writes. Currently only mode="append" is supported.
  • checkpoint_file_format(), so Daft core knows this DataSink wants its per-input write_results staged in the checkpoint store.
  • Ray worker pickling behavior for checkpointed Lance sinks:
    • workers keep the checkpoint-enabled signal;
    • the driver-side CheckpointStore is not sent to workers;
    • the checkpoint store is only used later by driver-side finalize().
  • Checkpointed finalize() recovery logic:
    • read pending checkpoint ids from the checkpoint store;
    • read staged Lance write_results from the checkpoint store;
    • extract Lance fragments from those write_results;
    • commit the fragments with lance.Transaction and LanceOperation.Append;
    • write the Daft idempotence key into Lance transaction properties;
    • call mark_committed() after the Lance commit succeeds.
  • Lance transaction history checks:
    • if Lance history already contains the same Daft idempotence key, the logical append has already landed;
    • the retry does not append the same fragments again;
    • pending checkpoint ids are still marked committed so recovery can finish cleanly.
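
The Ray worker pickling behavior listed above can be illustrated with a minimal sketch. It is not the code in this PR: `HypotheticalCheckpointedSink`, its attribute names, and the placeholder URI are assumptions made for illustration. It only shows the general pattern of keeping a checkpoint-enabled flag while dropping the driver-side store during serialization.

```python
import pickle


class HypotheticalCheckpointedSink:
    """Illustrative stand-in for a checkpoint-aware sink; not the real LanceDataSink."""

    def __init__(self, uri, checkpoint_store=None):
        self.uri = uri
        # Driver-side handle that should not be shipped to workers.
        self._checkpoint_store = checkpoint_store
        # Plain boolean that survives pickling, so workers still know
        # that checkpointing is enabled.
        self._checkpoint_enabled = checkpoint_store is not None

    def __getstate__(self):
        state = self.__dict__.copy()
        # Drop the store before the sink is serialized for workers.
        state["_checkpoint_store"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


if __name__ == "__main__":
    # Placeholder URI and a dict standing in for a real checkpoint store.
    driver_sink = HypotheticalCheckpointedSink(
        "s3://bucket/table.lance", checkpoint_store={"pending": []}
    )
    worker_sink = pickle.loads(pickle.dumps(driver_sink))
    print(worker_sink._checkpoint_enabled, worker_sink._checkpoint_store)
```

With this pattern the driver keeps its original object, store included, so a later driver-side finalize() can still reach the checkpoint store.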

The Lance-specific recovery logic intentionally lives in daft_lance. Daft core only stages opaque write_results; it does not need to understand Lance FragmentMetadata.
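
As a rough illustration of this split, the sketch below models the checkpointed finalize() steps with in-memory stand-ins. Everything in it is hypothetical: `InMemoryCheckpointStore`, `InMemoryTable`, the `idempotence_key` property name, and the JSON payload layout are assumptions, and the real implementation commits through lance.Transaction and LanceOperation.Append rather than a Python list. The store holds opaque bytes; only the sink-side `finalize` decodes them.

```python
import json

KEY_PROPERTY = "idempotence_key"  # hypothetical transaction property name


class InMemoryCheckpointStore:
    """Hypothetical store: checkpoint id -> opaque staged bytes."""

    def __init__(self):
        self.pending = {}
        self.committed = set()

    def stage(self, checkpoint_id, payload):
        self.pending[checkpoint_id] = payload

    def mark_committed(self, checkpoint_ids):
        for cid in checkpoint_ids:
            self.pending.pop(cid, None)
            self.committed.add(cid)


class InMemoryTable:
    """Hypothetical table: fragments plus a history of transaction properties."""

    def __init__(self):
        self.fragments = []
        self.history = []

    def append(self, fragments, properties):
        # Stands in for one atomic append transaction.
        self.fragments.extend(fragments)
        self.history.append(dict(properties))


def finalize(store, table, idempotence_key):
    """Commit pending staged fragments once; return how many were appended."""
    pending_ids = sorted(store.pending)
    if not pending_ids:
        return 0
    already_landed = any(
        props.get(KEY_PROPERTY) == idempotence_key for props in table.history
    )
    appended = 0
    if not already_landed:
        fragments = []
        for cid in pending_ids:
            # Only the sink knows how to decode the staged payload.
            fragments.extend(json.loads(store.pending[cid])["fragments"])
        table.append(fragments, {KEY_PROPERTY: idempotence_key})
        appended = len(fragments)
    # Mark ids committed even when the append was skipped as a duplicate.
    store.mark_committed(pending_ids)
    return appended


if __name__ == "__main__":
    store, table = InMemoryCheckpointStore(), InMemoryTable()
    store.stage("cp-1", json.dumps({"fragments": ["frag-a", "frag-b"]}).encode())
    store.stage("cp-2", json.dumps({"fragments": ["frag-c"]}).encode())
    print(finalize(store, table, "job-42"), table.fragments)
```

The idempotence key is looked up before any fragment is decoded, so a retry whose append already landed only has to mark the pending ids committed.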

Scope

This PR currently supports append only.

Not supported yet:

  • mode="create"
  • mode="overwrite"
  • mode="merge"

Those modes have different recovery semantics and should be designed separately.
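
A minimal sketch of what append-only validation could look like, assuming a hypothetical helper named `validate_checkpointed_mode` that raises `ValueError`; the actual check, exception type, and message in this PR may differ.

```python
SUPPORTED_CHECKPOINT_MODES = frozenset({"append"})


def validate_checkpointed_mode(mode, checkpoint_enabled):
    """Reject write modes that checkpointed writes do not support (hypothetical helper)."""
    if checkpoint_enabled and mode not in SUPPORTED_CHECKPOINT_MODES:
        raise ValueError(
            f"checkpointed Lance writes only support mode='append', got mode={mode!r}"
        )


if __name__ == "__main__":
    validate_checkpointed_mode("append", checkpoint_enabled=True)  # accepted
    try:
        validate_checkpointed_mode("overwrite", checkpoint_enabled=True)
    except ValueError as exc:
        print(exc)
```

Writes without a checkpoint are left untouched by this check, so existing create, overwrite, and merge callers keep working.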

Testing

Unit tests cover:

  • checkpoint_file_format() opt-in for Lance checkpoint metadata staging.
  • Ray worker pickling without carrying the driver-side checkpoint store.
  • Idempotence-key lookup in Lance transaction history.
  • Append-only validation.
  • Decoding staged write_results from checkpoint metadata.
  • Committing pending checkpointed fragments.
  • Recovery when Lance commit succeeds but mark_committed() fails.
  • Empty-fragment checkpoint results.
  • Failure when a checkpointed Lance write is used without source checkpointing.

Ray end-to-end tests cover:

  • retrying append with the same idempotence key without duplicate rows;
  • requiring source checkpointing for checkpointed Lance writes;
  • recovery for the commit-before-mark_committed crash window;
  • recovery for the stage-before-commit crash window.
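
The two crash windows can be modeled with a small fault-injection sketch. It is not the PR's test code: the dict-based state, `SimulatedCrash`, and the `crash_at` switch are hypothetical, and the sketch assumes the table append and its idempotence key land atomically, as a single transaction would.

```python
class SimulatedCrash(RuntimeError):
    """Raised to model the driver dying at a chosen point."""


def new_state():
    # Hypothetical in-memory model: staged results, table fragments,
    # idempotence keys seen in table history, and committed checkpoint ids.
    return {
        "pending": {"cp-1": ["frag-a"], "cp-2": ["frag-b"]},
        "table": [],
        "history": set(),
        "committed": set(),
    }


def finalize(state, key, crash_at=None):
    pending_ids = sorted(state["pending"])
    if crash_at == "before_commit":
        raise SimulatedCrash("results staged, table commit never ran")
    if key not in state["history"]:
        # Modeled as atomic: fragments and key land together.
        for cid in pending_ids:
            state["table"].extend(state["pending"][cid])
        state["history"].add(key)
    if crash_at == "before_mark_committed":
        raise SimulatedCrash("table commit landed, ids still pending")
    state["committed"].update(pending_ids)
    state["pending"].clear()


def run_with_retry(crash_at):
    state = new_state()
    try:
        finalize(state, "job-42", crash_at=crash_at)
    except SimulatedCrash:
        pass
    finalize(state, "job-42")  # retry with the same idempotence key
    return state


if __name__ == "__main__":
    for window in ("before_commit", "before_mark_committed"):
        print(window, run_with_retry(window)["table"])
```

In both windows the retry ends with each fragment appended exactly once and every checkpoint id marked committed.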

Manual validation:

  • Ran checkpointed read_parquet -> write_lance on a Ray cluster.
  • Used TOS/S3-compatible storage.
  • Retried with the same checkpoint store and idempotence key.
  • Verified the final Lance dataset row count.
  • Verified checkpoint entries moved to Committed.
  • Verified retry did not append Lance fragments twice.

everySympathy force-pushed the daft-lance-checkpoint branch from 3782bbf to 3c2f099 on June 7, 2026 15:48
