feat: add plan/execute separation to FilteredReadExec #5843

LuQQiu merged 4 commits into lance-format:main
Conversation
@hamersaw FYI
PR Review: feat: add plan/execute separation to FilteredReadExec

**Summary**

This PR introduces a clean separation between planning and execution phases in FilteredReadExec.

**Critical Issues (P0/P1)**

P1: Missing test coverage. The new test … The existing complex test cases (e.g., …) …

P1: In the call to plan_scan, the partition is hardcoded:

```rust
let (internal_plan, _loaded_fragments) = plan_scan(
    self.dataset.clone(),
    self.options.clone(),
    index_input.clone(),
    0, // hardcoded partition
    ctx,
)
.await?;
```

This may be intentional since planning should be partition-independent, but it warrants a doc comment explaining this design decision, especially since the index execution uses the partition parameter.

**Minor Observations (not blocking)**

**Verdict**

The core implementation is sound. Addressing the P1 test coverage gap before merge would strengthen confidence in the new ExecuteOnly path.
Codecov Report: ❌ Patch coverage is …
Force-pushed from 2244a3b to a4fb82e
Will have FilteredReadExec serialization/deserialization in a follow-up PR.
```rust
/// Filter to apply per fragment
/// fragments not here don't need filtering
pub filters: HashMap<u32, Arc<Expr>>,
```
Currently for Expr we serialize and deserialize via Substrait, which requires an ArrowSchema. This prevents the FilteredReadPlan from being serialized independently.

Another choice is to have lance depend on datafusion-proto (which may be a big dependency) and add a new way to serialize Expr using the proto approach.
```rust
// datafusion-proto:
use datafusion_proto::bytes::Serializeable;

// Encode - no schema needed
let bytes = expr.to_bytes()?;
// Decode - no schema needed (just a function registry)
let expr = Expr::from_bytes_with_registry(&bytes, registry)?;

// datafusion-substrait (what Lance uses):

// Encode - needs schema
let bytes = encode_substrait(expr, schema, state)?;
// Decode - needs schema
let expr = parse_substrait(&bytes, schema, state).await?;
```
Substrait is a cross-system standard (works with PyArrow, DuckDB, etc.). But datafusion-proto is simpler for Rust-to-Rust communication.
We will keep the Substrait approach for Expr here. The problem is that FilteredReadPlan cannot be serialized independently; it needs schema info. We could include the schema in the plan, but that would be far too much information.
Serialization and deserialization will be in a follow-up PR because it's not very straightforward.
hamersaw left a comment
Thanks! I'll start building on top of this and merge in any changes (if necessary).
```rust
#[derive(Clone, Debug)]
pub enum FilteredReadExecMode {
    PlanAndExecute {
        index_input: Option<Arc<dyn ExecutionPlan>>,
    },
    ExecuteOnly {
        plan: FilteredReadPlan,
    },
}
```
If I understand correctly, the reason to have this enum is to modify how the FilteredReadExec executes: either (1) generate a plan and use that, or (2) use an existing FilteredReadPlan that's provided on creation. I'm finding this makes the call stack a little difficult to parse. Could we get the same functionality by having a publicly exposed function (ex. get_or_create_plan(index_input: Option<Arc<dyn ExecutionPlan>>)) that either returns the existing plan (stored as Option<FilteredReadPlan> within the FilteredReadExec) or generates a new one and populates that field before returning? We could call this inline in the execute function just to make sure a plan is compiled before execution -- if one was provided on creation (i.e. with_plan(...) or deserialization) then it would just be a no-op.
Good call. I will remodel to this approach (store the plan as a field in the Exec, with get_or_create_plan). And the detailed plan ranges can also be converted to a roaring-bitmap representation; it may be even easier that way.
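A minimal sketch of the shape being discussed, using std-only types. The names (FilteredReadPlan, with_plan, get_or_create_plan) mirror the thread; the OnceLock-based caching and the stub planning body are assumptions for illustration, not Lance's actual implementation:

```rust
use std::sync::OnceLock;

/// Stand-in for the real FilteredReadPlan (fields are hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct FilteredReadPlan {
    fragment_ids: Vec<u32>,
}

struct FilteredReadExec {
    /// Populated up front (with_plan / deserialization) or lazily on first use.
    plan: OnceLock<FilteredReadPlan>,
}

impl FilteredReadExec {
    fn new() -> Self {
        Self { plan: OnceLock::new() }
    }

    /// Pre-computed plan provided on creation; get_or_create_plan becomes a no-op.
    fn with_plan(plan: FilteredReadPlan) -> Self {
        let cell = OnceLock::new();
        cell.set(plan).expect("fresh cell is empty");
        Self { plan: cell }
    }

    /// Returns the existing plan, or runs planning once and caches the result.
    fn get_or_create_plan(&self) -> &FilteredReadPlan {
        self.plan.get_or_init(|| {
            // The real code would call plan_scan(...) here.
            FilteredReadPlan { fragment_ids: vec![0, 1] }
        })
    }
}

fn main() {
    // Planning path: no plan provided, so the first call computes one.
    let exec = FilteredReadExec::new();
    let planned = exec.get_or_create_plan().clone();

    // Execute-only path: a distributed worker gets the plan up front.
    let worker = FilteredReadExec::with_plan(planned.clone());
    assert_eq!(worker.get_or_create_plan(), &planned);
    println!("plan reused: {:?}", planned.fragment_ids);
}
```

Calling get_or_create_plan inline in execute keeps a single code path: with a pre-supplied plan the cell is already set and the closure never runs.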
- Add FilteredReadPlan struct using RowAddrTreeMap for row selection
- Add get_or_create_plan API for lazy plan computation via OnceCell
- Support providing pre-computed plan to FilteredReadExec::try_new
- Centralize plan creation in get_or_create_plan_impl
- Make RowAddrSelection public in lance-core
@hamersaw @wjones127 @westonpace PTAL, thanks
westonpace left a comment
I think my only concern is that the conversion to/from ranges and bitmap might be expensive in some cases (e.g. filters that match most rows) but we can see how it does in benchmarks.
```rust
pub filters: HashMap<u32, Arc<Expr>>,
/// Scan range after filter may be applied during planning phase based on index result
/// This is leftover range to apply during execution phase
pub scan_range_after_filter: Option<Range<u64>>,
```
Is this a logical range / does it include deletions (I should probably know)? It would be good to document that.
This is a logical range after the filter, similar to offset/limit. Yep, I will update the doc comment.

In plan_scan, we handle scan_range_before_filter and the deletion vector at the beginning.
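To illustrate the offset/limit analogy: a hypothetical, std-only sketch (not Lance's code; the helper name and inputs are invented) of applying a logical post-filter range over the row ranges that survived deletions and scan_range_before_filter:

```rust
use std::ops::Range;

/// Apply a logical scan range (like offset/limit) over surviving row ranges.
/// `kept` is assumed to already exclude deleted rows and rows removed by
/// scan_range_before_filter; `logical` counts only the surviving rows.
fn apply_scan_range_after_filter(
    kept: &[Range<u64>],
    logical: Range<u64>,
) -> Vec<Range<u64>> {
    let mut out = Vec::new();
    let mut seen: u64 = 0; // logical rows consumed so far
    for r in kept {
        let len = r.end - r.start;
        // Clamp the logical window into this range's local coordinates.
        let lo = logical.start.saturating_sub(seen).min(len);
        let hi = logical.end.saturating_sub(seen).min(len);
        if lo < hi {
            out.push(r.start + lo..r.start + hi);
        }
        seen += len;
    }
    out
}

fn main() {
    // Surviving physical rows: [10..15) and [20..30) => 15 logical rows.
    let kept = vec![10..15, 20..30];
    // Logical window [3, 8) selects rows 13..15 and 20..23.
    let sliced = apply_scan_range_after_filter(&kept, 3..8);
    assert_eq!(sliced, vec![13..15, 20..23]);
    println!("{:?}", sliced);
}
```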
```rust
if let Some(to_read) = fragments_to_read.get(&fragment_id) {
    if !to_read.is_empty() {
        // Convert ranges to bitmap
        let bitmap = Self::ranges_to_bitmap(to_read);
```
Why do we have to go from ranges to bitmap here and then back to ranges later (I guess this is the same question as the above TODO)?
For external distributed execution, I think returning RowAddrTreeMap (bitmap-based) is more useful because it's easier to split/merge/serialize and supports arbitrary row-selection patterns.

For internal local execution, the underlying reader uses read_range, which expects Vec&lt;Range&gt;, so ranges are better internally.

The current implementation prioritizes external API flexibility. For the internal path, we could potentially change planning to use bitmaps since we're dealing with index results anyway.
> For external distributed execution

If this is the only case, I would say let's keep things as lightweight as possible internally (work only in ranges) and we can do the conversion to bitmap in plan_splits if we decide it's useful. Thoughts?
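The conversion being debated can be sketched with std types only. The real code uses RowAddrTreeMap and a bitmap_to_ranges utility in lance-core; here a sorted iterator of row offsets stands in for the bitmap's ascending iterator, and the function names are illustrative:

```rust
use std::ops::Range;

/// Coalesce sorted, deduplicated row offsets (as a bitmap's ascending
/// iterator would yield them) into contiguous ranges.
/// Sketch of a bitmap_to_ranges-style utility, not the lance-core one.
fn offsets_to_ranges(sorted: impl IntoIterator<Item = u64>) -> Vec<Range<u64>> {
    let mut out: Vec<Range<u64>> = Vec::new();
    for v in sorted {
        match out.last_mut() {
            Some(last) if last.end == v => last.end = v + 1, // extend current run
            _ => out.push(v..v + 1),                         // start a new run
        }
    }
    out
}

/// The inverse direction: ranges flattened into ascending offsets.
fn ranges_to_offsets(ranges: &[Range<u64>]) -> Vec<u64> {
    ranges.iter().flat_map(|r| r.clone()).collect()
}

fn main() {
    let ranges = vec![0..3, 10..12];
    let offsets = ranges_to_offsets(&ranges);
    assert_eq!(offsets, vec![0, 1, 2, 10, 11]);
    // Round-trip recovers the ranges. Each direction is linear in the number
    // of selected rows, which is the cost concern for filters matching most rows.
    assert_eq!(offsets_to_ranges(offsets), ranges);
    println!("round-trip ok");
}
```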
```rust
/// A fragment with all of its metadata loaded
#[derive(Debug, Clone)]
```
Maybe we should change fragment to Arc&lt;FileFragment&gt; if we are going to clone this? It has the protobuf metadata, which might not be trivial to clone.
@bench-bot benchmark
Benchmark Results for PR #5843

Commit: …

Flagged Benchmarks (|z-score| > 2.0): …

View all 131 benchmark results

Generated by bench-bot 🤖
- Add FilteredReadInternalPlan (private) using BTreeMap&lt;u32, Vec&lt;Range&lt;u64&gt;&gt;&gt; for efficient local execution without bitmap conversion
- Keep FilteredReadPlan (public) using RowAddrTreeMap for distributed execution
- Local path: plan_scan() → internal plan → ScopedFragmentRead (zero conversions)
- External API: get_or_create_plan() converts internal → external once
- with_plan() converts external → internal for distributed workers
- Add bitmap_to_ranges() utility in lance-core for efficient bitmap conversion
- Use BTreeMap for rows to maintain deterministic fragment order

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
For local execution there is no conversion between bitmap and vec. For remote execution we do one conversion when fetching the plan and one when providing it. Compared to the other overheads of remote execution, this extra bitmap/vec conversion is negligible.
Split FilteredReadExec into a plan phase and an execute phase.

The planned result (RowAddrTreeMap) can be further split by a distributed execution engine's distribution logic.

FilteredReadExec can take the planned result, skip the planning phase, and directly read (+ optional predicate).

Co-authored-by: Claude <noreply@anthropic.com>