Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 98 additions & 36 deletions daft_lance/lance_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

logger = logging.getLogger(__name__)

# Index types that ``create_scalar_index_internal`` routes to the segmented
# workflow (``_create_segmented_index``), where each worker builds its own
# index segment.
SEGMENTED_INDEX_TYPES = {"BTREE", "INVERTED"}
# Index types for which ``_prepare_index_segments_for_commit`` merges the
# worker-built segments into a single segment before the final commit.
MERGED_SEGMENTED_INDEX_TYPES = {"INVERTED"}


class FragmentIndexHandler:
"""Handler for distributed scalar index creation on fragment batches."""
Expand Down Expand Up @@ -64,10 +67,11 @@ class SegmentedFragmentIndexHandler:

Unlike ``FragmentIndexHandler``, which writes partial index files sharing
a single UUID, this handler builds a fully independent index segment per
worker via the low-level ``_ds.create_index`` binding. The returned
``lance.Index`` metadata (including ``index_details``) is serialised
(pickled) so it can cross Daft process/serialisation boundaries. The
coordinator then commits all segments atomically with
worker via Lance's public ``create_index_uncommitted`` API when available,
with a compatibility fallback for older Lance releases that expose the
method but only support vector columns. The returned ``lance.Index``
metadata is serialised (pickled) so it can cross Daft process/serialisation
boundaries. The coordinator then commits all segments atomically with
``commit_existing_index_segments``.
"""

Expand Down Expand Up @@ -96,21 +100,71 @@ def __call__(self, fragment_ids: list[int]) -> bytes:
self.index_type,
)

# _ds.create_index returns a lance.Index dataclass when fragment_ids
# is provided (uncommitted segment mode).
index_meta: lance.Index = self.lance_ds._ds.create_index( # type: ignore[call-arg]
[self.column],
self.index_type,
index_meta = _create_index_segment(
lance_ds=self.lance_ds,
column=self.column,
index_type=self.index_type,
name=self.name,
replace=self.replace,
train=True,
storage_options=None,
kwargs={"fragment_ids": fragment_ids, **self.kwargs},
fragment_ids=fragment_ids,
**self.kwargs,
)

return pickle.dumps(index_meta)


def _create_index_segment(
    lance_ds: lance.LanceDataset,
    *,
    column: str,
    index_type: str,
    name: str,
    replace: bool,
    fragment_ids: list[int],
    **kwargs: Any,
) -> lance.Index:
    """Build a single uncommitted index segment over ``fragment_ids``.

    The public ``create_index_uncommitted`` method on ``lance_ds`` is tried
    first. ``pylance 7.0.0`` exposes that method but only accepts vector
    columns; in that case the call raises a ``TypeError`` mentioning both
    "Vector column" and "FixedSizeListArray", and this function falls back to
    the low-level ``lance_ds._ds.create_index`` binding instead.

    Args:
        lance_ds: Dataset on which the segment is built.
        column: Name of the column to index.
        index_type: Index type string forwarded to Lance.
        name: Name of the index.
        replace: Forwarded to Lance as the ``replace`` flag.
        fragment_ids: Fragments covered by this segment.
        **kwargs: Extra index options. They are forwarded as keyword arguments
            on the public path and folded into the ``kwargs`` dict (together
            with ``fragment_ids``) on the low-level path.

    Returns:
        The index metadata returned by whichever Lance call succeeded.

    Raises:
        TypeError: Re-raised from the public API when its message does not
            contain both "Vector column" and "FixedSizeListArray".
    """
    try:
        segment = lance_ds.create_index_uncommitted(
            column=column,
            index_type=index_type,
            name=name,
            replace=replace,
            train=True,
            fragment_ids=fragment_ids,
            **kwargs,
        )
    except TypeError as err:
        text = str(err)
        # Only the "vector columns only" rejection triggers the fallback;
        # every other TypeError is a genuine failure and must propagate.
        vector_only_rejection = "Vector column" in text and "FixedSizeListArray" in text
        if not vector_only_rejection:
            raise
    else:
        return segment

    logger.info(
        "Falling back to Lance's low-level uncommitted index segment API for %s; public scalar segment API is unavailable in this pylance version.",
        index_type,
    )

    # The low-level binding is untyped from this module's point of view.
    low_level_ds = cast(Any, lance_ds._ds)
    # The binding takes index options as one dict rather than keyword args.
    binding_options = {"fragment_ids": fragment_ids, **kwargs}
    segment = low_level_ds.create_index(
        [column],
        index_type,
        name=name,
        replace=replace,
        train=True,
        storage_options=None,
        kwargs=binding_options,
    )
    return cast(lance.Index, segment)


def create_scalar_index_internal(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
Expand All @@ -128,17 +182,12 @@ def create_scalar_index_internal(
) -> None:
"""Internal implementation of distributed scalar index creation.

