From dfe9b18805772406b6990108418b5733b72b44e0 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Tue, 25 Mar 2025 17:45:47 -0700 Subject: [PATCH 1/2] set maximum of 8 target partitions for merge insert update fragments --- rust/lance-datafusion/src/exec.rs | 22 ++++++++++++-------- rust/lance/src/dataset/write/merge_insert.rs | 3 ++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 46845f8dd74..cca7df44d16 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -182,6 +182,7 @@ pub struct LanceExecutionOptions { pub use_spilling: bool, pub mem_pool_size: Option<u64>, pub batch_size: Option<usize>, + pub target_partition: Option<usize>, } const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024; @@ -215,8 +216,11 @@ impl LanceExecutionOptions { } pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext { - let session_config = SessionConfig::new(); + let mut session_config = SessionConfig::new(); let mut runtime_env_builder = RuntimeEnvBuilder::new(); + if let Some(target_partition) = options.target_partition { + session_config = session_config.with_target_partitions(target_partition); + } if options.use_spilling() { runtime_env_builder = runtime_env_builder .with_disk_manager(DiskManagerConfig::new()) @@ -240,17 +244,17 @@ lazy_static! 
{ } pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { - let session_ctx: SessionContext; + if options.target_partition.is_some() { + return new_session_context(options); + } if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE { - if options.use_spilling() { - session_ctx = DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone(); + return if options.use_spilling() { + DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone() } else { - session_ctx = DEFAULT_SESSION_CONTEXT.clone(); - } - } else { - session_ctx = new_session_context(options) + DEFAULT_SESSION_CONTEXT.clone() + }; } - session_ctx + new_session_context(options) } fn get_task_context( diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 8069999e29f..387e90e5fef 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -58,7 +58,7 @@ use futures::{ use lance_core::{ datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions}, error::{box_error, InvalidInputSnafu}, - utils::futures::Capacity, + utils::{futures::Capacity, tokio::get_num_compute_intensive_cpus}, Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, }; use lance_datafusion::{ @@ -665,6 +665,7 @@ impl MergeInsertJob { use datafusion::logical_expr::{col, lit}; let session_ctx = get_session_context(&LanceExecutionOptions { use_spilling: true, + target_partition: Some(get_num_compute_intensive_cpus().min(8)), ..Default::default() }); let mut group_stream = session_ctx From 16a1a65ea5e80dc32e9964586a33e0228fc2025b Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 26 Mar 2025 11:04:12 -0700 Subject: [PATCH 2/2] small fix for code style --- rust/lance-datafusion/src/exec.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index cca7df44d16..dbd6f6ceed9 100644 --- a/rust/lance-datafusion/src/exec.rs +++ 
b/rust/lance-datafusion/src/exec.rs @@ -244,10 +244,8 @@ lazy_static! { } pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { - if options.target_partition.is_some() { - return new_session_context(options); - } - if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE { + if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none() + { return if options.use_spilling() { DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone() } else {