3 changes: 2 additions & 1 deletion daft/io/lance/__init__.py
@@ -1,5 +1,6 @@
-from ._lance import merge_columns
+from ._lance import merge_columns, create_scalar_index

__all__ = [
"create_scalar_index",
"merge_columns",
]
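
With this export in place, the new function can be imported directly alongside merge_columns (a minimal sketch):

from daft.io.lance import create_scalar_index, merge_columns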
121 changes: 121 additions & 0 deletions daft/io/lance/_lance.py
@@ -227,3 +227,124 @@ def merge_columns(
daft_remote_args=daft_remote_args,
concurrency=concurrency,
)


@PublicAPI
def create_scalar_index(
url: str,
io_config: Optional[IOConfig] = None,
*,
column: str,
index_type: str = "INVERTED",
name: Optional[str] = None,
replace: bool = True,
storage_options: Optional[dict[str, Any]] = None,
daft_remote_args: Optional[dict[str, Any]] = None,
concurrency: Optional[int] = None,
version: Optional[Union[int, str]] = None,
asof: Optional[str] = None,
block_size: Optional[int] = None,
commit_lock: Optional[Any] = None,
index_cache_size: Optional[int] = None,
default_scan_options: Optional[dict[str, Any]] = None,
metadata_cache_size_bytes: Optional[int] = None,
**kwargs: Any,
) -> None:
"""Build a distributed full-text search index using Daft's distributed computing.
[Review — Contributor] Is this only for fts? Can btree also use this interface?
[Review — Author] Yes, only for FTS. BTree will also use this interface in the future.

This function distributes the index building process across multiple Daft workers,
with each worker building indices for a subset of fragments. The indices are then
merged and committed as a single index.

Args:
url: URL to the LanceDB table (supports remote URLs to object stores such as `s3://` or `gs://`)
io_config: A custom IOConfig to use when accessing LanceDB data. Defaults to None.
column: Column name to index
index_type: Type of index to build ("INVERTED" or "FTS")
name: Name of the index (generated if None)
replace: Whether to replace an existing index with the same name. Defaults to True.
storage_options: Storage options for the dataset
daft_remote_args: Options for Daft remote execution (e.g., num_cpus, num_gpus, memory_bytes)
concurrency: Number of Daft workers to use
version: Version of the dataset to use
asof: Timestamp to use for time travel queries
block_size: Block size for the index
commit_lock: Commit lock for the dataset
index_cache_size: Size of the index cache
default_scan_options: Default scan options for the dataset
metadata_cache_size_bytes: Size of the metadata cache in bytes
**kwargs: Additional arguments to pass to create_scalar_index

Returns:
None

Raises:
ValueError: If input parameters are invalid
TypeError: If column type is not string
RuntimeError: If index building fails
ImportError: If lance package is not available

Note:
This function requires [LanceDB](https://lancedb.github.io/lancedb/), the Python library for the LanceDB project.
To ensure it is installed with Daft, run: `pip install daft[lance]`

Examples:
Create a distributed inverted index:
>>> import daft
>>> daft.io.lance.create_scalar_index(
... "s3://my-bucket/dataset/", column="text_content", index_type="INVERTED", concurrency=8
... )

Create an index with custom Daft remote arguments:
>>> daft.io.lance.create_scalar_index(
... "s3://my-bucket/dataset/",
... column="description",
... daft_remote_args={"num_cpus": 2},
... )
"""
try:
import lance
from packaging import version as packaging_version

from daft.io.lance.lance_scalar_index import create_scalar_index_internal

lance_version = packaging_version.parse(lance.__version__)
min_required_version = packaging_version.parse("0.37.0")
if lance_version < min_required_version:
raise RuntimeError(
f"Distributed indexing requires pylance >= 0.37.0, but found {lance.__version__}. "
"The distribute-related interfaces are not available in older versions. "
"Please upgrade lance by running: pip install --upgrade pylance"
)
except ImportError as e:
raise ImportError(
"Unable to import the `lance` package, please ensure that Daft is installed with the lance extra dependency: `pip install daft[lance]`"
) from e

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = storage_options or io_config_to_storage_options(io_config, url)

lance_ds = lance.dataset(
url,
storage_options=storage_options,
version=version,
asof=asof,
block_size=block_size,
commit_lock=commit_lock,
index_cache_size=index_cache_size,
default_scan_options=default_scan_options,
metadata_cache_size_bytes=metadata_cache_size_bytes,
)

create_scalar_index_internal(
lance_ds=lance_ds,
uri=url,
column=column,
index_type=index_type,
name=name,
replace=replace,
storage_options=storage_options,
daft_remote_args=daft_remote_args,
concurrency=concurrency,
**kwargs,
)
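
A minimal end-to-end sketch of the new API (the local path and sample rows below are hypothetical, not part of this PR; Lance accepts local paths as well as object-store URLs):

import lance
import pyarrow as pa

import daft

# Write a tiny Lance dataset with a string column to index.
table = pa.table({"id": [1, 2, 3], "text_content": ["hello world", "full text search", "lance fragments"]})
lance.write_dataset(table, "/tmp/example_lance_ds")

# Build a distributed inverted index over the text column. With a single
# fragment, the internal concurrency is adjusted down to match.
daft.io.lance.create_scalar_index(
    "/tmp/example_lance_ds",
    column="text_content",
    index_type="INVERTED",
)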
247 changes: 247 additions & 0 deletions daft/io/lance/lance_scalar_index.py
@@ -0,0 +1,247 @@
from __future__ import annotations

import logging
import uuid
from typing import Any

import lance

from daft import DataType, from_pylist
from daft.dependencies import pa
from daft.io.lance.utils import distribute_fragments_balanced
from daft.udf import udf
from daft.udf.legacy import _UnsetMarker

logger = logging.getLogger(__name__)


@udf(
return_dtype=DataType.struct(
{
"status": DataType.string(),
"fragment_ids": DataType.list(DataType.int32()),
"fields": DataType.list(DataType.int32()),
"uuid": DataType.string(),
"error": DataType.string(),
}
)
)
class FragmentIndexHandler:
"""UDF handler for distributed fragment index creation."""

def __init__(
self,
lance_ds: lance.LanceDataset,
column: str,
index_type: str,
name: str,
fragment_uuid: str,
replace: bool,
**kwargs: Any,
) -> None:
self.lance_ds = lance_ds
self.column = column
self.index_type = index_type
self.name = name
self.fragment_uuid = fragment_uuid
self.replace = replace
self.kwargs = kwargs

def __call__(self, fragment_ids_batch: list[list[int]]) -> list[dict[str, Any]]:
"""Process a batch of fragment IDs for index creation."""
results: list[dict[str, Any]] = []
for fragment_ids in fragment_ids_batch:
try:
results.append(self._handle_fragment_index(fragment_ids))
except Exception as e:
logger.exception("Error creating fragment index for fragment_ids %s: %s", fragment_ids, e)
results.append({"status": "error", "fragment_ids": fragment_ids, "error": str(e)})
return results

def _handle_fragment_index(self, fragment_ids: list[int]) -> dict[str, Any]:
"""Handle index creation for a single fragment."""
try:
logger.info("Building distributed index for fragments %s using create_scalar_index", fragment_ids)

self.lance_ds.create_scalar_index(
column=self.column,
index_type=self.index_type,
name=self.name,
replace=self.replace,
fragment_uuid=self.fragment_uuid,
fragment_ids=fragment_ids,
**self.kwargs,
)

field_id = self.lance_ds.schema.get_field_index(self.column)
logger.info("Fragment index created successfully for fragments %s", fragment_ids)
return {
"status": "success",
"fragment_ids": fragment_ids,
"fields": [field_id],
"uuid": self.fragment_uuid,
}
except Exception as e:
logger.error("Fragment index task failed for fragments %s: %s", fragment_ids, str(e))
return {
"status": "error",
"fragment_ids": fragment_ids,
"error": str(e),
}


def create_scalar_index_internal(
lance_ds: lance.LanceDataset,
uri: str,
*,
column: str,
index_type: str = "INVERTED",
name: str | None = None,
replace: bool = True,
storage_options: dict[str, str] | None = None,
daft_remote_args: dict[str, Any] | None = None,
concurrency: int | None = None,
**kwargs: Any,
) -> None:
"""Internal implementation of distributed FTS index creation using Daft UDFs.
[Review — Contributor] FTS or btree?
[Review — Author] Only for FTS.

This function implements the 3-phase distributed indexing workflow:
Phase 1: Fragment parallel processing using Daft UDFs
Phase 2: Index metadata merging
Phase 3: Atomic index creation and commit
"""
if not column:
raise ValueError("Column name cannot be empty")

# Handle index_type validation
if index_type not in ["INVERTED", "FTS"]:
raise ValueError(
f"Distributed indexing currently only supports 'INVERTED' and 'FTS' index types, not '{index_type}'"
)

# Validate column exists and has correct type
try:
field = lance_ds.schema.field(column)
except KeyError as e:
available_columns = [field.name for field in lance_ds.schema]
raise ValueError(f"Column '{column}' not found. Available: {available_columns}") from e

# Check column type
value_type = field.type
if pa.types.is_list(field.type) or pa.types.is_large_list(field.type):
value_type = field.type.value_type

if not pa.types.is_string(value_type) and not pa.types.is_large_string(value_type):
raise TypeError(f"Column {column} must be string type, got {value_type}")

# Generate index name if not provided
if name is None:
[Review — Contributor] Lance has a default name construction logic. Do we need to create it here? If we do create it, should we include the index type? Otherwise, will there be conflicts when the same column has different index types?
[Review — Author (huleilei, Sep 29, 2025)] Changed it to f"{column}_{index_type.lower()}_idx"; see lancedb/lance-ray#45. In that PR, column, index type, and dataset were initially combined to build index names, but members of the Lance community suggested using {column_name}_idx. Considering both, I suggest f"{column}_{index_type.lower()}_idx".
[Review — Contributor] ok

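        # e.g. column="text_content", index_type="INVERTED" -> "text_content_inverted_idx"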
name = f"{column}_{index_type.lower()}_idx"

# Handle replace parameter - check for existing index with same name
    if not replace:
        try:
            existing_indices = lance_ds.list_indices()
            existing_names = {idx["name"] for idx in existing_indices}
        except Exception:
            # If we can't check existing indices, continue
            existing_names = set()
        if name in existing_names:
            raise ValueError(f"Index with name '{name}' already exists. Set replace=True to replace it.")

# Get available fragment IDs to use
fragments = lance_ds.get_fragments()
fragment_ids_to_use = [fragment.fragment_id for fragment in fragments]

# Adjust concurrency based on fragment count
if concurrency is None:
concurrency = 4

if concurrency <= 0:
raise ValueError(f"concurrency must be positive, got {concurrency}")

if concurrency > len(fragment_ids_to_use):
concurrency = len(fragment_ids_to_use)
logger.info("Adjusted concurrency to %d to match fragment count", concurrency)

# Generate unique index ID
index_id = str(uuid.uuid4())

logger.info(
"Starting distributed FTS index creation: column=%s, type=%s, name=%s, concurrency=%s",
column,
index_type,
name,
concurrency,
)

logger.info("Starting fragment parallel processing. And create DataFrame with fragment batches")
fragment_data = distribute_fragments_balanced(fragments, concurrency)
df = from_pylist(fragment_data)

daft_remote_args = daft_remote_args or {}
num_cpus = daft_remote_args.get("num_cpus", _UnsetMarker)
num_gpus = daft_remote_args.get("num_gpus", _UnsetMarker)
memory_bytes = daft_remote_args.get("memory_bytes", _UnsetMarker)
batch_size = daft_remote_args.get("batch_size", _UnsetMarker)

handler_fragment_udf = (
FragmentIndexHandler.with_init_args( # type: ignore[attr-defined]
lance_ds=lance_ds,
column=column,
index_type=index_type,
name=name,
fragment_uuid=index_id,
[Review — Contributor] What is the difference between fragment_uuid and index_id? Why isn't index_id used here?
[Review — Author (huleilei, Sep 29, 2025)] index_id is generated with uuid.uuid4(); index_id and fragment_uuid are now the same value.
[Review — Author] Lance uses different names at different stages: fragment_uuid when creating the index, and index.id when committing.

replace=replace,
[Review — Contributor] It seems that I can't see where "replace" is taking effect.
[Review — Author] The replace parameter is passed through to the Lance API.

**kwargs,
)
.override_options(num_cpus=num_cpus, num_gpus=num_gpus, memory_bytes=memory_bytes, batch_size=batch_size)
.with_concurrency(concurrency)
)
df = df.with_column("index_result", handler_fragment_udf(df["fragment_ids"]))

results = df.to_pandas()["index_result"]

# Check for failures
failed_results = [r for r in results if r["status"] == "error"]
if failed_results:
error_messages = [r["error"] for r in failed_results]
raise RuntimeError(
f"Index building failed on {len(failed_results)} fragment batches: {'; '.join(error_messages)}"
)

successful_results = [r for r in results if r["status"] == "success"]
if not successful_results:
raise RuntimeError("No successful index building results")

logger.info("Starting index metadata merging by reloading dataset to get latest state")
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)

lance_ds.merge_index_metadata(index_id, index_type)

logger.info("Starting atomic index creation and commit")
fields = successful_results[0]["fields"]
index = lance.Index(
uuid=index_id,
name=name,
fields=fields,
dataset_version=lance_ds.version,
fragment_ids=set(fragment_ids_to_use),
index_version=0,
)

# Create and commit the index operation
create_index_op = lance.LanceOperation.CreateIndex(
new_indices=[index],
removed_indices=[],
)

# Commit the index operation atomically
lance.LanceDataset.commit(
uri,
create_index_op,
read_version=lance_ds.version,
storage_options=storage_options,
)

logger.info("Index %s created successfully with ID %s", name, index_id)