INVERTED and BTREE use a 3-phase distributed workflow (fragment-parallel build,
merge_index_metadata, then commit). ``FTS`` is normalized to ``INVERTED`` (same Lance
index); see Lance Rust/Python bindings: ``INVERTED`` and ``FTS`` map to the same
inverted full-text index type.

When ``segmented=True`` and ``index_type`` is ``BTREE``, a cleaner segmented workflow
is used instead: each worker builds a fully independent index segment via the low-level
``_ds.create_index`` binding (which returns ``lance.Index`` metadata including
``index_details``), and the coordinator commits them atomically with
``commit_existing_index_segments``. This resolves a known issue where ``index_details``
was left empty in the legacy path, preventing ``describe_indices()`` from working.
``BTREE`` and ``INVERTED`` use Lance's public segment-index workflow: each
worker builds a fully independent index segment, and the coordinator commits
them atomically with ``commit_existing_index_segments``. ``FTS`` is
normalized to ``INVERTED`` (same Lance index); see Lance Rust/Python
bindings: ``INVERTED`` and ``FTS`` map to the same inverted full-text index
type.
"""
if not column:
raise ValueError("Column name cannot be empty")
Expand Down Expand Up @@ -237,10 +286,10 @@ def create_scalar_index_internal(
segmented,
)

# Choose between the segmented workflow and the legacy partitioned-and-merged
# workflow. Segmented mode produces proper ``index_details`` so
# ``describe_indices()`` works correctly.
if segmented and index_type == "BTREE":
# Use segment-index creation for Lance scalar index types that expose the
# public uncommitted segment API. The legacy path is kept as a fallback for
# older/unsupported distributed scalar index types.
if segmented or index_type in SEGMENTED_INDEX_TYPES:
_create_segmented_index(
lance_ds=lance_ds,
uri=uri,
Expand Down Expand Up @@ -289,11 +338,10 @@ def _create_segmented_index(
) -> None:
"""Segmented index workflow: each worker builds an independent segment.

Workers call the low-level ``_ds.create_index`` binding (which returns
``lance.Index`` metadata with ``index_details`` populated), pickle the
result so it can traverse Daft serialisation boundaries, and return it.
The coordinator unpickles all segments and commits them atomically via
``commit_existing_index_segments``.
Workers call Lance's uncommitted index segment API, pickle the returned
``lance.Index`` metadata so it can traverse Daft serialisation boundaries,
and return it. The coordinator unpickles all segments and commits them
atomically via ``commit_existing_index_segments``.
"""
handler_cls = daft.cls(
SegmentedFragmentIndexHandler,
Expand Down Expand Up @@ -322,20 +370,34 @@ def _create_segmented_index(
pickle.loads(raw) for raw in collected.to_pydict()["index_meta"]
]

# Reload dataset to pick up the latest version (segment files were written
# by workers against the version that was current at their invocation time).
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)
index_metas = _prepare_index_segments_for_commit(lance_ds, index_type, index_metas)

logger.info(
"Collected %d index segments; committing as segmented index %s",
len(index_metas),
name,
)

# Reload dataset to pick up the latest version (segment files were written
# by workers against the version that was current at their invocation time).
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)
lance_ds.commit_existing_index_segments(name, column, index_metas)

logger.info("Segmented index %s committed successfully", name)


def _prepare_index_segments_for_commit(
    lance_ds: lance.LanceDataset,
    index_type: str,
    index_metas: list[lance.Index | lance.indices.IndexSegment],
) -> list[lance.Index | lance.indices.IndexSegment]:
    """Return the segment list to hand to the final manifest commit.

    Args:
        lance_ds: Dataset whose ``merge_existing_index_segments`` method is
            used when a merge is required.
        index_type: Index type string; only types listed in
            ``MERGED_SEGMENTED_INDEX_TYPES`` are merged.
        index_metas: Segments produced by the workers.

    Returns:
        A one-element list holding the merged segment when ``index_type`` is a
        merged type and more than one segment was collected; otherwise
        ``index_metas`` itself, unchanged.
    """
    if index_type in MERGED_SEGMENTED_INDEX_TYPES and len(index_metas) > 1:
        # The cast only narrows the static type of each segment.
        as_indices = [cast(lance.Index, meta) for meta in index_metas]
        return [lance_ds.merge_existing_index_segments(as_indices)]
    return index_metas


def _create_partitioned_index(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
name = "daft-lance"
version = "0.4.0"
description = "Lance integration for Daft - compaction, indexing, merge columns, and REST operations"
requires-python = ">=3.10"
requires-python = ">=3.10,<3.14"
license = "Apache-2.0"
readme = "README.md"
dependencies = [
"lance-namespace>=0.6.0",
"lance-namespace-urllib3-client>=0.6.0",
"pylance>=7.0.0"
"pylance>=8.0.0b11"
]

[dependency-groups]
Expand All @@ -34,6 +34,10 @@ ignore_missing_imports = true
warn_return_any = true
warn_unused_configs = true

[tool.uv]
extra-index-url = ["https://pypi.fury.io/lance-format"]
index-strategy = "unsafe-best-match"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Loading
Loading