feat: support create btree index distributedly #4667
westonpace merged 20 commits into lance-format:main
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##              main    #4667      +/-   ##
==========================================
  Coverage    80.72%   80.73%
==========================================
  Files          321      321
  Lines       124043   124847    +804
==========================================
+ Hits        100131   100792    +661
- Misses       20340    20457    +117
- Partials      3572     3598     +26
@jackye1995 @westonpace @BubbleCal @chenghao-guo Please take a look when you have time, thanks!
```python
    ],
    prefetch_batch: Optional[int] = None,
):
    """
```
Thank you for the refactor. My previous design wasn't thoroughly considered; yours is a significant improvement.
jackye1995 left a comment:
This is an amazing feature! I added some initial comments and will look more into the details tomorrow.
@jackye1995 Thanks for your review; I've addressed all comments.
westonpace left a comment:
This is great. I love the overall approach of adding a train method that takes in some fragments and creates a partial index and then a merge method that will complete all the fragments.
I think we can simplify the prefetch and just use the file reader's prefetch (I have some comments here).
Also, does this merge the batches themselves? For example, if I sort fragment 1 and get a batch with values 10-50, and then I sort fragment 2 and get a batch with values 30-80, is there some code that merges these two batches (maybe the output is a batch with 10-50 and 50-80)? I might be missing it, but maybe it is just emitting 10-50 and then 30-80 in sequence?
Also, if we want to simplify some of the merge code, DataFusion has a SortPreservingMerge operator that can do this for us too.
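The merge-of-sorted-streams behavior being discussed can be sketched in a few lines with Python's `heapq.merge`, used here purely as a stand-in for DataFusion's `SortPreservingMerge` over sorted record batches (the fragment values come from the example above; nothing here is the PR's actual API):

```python
import heapq

# Two sorted streams, one per fragment, as in the example above:
# fragment 1 yields values in 10-50, fragment 2 yields values in 30-80.
frag1 = [10, 25, 40, 50]
frag2 = [30, 45, 60, 80]

# heapq.merge lazily interleaves already-sorted inputs into one globally
# sorted stream -- the same role SortPreservingMerge plays for batches.
merged = list(heapq.merge(frag1, frag2))
print(merged)  # [10, 25, 30, 40, 45, 50, 60, 80]
```

The key property is that each input is already sorted, so the merge only ever compares the current head of each stream.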
I might not fully understand what's going on here. Let me explain the logic using these two fragments:
Ah, I see where I was confused. I thought the partition iterator was yielding batches. Instead it is yielding rows. So the heap is built one row at a time and batches are merged that way. You can ignore that particular comment.
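The row-at-a-time heap described above can be sketched as follows (the iterator shapes and function name are illustrative, not the PR's actual types): seed a min-heap with the head row of each partition iterator, pop the smallest row, and refill the heap from whichever iterator that row came from.

```python
import heapq

def merge_rows(partition_iters):
    """K-way merge where each partition iterator yields rows in sorted order."""
    heap = []
    # Seed the heap with the first row of each partition.
    for idx, it in enumerate(partition_iters):
        row = next(it, None)
        if row is not None:
            heapq.heappush(heap, (row, idx))
    # Repeatedly emit the globally smallest row and refill from its source.
    while heap:
        row, idx = heapq.heappop(heap)
        yield row
        nxt = next(partition_iters[idx], None)
        if nxt is not None:
            heapq.heappush(heap, (nxt, idx))

parts = [iter([1, 4, 7]), iter([2, 5]), iter([3, 6, 8])]
result = list(merge_rows(parts))
print(result)  # [1, 2, 3, 4, 5, 6, 8, 7] is wrong -- prints [1, 2, 3, 4, 5, 6, 7, 8]
```

The heap never holds more than one row per partition, which is what keeps memory flat regardless of total row count.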
@jackye1995 @westonpace @BubbleCal Hi, I've refactored the code based on your feedback. Please review it again when you have time, thanks! I've also noticed that using DataFusion's SortPreservingMerge has slowed down index creation compared to my previous version (the speed can be improved by adjusting the prefetch counts and the number of sub-indexes), but it's still significantly faster than the current single-node creation. Furthermore, the code has been significantly simplified. I think we can further optimize performance in future PRs.
@jackye1995 @westonpace @BubbleCal Gentle ping on this.
westonpace left a comment:
Thanks for working through the reviews; this looks good to me now.
Add support for distributed BTREE index building in ray connector based on @xloya's great work in lance-format/lance#4667
Closed lance-format#4665.

Overall Steps:
1. Create ordered BTree sub-page files / sub-lookup files at the fragment level based on Ray / Daft.
2. Sort and merge the sub-page files using a k-way merge sort algorithm, supporting prefetch of sub-page file data.
3. Output the final lookup file.
4. Commit the final index to the dataset.

Production Test Results:
In a production scenario, using Ray and 50 workers on a string ID field in a dataset of 700 million records, we achieved the following:
1. BTree index build time was reduced from 190 minutes to 19 minutes, a 10x increase in build speed.
2. Peak memory usage on the Ray head node when creating the index was reduced from 90+ GB to 4+ GB, a 95%+ reduction.

---------
Co-authored-by: xloya <xiaojiebao@apache.org>
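The overall pipeline in the description can be sketched end to end in plain Python. Everything here is a hedged toy model, not the PR's actual code: `read_sub_page_file` stands in for reading a sorted Lance sub-page file in prefetchable batches, and the "lookup file" is just a list.

```python
import heapq

def read_sub_page_file(rows, batch_size=2):
    """Illustrative stand-in for streaming a sorted sub-page file in batches
    (the real implementation reads Lance files and can prefetch batches)."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

def kway_merge(sub_page_files, batch_size=2):
    """Step 2: k-way merge sort over the sorted sub-page files."""
    def rows(f):
        for batch in read_sub_page_file(f, batch_size):
            yield from batch
    # Each input stream is sorted, so heapq.merge yields a global order.
    yield from heapq.merge(*(rows(f) for f in sub_page_files))

# Step 1 output: each worker wrote one sorted sub-page file per fragment.
sub_pages = [[1, 5, 9], [2, 6], [3, 4, 8]]
# Steps 2-3: merge the sub-page files into the final lookup file.
lookup = list(kway_merge(sub_pages))
print(lookup)  # [1, 2, 3, 4, 5, 6, 8, 9]
```

This shape explains the memory numbers in the description: the head node only ever holds one batch per sub-page file, not the whole sorted dataset.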
…dule (lance-format#4961) This PR brings the distributed index creation functionality (see lance-format#4667, lance-format#4578) to the Java module, aligned with the Python implementation. --------- Co-authored-by: 喆宇 <wxy407679@antgroup.com>