feat: support build FTS index distributedly #4578
Force-pushed d26ed0f to a58678a.
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #4578      +/-   ##
==========================================
- Coverage   80.62%   80.59%   -0.03%
==========================================
  Files         317      317
  Lines      118785   119097     +312
  Branches   118785   119097     +312
==========================================
+ Hits        95769    95985     +216
- Misses      19568    19660      +92
- Partials     3448     3452       +4
==========================================
```
Force-pushed 311e247 to 18fc5a8.
Force-pushed a079fb8 to d5b3790.
Force-pushed 80d542b to b07d1a9.
BubbleCal left a comment:

This is great work! Just some questions.
```rust
    location: location!(),
})?;

all_partitions.extend(partition_ids);
```
I think each indexing worker/fragment would have partition IDs starting from 0; do we need to map each new partition ID to `all_partitions.len() + part_id`?
Thanks for the suggestion; that's convincing. I'll adjust the partition ID assignment logic and make the corresponding changes.
> I think each indexing worker/fragment would have partition IDs starting from 0; do we need to map each new partition ID to `all_partitions.len() + part_id`?
Hi @BubbleCal, thanks for taking a look. I want to confirm that I'm interpreting the implementation correctly.

In the previous design, each partition ID was assigned using the mask `fragment_id << 32` to ensure isolation across fragments and avoid conflicts. As a result, partition IDs are not sequential (i.e., not 0, 1, 2, 3, …).

Based on the suggestion, in the final stage I reorder and rename all fragment partitions and then reassign partition IDs starting from 0. While this entails some rename operations, I'm not sure it is the best approach. I can't assign sequential partition IDs 0, 1, 2, … at fragment-index creation time, because that work happens on different machines (e.g., Ray workers). Therefore, the only practical option I see is to sort the IDs and remap them in the last stage on a coordinator (e.g., the Ray head node).
I’ve made some changes to the function accordingly—please see the diff here:
I’d appreciate any feedback or alternative suggestions if you think there’s a better way to handle this.
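The reorder-and-reassign step described above can be sketched as follows. This is a hypothetical illustration, not the PR's actual Rust code: it assumes each worker encodes partition IDs as `(fragment_id << 32) | local_id`, and the coordinator sorts all encoded IDs and renames them to a sequential range.

```python
# Hypothetical coordinator-side remap sketch (names and shapes are
# illustrative, not the PR's actual implementation).

def remap_partition_ids(per_fragment_partitions):
    """per_fragment_partitions: dict mapping fragment_id -> list of local partition IDs."""
    encoded = [
        (fragment_id << 32) | local_id
        for fragment_id, local_ids in sorted(per_fragment_partitions.items())
        for local_id in local_ids
    ]
    # Sorting gives a deterministic global order; enumerate renames to 0, 1, 2, ...
    return {old_id: new_id for new_id, old_id in enumerate(sorted(encoded))}
```

Because the high 32 bits dominate the sort order, partitions end up grouped by fragment, then ordered by their local ID within each fragment.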
This makes sense to me. TBH I didn't realize that the partition IDs were encoded as `frag_id << 32 + i`, but I think the current reorder-and-reassign design is still better, in case we later build a single index over multiple fragments (e.g., when there are many small fragments).
@BubbleCal Yes, I completely agree with your idea. Using "reorder" and "reassign" is definitely clearer, since the generated FTS index is almost the same as a single-node index.

My only concern is that the renaming and overall code structure may not be very elegant, which could make the code harder for others to understand. To mitigate this, I've added inline comments for clarity. I'll likely refactor the code structure in the near future.

BTW, kindly let me know if you have any other suggestions, or if I could get your approval to merge the PR.
```python
    return ds


def test_distribute_fts_index_build(tmp_path):
```
It would be nice if we could add a helper function that builds the FTS index in a distributed way; then we can add it to all existing FTS tests to make sure the two ways always get the same result.
Got it, thank you for the great suggestion. I will add it later.
Done. Added a helper function `assert_distributed_fts_consistency` and applied it to all existing FTS tests. Validated that distributed execution produces results consistent with single-node FTS index searches.
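The idea behind such a consistency helper can be sketched as follows. This is only an illustration of the review suggestion, not the PR's actual helper: `search_single` and `search_distributed` are hypothetical callables that return ranked document IDs for a query against the single-node and distributed indexes, respectively.

```python
# Illustrative sketch only; the real assert_distributed_fts_consistency in
# the PR builds actual Lance indexes. Here we just compare the two result
# lists query by query.

def assert_fts_results_match(search_single, search_distributed, queries):
    for query in queries:
        single = search_single(query)
        distributed = search_distributed(query)
        assert single == distributed, (
            f"FTS results diverge for query {query!r}: "
            f"single-node={single}, distributed={distributed}"
        )
```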
Force-pushed 2d1d80f to 9bb5b42.
Force-pushed 6c0c885 to 67e2424.
Rebased to resolve conflict.
Force-pushed 67e2424 to 56cbff1.
BubbleCal left a comment:

This work is amazing, thanks!
| "text", | ||
| "lance search text", | ||
| tmp_path, | ||
| index_params={"with_position": False}, |
There was a problem hiding this comment.
NIT: we've changed the default value of `with_position` to False, so there's no need to specify it here. We can remove this in a future PR.
Got it, thanks!
Close lance-format#4514
Related to lance-format/lance-ray#12

This PR introduces distributed FTS index capabilities, enabling parallel index creation across multiple fragments.

### New added methods

- `execute_uncommitted()`: Creates index metadata without committing to the dataset, returning `IndexMetadata` for distributed coordination. Chainable methods specify target fragment IDs for selective indexing, as suggested by Will Jones in discussion lance-format#4514 (comment):

```rust
let partial_index = CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, &params)
    .name("distributed_index".to_string())
    .fragments(vec![fragment_id])
    .fragment_uuid(shared_uuid.clone())
    .execute_uncommitted()
    .await?;
```

- `merge_index_metadata()`: Merges distributed index metadata from multiple workers into consolidated final metadata.

### Distributed Workflow

1. **Split and Parallel Phase**: The Ray head node distributes the fragments to different workers. (We may distribute the fragments evenly across workers based on fragment statistics; this will be implemented in the lance-ray connector.) Ray workers call `execute_uncommitted()` on specific fragments using the `fragments()` method. A shared UUID via `fragment_uuid()` ensures consistent index identity.
2. **Merge Phase**: `merge_index_metadata()` consolidates partition metadata files (`part_*_metadata.lance`).
3. **Commit Phase**: Final index commit with unified metadata.

**Example**

The workflow example can be found in `test_distribute_fts_index_build` in [test_scalar_index.py](https://github.com/lancedb/lance/pull/4578/files#diff-a95edaddaa3a260e498c04e10f073261bdc529cd4f47b928ad80274754af0548R1964-R2021).

**Following work after this PR:** The distributed index building workflow PR will be proposed to the lance-ray connector; see lance-ray draft PR lance-format/lance-ray#45.

**Other implementation details on fragment_mask**

Optional mask with `fragment_id` in the high 32 bits. When provided, only partitions whose partition ID matches this fragment will be included. The fragment mask is constructed as `(fragment_id as u64) << 32`.
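The fragment_mask check above can be sketched in a few lines. This is a minimal Python illustration of the described bit layout (the actual implementation is in Rust, and these helper names are hypothetical): the mask carries `fragment_id` in the high 32 bits, and a partition matches when its own high 32 bits equal the mask.

```python
# Illustrative helpers only; the PR's real code is Rust. A partition ID is
# assumed to encode (fragment_id << 32) | local_id.

def make_fragment_mask(fragment_id: int) -> int:
    return fragment_id << 32

def partition_in_fragment(partition_id: int, fragment_mask: int) -> bool:
    # Clear the low 32 bits (the per-fragment local ID) before comparing.
    return (partition_id >> 32) << 32 == fragment_mask
```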
…dule (lance-format#4961)
This PR brings the distributed index creation functionality (see lance-format#4667, lance-format#4578) to the Java module, aligned with the Python implementation.
Co-authored-by: 喆宇 <wxy407679@antgroup.com>