-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat: Add DELETE/UPDATE hooks to TableProvider trait and to MemTable implementation #19142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add DELETE/UPDATE hooks to TableProvider trait and to MemTable implementation #19142
Conversation
Add infrastructure for row-level DML operations (DELETE/UPDATE) to the TableProvider trait, enabling storage engines to implement SQL-based mutations. Changes: - Add `DmlCapabilities` struct to declare DELETE/UPDATE support - Add `TableProvider::dml_capabilities()` method (defaults to NONE) - Add `TableProvider::delete_from()` method for DELETE operations - Add `TableProvider::update()` method for UPDATE operations - Wire physical planner to route DML operations to TableProvider - Add helper functions for extracting filters and assignments This provides the API surface for downstream projects (iceberg-rust, delta-rs) to implement DML without custom query planners. A reference MemTable implementation follows in a subsequent PR.
Clippy warns about assertions on constants being optimized away. Changed to assert_eq! for explicit value comparison.
Add complete DELETE and UPDATE support to MemTable as a reference implementation for the TableProvider DML hooks. Implementation: - `delete_from()`: Evaluate filter predicates, remove matching rows - `update()`: Evaluate filters, apply SET expressions to matching rows - `DmlResultExec`: ExecutionPlan returning affected row count - Correct SQL NULL semantics (NULL predicates preserve rows) Tests (812 lines): - DELETE: basic, all rows, predicates (=, >, AND, OR, LIKE, BETWEEN, IN, NOT IN, IS NULL, IS NOT NULL), NULL handling, functions, types - UPDATE: basic, SET expressions, multi-column, CASE, functions, column references, NULL handling, error cases All sqllogictest tests pass: cargo test --test sqllogictests -- dml_delete dml_update
Now that MemTable supports DELETE and UPDATE, the existing EXPLAIN tests produce physical plans instead of "not implemented" errors. Updated expected output to reflect the new DmlResultExec execution.
7505171 to
695716c
Compare
Rebased on latest main. Fixed clippy warnings in tests and updated delete.slt/update.slt expected output to reflect new DmlResultExec physical plans. All tests pass locally. Could you please re-trigger the CI? Thanks! |
Feat/tableprovider dml hooks
Feat/memtable dml impl
Add infrastructure for row-level DML operations (DELETE/UPDATE) to the TableProvider trait, enabling storage engines to implement SQL-based mutations. Changes: - Add `DmlCapabilities` struct to declare DELETE/UPDATE support - Add `TableProvider::dml_capabilities()` method (defaults to NONE) - Add `TableProvider::delete_from()` method for DELETE operations - Add `TableProvider::update()` method for UPDATE operations - Wire physical planner to route DML operations to TableProvider - Add helper functions for extracting filters and assignments This provides the API surface for downstream projects (iceberg-rust, delta-rs) to implement DML without custom query planners. A reference MemTable implementation follows in a subsequent PR.
Clippy warns about assertions on constants being optimized away. Changed to assert_eq! for explicit value comparison.
- Change assert_eq!(x, true/false) to assert!(x)/assert!(!x) per clippy - Update delete.slt and update.slt expected error messages to match new physical planner behavior for unsupported DML operations
22c7273 to
b038d04
Compare
Add complete DELETE and UPDATE support to MemTable as a reference implementation for the TableProvider DML hooks. Implementation: - `delete_from()`: Evaluate filter predicates, remove matching rows - `update()`: Evaluate filters, apply SET expressions to matching rows - `DmlResultExec`: ExecutionPlan returning affected row count - Correct SQL NULL semantics (NULL predicates preserve rows) Tests (812 lines): - DELETE: basic, all rows, predicates (=, >, AND, OR, LIKE, BETWEEN, IN, NOT IN, IS NULL, IS NOT NULL), NULL handling, functions, types - UPDATE: basic, SET expressions, multi-column, CASE, functions, column references, NULL handling, error cases All sqllogictest tests pass: cargo test --test sqllogictests -- dml_delete dml_update
Now that MemTable supports DELETE and UPDATE, the existing EXPLAIN tests produce physical plans instead of "not implemented" errors. Updated expected output to reflect the new DmlResultExec execution.
Rebased on main. Previous CI failures were due to clippy bool_assert_comparison warnings and mismatched expected error messages in test files. Requesting a CI trigger to confirm I've successfully stopped embarrassing myself. 🙏 |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this PR @ethan-tyler 🙏
I have a few suggestions for API and then I think this PR needs some tests for the new planning code
Otherwise this is looking very nice 👌
| }) => { | ||
| if let Some(provider) = | ||
| target.as_any().downcast_ref::<DefaultTableSource>() | ||
| { | ||
| let capabilities = provider.table_provider.dml_capabilities(); | ||
| if !capabilities.delete { | ||
| return plan_err!( | ||
| "Table '{}' does not support DELETE operations", | ||
| table_name | ||
| ); | ||
| } | ||
| let filters = extract_dml_filters(input)?; | ||
| provider | ||
| .table_provider | ||
| .delete_from(session_state, filters) | ||
| .await? | ||
| } else { | ||
| return exec_err!( | ||
| "Table source can't be downcasted to DefaultTableSource" | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better (smaller API surface) would be to simply try and call provider.delete_from(session_state, filters) and pass along the context
Then you could remove DmlCapabilities entirely
Something like:
LogicalPlan::Dml(DmlStatement {
table_name,
target,
op: WriteOp::Delete,
input,
..
}) => {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let filters = extract_dml_filters(input)?;
provider
.table_provider
.delete_from(session_state, filters)
.await
.context(format!(
"DELETE operation on table '{}'",
table_name
))?
}
}|
|
||
| /// DML operations supported by a table. | ||
| #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)] | ||
| pub struct DmlCapabilities { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this API is necessary (see above for alternate)
| Ok((physical_expr, physical_name)) | ||
| } | ||
|
|
||
| /// Extract filter predicates from a DML input plan (DELETE/UPDATE). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need some tests for the planning of delete and update -- specifically that the columns extracted are correct
Perhaps somewhere in https://github.com/apache/datafusion/blob/2db3aeaa53c2fe7cbc709ad517fd6255d5f02074/datafusion/core/tests/custom_sources_cases/mod.rs#L215-L214
The custom provider doesn't actually need to delete anything, just print out the columns that it was passed, for example
|
Thanks so much for the review @alamb! Really appreciate you taking the time And yeah, you're totally right about the redudent DmlCapabilities. The default impl already returns the error, so the capability check is basically just... checking if we're going to get an error before we get the error. lol. Will rip that out and simplify the planner logic. Also adding .context() for better errors and I'll throw in some tests for the filter/assignment extraction, was thinking a mock TableProvider that just captures what gets passed to it so we can assert on it. Should have something pushed soon. Thanks again for taking the time on this! |
- Remove DmlCapabilities struct and dml_capabilities() trait method - Simplify physical planner to try-call pattern with .context() errors - Add planning tests verifying filter/assignment extraction
- Remove DmlCapabilities struct and dml_capabilities() trait method - Simplify physical planner to try-call pattern with .context() errors - Add planning tests verifying filter/assignment extraction
|
@alamb addressed your feedback and pushed the changes:
Clippy is clean locally. Ready for another look when you get a chance. |
Update expected error messages in sqllogictests to match new error format from .context() wrapping in physical planner. The error messages now include context about the DML operation being performed. Old format: physical_plan_error Error during planning: Table 't1' does not support DELETE operations New format: physical_plan_error 01)DELETE operation on table 't1' 02)caused by 03)This feature is not implemented: DELETE not supported for Base table
|
@ethan-tyler could you update the PR description since |
|
|
||
| //! Tests for DELETE and UPDATE planning to verify filter and assignment extraction. | ||
| use std::any::Any; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How hard would it be to integrate this (or a different implementation) into our in-memory tables? It would be very cool to see an SLT test showing it working.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good timing - I had this sitting in a separate branch to keep the API PR small. Merged it in now. Adds delete_from() and update() to MemTable as a working example. Two new SLT files:
- MemTable.delete_from() / MemTable.update() - reference implementation
- dml_delete.slt (202 lines, 10 tests) - DELETE operations
- dml_update.slt (259 lines, 12 tests) - UPDATE operations
Covers the basics plus NULL predicate handling (important for SQL correctness). The existing delete.slt/update.slt now expect success since MemTable supports these operations.
Thanks for the catch - updated! |
adriangb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some small nits otherwise looks good to me!
| let filter_mask = if filters.is_empty() { | ||
| BooleanArray::from(vec![true; batch.num_rows()]) | ||
| } else { | ||
| let mut combined_mask: Option<BooleanArray> = None; | ||
|
|
||
| for filter_expr in &filters { | ||
| let physical_expr = create_physical_expr( | ||
| filter_expr, | ||
| &df_schema, | ||
| state.execution_props(), | ||
| )?; | ||
|
|
||
| let result = physical_expr.evaluate(batch)?; | ||
| let array = result.into_array(batch.num_rows())?; | ||
| let bool_array = array | ||
| .as_any() | ||
| .downcast_ref::<BooleanArray>() | ||
| .ok_or_else(|| { | ||
| datafusion_common::DataFusionError::Internal( | ||
| "Filter did not evaluate to boolean".to_string(), | ||
| ) | ||
| })? | ||
| .clone(); | ||
|
|
||
| combined_mask = Some(match combined_mask { | ||
| Some(existing) => and(&existing, &bool_array)?, | ||
| None => bool_array, | ||
| }); | ||
| } | ||
|
|
||
| combined_mask.unwrap_or_else(|| { | ||
| BooleanArray::from(vec![true; batch.num_rows()]) | ||
| }) | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be factored into common code evaluate_filters_to_mask or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I extracted - it returns Option now so callers can handle the empty-filter case directly.
| } | ||
|
|
||
| let filter_mask = if filters.is_empty() { | ||
| BooleanArray::from(vec![true; batch.num_rows()]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just exit early here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call - now returns None for empty filters instead of allocating an all-true mask.
| let physical_expr = create_physical_expr( | ||
| value_expr, | ||
| &df_schema, | ||
| state.execution_props(), | ||
| )?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create these upfront e.g. assignment_map.map(...) -> HashMap<String, Arc<dyn PhysicalExpr>>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Used String keys to avoid borrow-across-await.
8a7733f to
d4da744
Compare
# Conflicts: # datafusion/catalog/src/memory/table.rs
- Use evaluate_selection() instead of evaluate() for UPDATE assignments to avoid evaluating expressions on rows excluded by WHERE clause. This prevents errors like divide-by-zero on non-updated rows. - Extract evaluate_filters_to_mask() helper shared by delete_from/update - Hoist physical expression creation outside batch loop for efficiency - Simplify strip_column_qualifiers() signature - Add regression test for divide-by-zero edge case
d4da744 to
94a2b68
Compare
|
Thanks @ethan-tyler and @adriangb Are there any concerns about merging this PR? |
|
I think it's good to go! |
No additional concerns! Thank you so much @alamb @adriangb for helping on this one! |
|
🎉 |
Add infrastructure for row-level DML operations (DELETE/UPDATE) to the TableProvider trait, enabling storage engines to implement SQL-based mutations. Changes:
This provides the API surface for downstream projects (iceberg-rust, delta-rs) to implement DML without custom query planners.
Which issue does this PR close?
delete_fromandupdateinTableProvider#16959Rationale for this change
Datafusion parses DELETE/UPDATE but returns NotImplemented("Unsupported logical plan: Dml(Delete)") at physical planning. Downstream projects (iceberg-rust, delta-rs) must implement custom planners to work around this.
What changes are included in this PR?
Adds TableProvider hooks for row-level DML:
Physical planner routes WriteOp::Delete and WriteOp::Update to these methods. Tables that don't support DML return NotImplemented (the default behavior). MemTable reference implementation demonstrates:
Are these changes tested?
Yes:
Are there any user-facing changes?
New trait methods on TableProvider:
Fully backward compatible. Default implementations return NotImplemented.