diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 46845f8dd74..dbd6f6ceed9 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -182,6 +182,7 @@ pub struct LanceExecutionOptions { pub use_spilling: bool, pub mem_pool_size: Option, pub batch_size: Option, + pub target_partition: Option, } const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024; @@ -215,8 +216,11 @@ impl LanceExecutionOptions { } pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext { - let session_config = SessionConfig::new(); + let mut session_config = SessionConfig::new(); let mut runtime_env_builder = RuntimeEnvBuilder::new(); + if let Some(target_partition) = options.target_partition { + session_config = session_config.with_target_partitions(target_partition); + } if options.use_spilling() { runtime_env_builder = runtime_env_builder .with_disk_manager(DiskManagerConfig::new()) @@ -240,17 +244,15 @@ lazy_static! { } pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext { - let session_ctx: SessionContext; - if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE { - if options.use_spilling() { - session_ctx = DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone(); + if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none() + { + return if options.use_spilling() { + DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone() } else { - session_ctx = DEFAULT_SESSION_CONTEXT.clone(); - } - } else { - session_ctx = new_session_context(options) + DEFAULT_SESSION_CONTEXT.clone() + }; } - session_ctx + new_session_context(options) } fn get_task_context( diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 8069999e29f..387e90e5fef 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -58,7 +58,7 @@ use futures::{ use lance_core::{ datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions}, error::{box_error, InvalidInputSnafu}, - utils::futures::Capacity, + utils::{futures::Capacity, tokio::get_num_compute_intensive_cpus}, Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, }; use lance_datafusion::{ @@ -665,6 +665,7 @@ impl MergeInsertJob { use datafusion::logical_expr::{col, lit}; let session_ctx = get_session_context(&LanceExecutionOptions { use_spilling: true, + target_partition: Some(get_num_compute_intensive_cpus().min(8)), ..Default::default() }); let mut group_stream = session_ctx