From d78c51bdf98f6b0ca9c0008e5a53c15e2897d6f5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 23 Feb 2026 08:25:32 +0800 Subject: [PATCH 01/11] perf: spawn structural decode batch tasks --- rust/lance-encoding/src/decoder.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 67996c41f3c..7b8622bade7 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1795,7 +1795,15 @@ impl StructuralBatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - async move { next_task.into_batch(emitted_batch_size_warning) }.await + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? }; (task, num_rows) }); From 628683b80d3108accaeaa77b763a16cb99077d9f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:00:26 +0800 Subject: [PATCH 02/11] perf: auto-tune structural batch decode spawning --- rust/lance-encoding/src/decoder.rs | 220 +++++++++++++++++++++++++++-- rust/lance-encoding/src/testing.rs | 1 + 2 files changed, 213 insertions(+), 8 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 7b8622bade7..7a80e6fda9d 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -259,6 +259,9 @@ use crate::{BufferScheduler, EncodingsIo}; // If users are getting batches over 10MiB large then it's time to reduce the batch size const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; +const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = + "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; +const DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES: usize = 1024; /// Top-level encoding message for a page. Wraps both the /// legacy pb::ArrayEncoding and the newer pb::PageLayout @@ -1689,6 +1692,7 @@ pub struct StructuralBatchDecodeStream { rows_drained: u64, scheduler_exhausted: bool, emitted_batch_size_warning: Arc, + spawn_batch_decode_tasks: bool, } impl StructuralBatchDecodeStream { @@ -1706,6 +1710,7 @@ impl StructuralBatchDecodeStream { rows_per_batch: u32, num_rows: u64, root_decoder: StructuralStructDecoder, + spawn_batch_decode_tasks: bool, ) -> Self { Self { context: DecoderContext::new(scheduled), @@ -1716,6 +1721,7 @@ impl StructuralBatchDecodeStream { rows_drained: 0, scheduler_exhausted: false, emitted_batch_size_warning: Arc::new(Once::new()), + spawn_batch_decode_tasks, } } @@ -1793,17 +1799,24 @@ impl StructuralBatchDecodeStream { let next_task = next_task.transpose().map(|next_task| { let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0); let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); + let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks; let task = async move { let next_task = next_task?; - // Real decode work happens inside into_batch, which can block the current - // thread for a long time. By spawning it as a new task, we allow Tokio's - // worker threads to keep making progress. - tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + if spawn_batch_decode_tasks { + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn( + async move { next_task.into_batch(emitted_batch_size_warning) }, + ) .await .map_err(|err| Error::Wrapped { error: err.into(), location: location!(), })? + } else { + next_task.into_batch(emitted_batch_size_warning) + } }; (task, num_rows) }); @@ -1844,12 +1857,68 @@ impl RequestedRows { } /// Configuration for decoder behavior -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct DecoderConfig { /// Whether to cache repetition indices for better performance pub cache_repetition_index: bool, /// Whether to validate decoded data pub validate_on_decode: bool, + /// Controls whether structural decode batches should be spawned as tokio tasks. + pub structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode, + /// In auto mode, takes with at most this many indices are treated as point lookups. + pub structural_batch_decode_point_lookup_max_indices: usize, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StructuralBatchDecodeSpawnMode { + /// Choose strategy based on request shape: + /// - `Indices` with small cardinality => no spawn + /// - `Ranges` => spawn + /// - any filter => no spawn + Auto, + /// Always spawn a decode task for structural batches. + Always, + /// Never spawn a decode task for structural batches. + Never, +} + +impl Default for StructuralBatchDecodeSpawnMode { + fn default() -> Self { + static DEFAULT: LazyLock = LazyLock::new(|| { + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + match mode + .ok() + .as_deref() + .map(|value| value.trim().to_ascii_lowercase()) + .as_deref() + { + None | Some("") | Some("auto") => StructuralBatchDecodeSpawnMode::Auto, + Some("always") => StructuralBatchDecodeSpawnMode::Always, + Some("never") => StructuralBatchDecodeSpawnMode::Never, + Some(other) => { + warn!( + "Unknown value '{}' for {}. Valid values are auto|always|never. Falling back to auto.", + other, + ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE + ); + StructuralBatchDecodeSpawnMode::Auto + } + } + }); + *DEFAULT + } +} + +impl Default for DecoderConfig { + fn default() -> Self { + Self { + cache_repetition_index: false, + validate_on_decode: false, + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::default(), + structural_batch_decode_point_lookup_max_indices: + DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES, + } + } } #[derive(Debug, Clone)] @@ -1888,6 +1957,7 @@ pub fn create_decode_stream( batch_size: u32, is_structural: bool, should_validate: bool, + spawn_structural_batch_decode_tasks: bool, rx: mpsc::UnboundedReceiver>, ) -> Result> { if is_structural { @@ -1897,10 +1967,14 @@ pub fn create_decode_stream( should_validate, /*is_root=*/ true, )?; - Ok( - StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder) - .into_stream(), + Ok(StructuralBatchDecodeStream::new( + rx, + batch_size, + num_rows, + structural_decoder, + spawn_structural_batch_decode_tasks, ) + .into_stream()) } else { let arrow_schema = ArrowSchema::from(schema); let root_fields = arrow_schema.fields; @@ -1910,6 +1984,28 @@ pub fn create_decode_stream( } } +fn should_spawn_structural_batch_decode_tasks( + decoder_config: &DecoderConfig, + requested_rows: &RequestedRows, + filter: &FilterExpression, +) -> bool { + match decoder_config.structural_batch_decode_spawn_mode { + StructuralBatchDecodeSpawnMode::Always => true, + StructuralBatchDecodeSpawnMode::Never => false, + StructuralBatchDecodeSpawnMode::Auto => { + if !filter.is_noop() { + return false; + } + match requested_rows { + RequestedRows::Ranges(_) => true, + RequestedRows::Indices(indices) => { + indices.len() > decoder_config.structural_batch_decode_point_lookup_max_indices + } + } + } + } +} + /// Creates a iterator that decodes a set of messages in a blocking fashion /// /// See [`schedule_and_decode_blocking`] for more information. @@ -1956,6 +2052,11 @@ fn create_scheduler_decoder( let num_rows = requested_rows.num_rows(); let is_structural = column_infos[0].is_structural(); + let spawn_structural_batch_decode_tasks = should_spawn_structural_batch_decode_tasks( + &config.decoder_config, + &requested_rows, + &filter, + ); let (tx, rx) = mpsc::unbounded_channel(); @@ -1965,6 +2066,7 @@ fn create_scheduler_decoder( config.batch_size, is_structural, config.decoder_config.validate_on_decode, + spawn_structural_batch_decode_tasks, rx, )?; @@ -2664,12 +2766,19 @@ pub async fn decode_batch( let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); let is_structural = version >= LanceFileVersion::V2_1; + let requested_rows = RequestedRows::Ranges(vec![0..batch.num_rows]); + let spawn_structural_batch_decode_tasks = should_spawn_structural_batch_decode_tasks( + &DecoderConfig::default(), + &requested_rows, + filter, + ); let mut decode_stream = create_decode_stream( &batch.schema, batch.num_rows, batch.num_rows as u32, is_structural, should_validate, + spawn_structural_batch_decode_tasks, rx, )?; decode_stream.next().await.unwrap().task.await @@ -2679,6 +2788,7 @@ pub async fn decode_batch( // test coalesce indices to ranges mod tests { use super::*; + use bytes::Bytes; #[test] fn test_coalesce_indices_to_ranges_with_single_index() { @@ -2700,4 +2810,98 @@ mod tests { let ranges = DecodeBatchScheduler::indices_to_ranges(&indices); assert_eq!(ranges, vec![1..4, 5..8, 9..10]); } + + #[test] + fn test_spawn_structural_batch_decode_tasks_auto_small_take() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, + structural_batch_decode_point_lookup_max_indices: 8, + ..Default::default() + }; + let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); + + assert!(!should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_structural_batch_decode_tasks_auto_large_take() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, + structural_batch_decode_point_lookup_max_indices: 2, + ..Default::default() + }; + let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); + + assert!(should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_structural_batch_decode_tasks_auto_ranges() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, + ..Default::default() + }; + let requested_rows = RequestedRows::Ranges(vec![0..100]); + + assert!(should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_structural_batch_decode_tasks_auto_filter_disables_spawn() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, + ..Default::default() + }; + let requested_rows = RequestedRows::Ranges(vec![0..100]); + let filter = FilterExpression(Bytes::from_static(b"has-filter")); + + assert!(!should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &filter + )); + } + + #[test] + fn test_spawn_structural_batch_decode_tasks_always() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Always, + ..Default::default() + }; + let requested_rows = RequestedRows::Indices(vec![1]); + let filter = FilterExpression(Bytes::from_static(b"has-filter")); + + assert!(should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &filter + )); + } + + #[test] + fn test_spawn_structural_batch_decode_tasks_never() { + let config = DecoderConfig { + structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Never, + ..Default::default() + }; + let requested_rows = RequestedRows::Ranges(vec![0..100]); + + assert!(!should_spawn_structural_batch_decode_tasks( + &config, + &requested_rows, + &FilterExpression::no_filter() + )); + } } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 37df889035f..8bed74506df 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -216,6 +216,7 @@ async fn test_decode( batch_size, is_structural_encoding, /*should_validate=*/ true, + /*spawn_structural_batch_decode_tasks=*/ is_structural_encoding, rx, ) .unwrap(); From 3a825d61f85a1c24fa5d00310188a782bc23a3bd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:02:41 +0800 Subject: [PATCH 03/11] Revert "perf: auto-tune structural batch decode spawning" This reverts commit 628683b80d3108accaeaa77b763a16cb99077d9f. --- rust/lance-encoding/src/decoder.rs | 220 ++--------------------------- rust/lance-encoding/src/testing.rs | 1 - 2 files changed, 8 insertions(+), 213 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 7a80e6fda9d..7b8622bade7 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -259,9 +259,6 @@ use crate::{BufferScheduler, EncodingsIo}; // If users are getting batches over 10MiB large then it's time to reduce the batch size const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; -const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = - "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; -const DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES: usize = 1024; /// Top-level encoding message for a page. Wraps both the /// legacy pb::ArrayEncoding and the newer pb::PageLayout @@ -1692,7 +1689,6 @@ pub struct StructuralBatchDecodeStream { rows_drained: u64, scheduler_exhausted: bool, emitted_batch_size_warning: Arc, - spawn_batch_decode_tasks: bool, } impl StructuralBatchDecodeStream { @@ -1710,7 +1706,6 @@ impl StructuralBatchDecodeStream { rows_per_batch: u32, num_rows: u64, root_decoder: StructuralStructDecoder, - spawn_batch_decode_tasks: bool, ) -> Self { Self { context: DecoderContext::new(scheduled), @@ -1721,7 +1716,6 @@ impl StructuralBatchDecodeStream { rows_drained: 0, scheduler_exhausted: false, emitted_batch_size_warning: Arc::new(Once::new()), - spawn_batch_decode_tasks, } } @@ -1799,24 +1793,17 @@ impl StructuralBatchDecodeStream { let next_task = next_task.transpose().map(|next_task| { let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0); let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); - let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks; let task = async move { let next_task = next_task?; - if spawn_batch_decode_tasks { - // Real decode work happens inside into_batch, which can block the current - // thread for a long time. By spawning it as a new task, we allow Tokio's - // worker threads to keep making progress. - tokio::spawn( - async move { next_task.into_batch(emitted_batch_size_warning) }, - ) + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) .await .map_err(|err| Error::Wrapped { error: err.into(), location: location!(), })? - } else { - next_task.into_batch(emitted_batch_size_warning) - } }; (task, num_rows) }); @@ -1857,68 +1844,12 @@ impl RequestedRows { } /// Configuration for decoder behavior -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct DecoderConfig { /// Whether to cache repetition indices for better performance pub cache_repetition_index: bool, /// Whether to validate decoded data pub validate_on_decode: bool, - /// Controls whether structural decode batches should be spawned as tokio tasks. - pub structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode, - /// In auto mode, takes with at most this many indices are treated as point lookups. - pub structural_batch_decode_point_lookup_max_indices: usize, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StructuralBatchDecodeSpawnMode { - /// Choose strategy based on request shape: - /// - `Indices` with small cardinality => no spawn - /// - `Ranges` => spawn - /// - any filter => no spawn - Auto, - /// Always spawn a decode task for structural batches. - Always, - /// Never spawn a decode task for structural batches. - Never, -} - -impl Default for StructuralBatchDecodeSpawnMode { - fn default() -> Self { - static DEFAULT: LazyLock = LazyLock::new(|| { - let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); - match mode - .ok() - .as_deref() - .map(|value| value.trim().to_ascii_lowercase()) - .as_deref() - { - None | Some("") | Some("auto") => StructuralBatchDecodeSpawnMode::Auto, - Some("always") => StructuralBatchDecodeSpawnMode::Always, - Some("never") => StructuralBatchDecodeSpawnMode::Never, - Some(other) => { - warn!( - "Unknown value '{}' for {}. Valid values are auto|always|never. Falling back to auto.", - other, - ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE - ); - StructuralBatchDecodeSpawnMode::Auto - } - } - }); - *DEFAULT - } -} - -impl Default for DecoderConfig { - fn default() -> Self { - Self { - cache_repetition_index: false, - validate_on_decode: false, - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::default(), - structural_batch_decode_point_lookup_max_indices: - DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES, - } - } } #[derive(Debug, Clone)] @@ -1957,7 +1888,6 @@ pub fn create_decode_stream( batch_size: u32, is_structural: bool, should_validate: bool, - spawn_structural_batch_decode_tasks: bool, rx: mpsc::UnboundedReceiver>, ) -> Result> { if is_structural { @@ -1967,14 +1897,10 @@ pub fn create_decode_stream( should_validate, /*is_root=*/ true, )?; - Ok(StructuralBatchDecodeStream::new( - rx, - batch_size, - num_rows, - structural_decoder, - spawn_structural_batch_decode_tasks, + Ok( + StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder) + .into_stream(), ) - .into_stream()) } else { let arrow_schema = ArrowSchema::from(schema); let root_fields = arrow_schema.fields; @@ -1984,28 +1910,6 @@ pub fn create_decode_stream( } } -fn should_spawn_structural_batch_decode_tasks( - decoder_config: &DecoderConfig, - requested_rows: &RequestedRows, - filter: &FilterExpression, -) -> bool { - match decoder_config.structural_batch_decode_spawn_mode { - StructuralBatchDecodeSpawnMode::Always => true, - StructuralBatchDecodeSpawnMode::Never => false, - StructuralBatchDecodeSpawnMode::Auto => { - if !filter.is_noop() { - return false; - } - match requested_rows { - RequestedRows::Ranges(_) => true, - RequestedRows::Indices(indices) => { - indices.len() > decoder_config.structural_batch_decode_point_lookup_max_indices - } - } - } - } -} - /// Creates a iterator that decodes a set of messages in a blocking fashion /// /// See [`schedule_and_decode_blocking`] for more information. @@ -2052,11 +1956,6 @@ fn create_scheduler_decoder( let num_rows = requested_rows.num_rows(); let is_structural = column_infos[0].is_structural(); - let spawn_structural_batch_decode_tasks = should_spawn_structural_batch_decode_tasks( - &config.decoder_config, - &requested_rows, - &filter, - ); let (tx, rx) = mpsc::unbounded_channel(); @@ -2066,7 +1965,6 @@ fn create_scheduler_decoder( config.batch_size, is_structural, config.decoder_config.validate_on_decode, - spawn_structural_batch_decode_tasks, rx, )?; @@ -2766,19 +2664,12 @@ pub async fn decode_batch( let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); let is_structural = version >= LanceFileVersion::V2_1; - let requested_rows = RequestedRows::Ranges(vec![0..batch.num_rows]); - let spawn_structural_batch_decode_tasks = should_spawn_structural_batch_decode_tasks( - &DecoderConfig::default(), - &requested_rows, - filter, - ); let mut decode_stream = create_decode_stream( &batch.schema, batch.num_rows, batch.num_rows as u32, is_structural, should_validate, - spawn_structural_batch_decode_tasks, rx, )?; decode_stream.next().await.unwrap().task.await @@ -2788,7 +2679,6 @@ pub async fn decode_batch( // test coalesce indices to ranges mod tests { use super::*; - use bytes::Bytes; #[test] fn test_coalesce_indices_to_ranges_with_single_index() { @@ -2810,98 +2700,4 @@ mod tests { let ranges = DecodeBatchScheduler::indices_to_ranges(&indices); assert_eq!(ranges, vec![1..4, 5..8, 9..10]); } - - #[test] - fn test_spawn_structural_batch_decode_tasks_auto_small_take() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, - structural_batch_decode_point_lookup_max_indices: 8, - ..Default::default() - }; - let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); - - assert!(!should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &FilterExpression::no_filter() - )); - } - - #[test] - fn test_spawn_structural_batch_decode_tasks_auto_large_take() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, - structural_batch_decode_point_lookup_max_indices: 2, - ..Default::default() - }; - let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); - - assert!(should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &FilterExpression::no_filter() - )); - } - - #[test] - fn test_spawn_structural_batch_decode_tasks_auto_ranges() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, - ..Default::default() - }; - let requested_rows = RequestedRows::Ranges(vec![0..100]); - - assert!(should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &FilterExpression::no_filter() - )); - } - - #[test] - fn test_spawn_structural_batch_decode_tasks_auto_filter_disables_spawn() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Auto, - ..Default::default() - }; - let requested_rows = RequestedRows::Ranges(vec![0..100]); - let filter = FilterExpression(Bytes::from_static(b"has-filter")); - - assert!(!should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &filter - )); - } - - #[test] - fn test_spawn_structural_batch_decode_tasks_always() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Always, - ..Default::default() - }; - let requested_rows = RequestedRows::Indices(vec![1]); - let filter = FilterExpression(Bytes::from_static(b"has-filter")); - - assert!(should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &filter - )); - } - - #[test] - fn test_spawn_structural_batch_decode_tasks_never() { - let config = DecoderConfig { - structural_batch_decode_spawn_mode: StructuralBatchDecodeSpawnMode::Never, - ..Default::default() - }; - let requested_rows = RequestedRows::Ranges(vec![0..100]); - - assert!(!should_spawn_structural_batch_decode_tasks( - &config, - &requested_rows, - &FilterExpression::no_filter() - )); - } } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 8bed74506df..37df889035f 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -216,7 +216,6 @@ async fn test_decode( batch_size, is_structural_encoding, /*should_validate=*/ true, - /*spawn_structural_batch_decode_tasks=*/ is_structural_encoding, rx, ) .unwrap(); From db60f8b7b9d1283aaed63d5349abce3345ed72f4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:07:58 +0800 Subject: [PATCH 04/11] perf: auto-select structural decode spawning by access pattern --- rust/lance-encoding/src/decoder.rs | 221 +++++++++++++++++++++++++++-- 1 file changed, 212 insertions(+), 9 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 7b8622bade7..3e063a33803 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -259,6 +259,11 @@ use crate::{BufferScheduler, EncodingsIo}; // If users are getting batches over 10MiB large then it's time to reduce the batch size const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; +const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = + "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; +const ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES: &str = + "LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES"; +const DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES: usize = 1024; /// Top-level encoding message for a page. Wraps both the /// legacy pb::ArrayEncoding and the newer pb::PageLayout @@ -1689,6 +1694,7 @@ pub struct StructuralBatchDecodeStream { rows_drained: u64, scheduler_exhausted: bool, emitted_batch_size_warning: Arc, + spawn_batch_decode_tasks: bool, } impl StructuralBatchDecodeStream { @@ -1706,6 +1712,7 @@ impl StructuralBatchDecodeStream { rows_per_batch: u32, num_rows: u64, root_decoder: StructuralStructDecoder, + spawn_batch_decode_tasks: bool, ) -> Self { Self { context: DecoderContext::new(scheduled), @@ -1716,6 +1723,7 @@ impl StructuralBatchDecodeStream { rows_drained: 0, scheduler_exhausted: false, emitted_batch_size_warning: Arc::new(Once::new()), + spawn_batch_decode_tasks, } } @@ -1793,17 +1801,24 @@ impl StructuralBatchDecodeStream { let next_task = next_task.transpose().map(|next_task| { let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0); let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); + let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks; let task = async move { let next_task = next_task?; - // Real decode work happens inside into_batch, which can block the current - // thread for a long time. By spawning it as a new task, we allow Tokio's - // worker threads to keep making progress. - tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) + if spawn_batch_decode_tasks { + // Real decode work happens inside into_batch, which can block the current + // thread for a long time. By spawning it as a new task, we allow Tokio's + // worker threads to keep making progress. + tokio::spawn( + async move { next_task.into_batch(emitted_batch_size_warning) }, + ) .await .map_err(|err| Error::Wrapped { error: err.into(), location: location!(), })? + } else { + next_task.into_batch(emitted_batch_size_warning) + } }; (task, num_rows) }); @@ -1862,6 +1877,88 @@ pub struct SchedulerDecoderConfig { pub decoder_config: DecoderConfig, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum StructuralBatchDecodeSpawnMode { + Auto, + Always, + Never, +} + +fn structural_batch_decode_spawn_mode() -> StructuralBatchDecodeSpawnMode { + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + match mode + .ok() + .as_deref() + .map(|value| value.trim().to_ascii_lowercase()) + .as_deref() + { + None | Some("") | Some("auto") => StructuralBatchDecodeSpawnMode::Auto, + Some("always") => StructuralBatchDecodeSpawnMode::Always, + Some("never") => StructuralBatchDecodeSpawnMode::Never, + Some(other) => { + warn!( + "Unknown value '{}' for {}. Valid values are auto|always|never. Falling back to auto.", + other, + ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE + ); + StructuralBatchDecodeSpawnMode::Auto + } + } +} + +fn structural_point_lookup_max_indices() -> usize { + let from_env = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES); + match from_env { + Ok(raw) => match raw.parse::() { + Ok(parsed) => parsed, + Err(err) => { + warn!( + "Failed to parse {}='{}': {}. Falling back to default {}.", + ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES, + raw, + err, + DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES + ); + DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES + } + }, + Err(_) => DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES, + } +} + +fn should_spawn_structural_batch_decode_tasks_with_mode( + mode: StructuralBatchDecodeSpawnMode, + point_lookup_max_indices: usize, + requested_rows: &RequestedRows, + filter: &FilterExpression, +) -> bool { + match mode { + StructuralBatchDecodeSpawnMode::Always => true, + StructuralBatchDecodeSpawnMode::Never => false, + StructuralBatchDecodeSpawnMode::Auto => { + if !filter.is_noop() { + return false; + } + match requested_rows { + RequestedRows::Ranges(_) => true, + RequestedRows::Indices(indices) => indices.len() > point_lookup_max_indices, + } + } + } +} + +fn should_spawn_structural_batch_decode_tasks( + requested_rows: &RequestedRows, + filter: &FilterExpression, +) -> bool { + should_spawn_structural_batch_decode_tasks_with_mode( + structural_batch_decode_spawn_mode(), + structural_point_lookup_max_indices(), + requested_rows, + filter, + ) +} + fn check_scheduler_on_drop( stream: BoxStream<'static, ReadBatchTask>, scheduler_handle: tokio::task::JoinHandle<()>, @@ -1889,6 +1986,32 @@ pub fn create_decode_stream( is_structural: bool, should_validate: bool, rx: mpsc::UnboundedReceiver>, +) -> Result> { + let spawn_structural_batch_decode_tasks = match structural_batch_decode_spawn_mode() { + StructuralBatchDecodeSpawnMode::Always => true, + StructuralBatchDecodeSpawnMode::Never => false, + // Keep existing behavior for contextless call sites (historically we spawned). + StructuralBatchDecodeSpawnMode::Auto => true, + }; + create_decode_stream_with_spawn( + schema, + num_rows, + batch_size, + is_structural, + should_validate, + spawn_structural_batch_decode_tasks, + rx, + ) +} + +fn create_decode_stream_with_spawn( + schema: &Schema, + num_rows: u64, + batch_size: u32, + is_structural: bool, + should_validate: bool, + spawn_structural_batch_decode_tasks: bool, + rx: mpsc::UnboundedReceiver>, ) -> Result> { if is_structural { let arrow_schema = ArrowSchema::from(schema); @@ -1897,10 +2020,14 @@ pub fn create_decode_stream( should_validate, /*is_root=*/ true, )?; - Ok( - StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder) - .into_stream(), + Ok(StructuralBatchDecodeStream::new( + rx, + batch_size, + num_rows, + structural_decoder, + spawn_structural_batch_decode_tasks, ) + .into_stream()) } else { let arrow_schema = ArrowSchema::from(schema); let root_fields = arrow_schema.fields; @@ -1956,15 +2083,18 @@ fn create_scheduler_decoder( let num_rows = requested_rows.num_rows(); let is_structural = column_infos[0].is_structural(); + let spawn_structural_batch_decode_tasks = + should_spawn_structural_batch_decode_tasks(&requested_rows, &filter); let (tx, rx) = mpsc::unbounded_channel(); - let decode_stream = create_decode_stream( + let decode_stream = create_decode_stream_with_spawn( &target_schema, num_rows, config.batch_size, is_structural, config.decoder_config.validate_on_decode, + spawn_structural_batch_decode_tasks, rx, )?; @@ -2664,12 +2794,16 @@ pub async fn decode_batch( let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); let is_structural = version >= LanceFileVersion::V2_1; - let mut decode_stream = create_decode_stream( + let requested_rows = RequestedRows::Ranges(vec![0..batch.num_rows]); + let spawn_structural_batch_decode_tasks = + should_spawn_structural_batch_decode_tasks(&requested_rows, filter); + let mut decode_stream = create_decode_stream_with_spawn( &batch.schema, batch.num_rows, batch.num_rows as u32, is_structural, should_validate, + spawn_structural_batch_decode_tasks, rx, )?; decode_stream.next().await.unwrap().task.await @@ -2679,6 +2813,7 @@ pub async fn decode_batch( // test coalesce indices to ranges mod tests { use super::*; + use bytes::Bytes; #[test] fn test_coalesce_indices_to_ranges_with_single_index() { @@ -2700,4 +2835,72 @@ mod tests { let ranges = DecodeBatchScheduler::indices_to_ranges(&indices); assert_eq!(ranges, vec![1..4, 5..8, 9..10]); } + + #[test] + fn test_spawn_policy_auto_small_take() { + let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); + assert!(!should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Auto, + 8, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_policy_auto_large_take() { + let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); + assert!(should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Auto, + 2, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_policy_auto_ranges() { + let requested_rows = RequestedRows::Ranges(vec![0..100]); + assert!(should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Auto, + 1024, + &requested_rows, + &FilterExpression::no_filter() + )); + } + + #[test] + fn test_spawn_policy_auto_filter_disables_spawn() { + let requested_rows = RequestedRows::Ranges(vec![0..100]); + let filter = FilterExpression(Bytes::from_static(b"has-filter")); + assert!(!should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Auto, + 1024, + &requested_rows, + &filter + )); + } + + #[test] + fn test_spawn_policy_always() { + let requested_rows = RequestedRows::Indices(vec![1]); + let filter = FilterExpression(Bytes::from_static(b"has-filter")); + assert!(should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Always, + 1024, + &requested_rows, + &filter + )); + } + + #[test] + fn test_spawn_policy_never() { + let requested_rows = RequestedRows::Ranges(vec![0..100]); + assert!(!should_spawn_structural_batch_decode_tasks_with_mode( + StructuralBatchDecodeSpawnMode::Never, + 1024, + &requested_rows, + &FilterExpression::no_filter() + )); + } } From 7e25ba2c2d463caec22f52d65ea95489ddf499c5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:12:01 +0800 Subject: [PATCH 05/11] perf: keep only spawn mode env for structural decode --- rust/lance-encoding/src/decoder.rs | 38 +++++------------------------- 1 file changed, 6 insertions(+), 32 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 3e063a33803..7617c38d83e 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -261,8 +261,6 @@ use crate::{BufferScheduler, EncodingsIo}; const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; -const ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES: &str = - "LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES"; const DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES: usize = 1024; /// Top-level encoding message for a page. Wraps both the @@ -1906,29 +1904,8 @@ fn structural_batch_decode_spawn_mode() -> StructuralBatchDecodeSpawnMode { } } -fn structural_point_lookup_max_indices() -> usize { - let from_env = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES); - match from_env { - Ok(raw) => match raw.parse::() { - Ok(parsed) => parsed, - Err(err) => { - warn!( - "Failed to parse {}='{}': {}. Falling back to default {}.", - ENV_LANCE_STRUCTURAL_BATCH_DECODE_POINT_LOOKUP_MAX_INDICES, - raw, - err, - DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES - ); - DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES - } - }, - Err(_) => DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES, - } -} - fn should_spawn_structural_batch_decode_tasks_with_mode( mode: StructuralBatchDecodeSpawnMode, - point_lookup_max_indices: usize, requested_rows: &RequestedRows, filter: &FilterExpression, ) -> bool { @@ -1941,7 +1918,9 @@ fn should_spawn_structural_batch_decode_tasks_with_mode( } match requested_rows { RequestedRows::Ranges(_) => true, - RequestedRows::Indices(indices) => indices.len() > point_lookup_max_indices, + RequestedRows::Indices(indices) => { + indices.len() > DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES + } } } } @@ -1953,7 +1932,6 @@ fn should_spawn_structural_batch_decode_tasks( ) -> bool { should_spawn_structural_batch_decode_tasks_with_mode( structural_batch_decode_spawn_mode(), - structural_point_lookup_max_indices(), requested_rows, filter, ) @@ -2841,7 +2819,6 @@ mod tests { let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - 8, &requested_rows, &FilterExpression::no_filter() )); @@ -2849,10 +2826,11 @@ mod tests { #[test] fn test_spawn_policy_auto_large_take() { - let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); + let requested_rows = RequestedRows::Indices( + (0..=(DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES as u64)).collect(), + ); assert!(should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - 2, &requested_rows, &FilterExpression::no_filter() )); @@ -2863,7 +2841,6 @@ mod tests { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - 1024, &requested_rows, &FilterExpression::no_filter() )); @@ -2875,7 +2852,6 @@ mod tests { let filter = FilterExpression(Bytes::from_static(b"has-filter")); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - 1024, &requested_rows, &filter )); @@ -2887,7 +2863,6 @@ mod tests { let filter = FilterExpression(Bytes::from_static(b"has-filter")); assert!(should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Always, - 1024, &requested_rows, &filter )); @@ -2898,7 +2873,6 @@ mod tests { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Never, - 1024, &requested_rows, &FilterExpression::no_filter() )); From 46f498e059ba05032eadca9eb3edcc903bb249a8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:14:07 +0800 Subject: [PATCH 06/11] refactor: simplify structural spawn auto policy --- rust/lance-encoding/src/decoder.rs | 65 +++++------------------------- 1 file changed, 10 insertions(+), 55 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 7617c38d83e..d36b8d2d63d 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -261,7 +261,6 @@ use crate::{BufferScheduler, EncodingsIo}; const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; -const DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES: usize = 1024; /// Top-level encoding message for a page. Wraps both the /// legacy pb::ArrayEncoding and the newer pb::PageLayout @@ -1907,33 +1906,18 @@ fn structural_batch_decode_spawn_mode() -> StructuralBatchDecodeSpawnMode { fn should_spawn_structural_batch_decode_tasks_with_mode( mode: StructuralBatchDecodeSpawnMode, requested_rows: &RequestedRows, - filter: &FilterExpression, ) -> bool { match mode { StructuralBatchDecodeSpawnMode::Always => true, StructuralBatchDecodeSpawnMode::Never => false, - StructuralBatchDecodeSpawnMode::Auto => { - if !filter.is_noop() { - return false; - } - match requested_rows { - RequestedRows::Ranges(_) => true, - RequestedRows::Indices(indices) => { - indices.len() > DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES - } - } - } + StructuralBatchDecodeSpawnMode::Auto => matches!(requested_rows, RequestedRows::Ranges(_)), } } -fn should_spawn_structural_batch_decode_tasks( - requested_rows: &RequestedRows, - filter: &FilterExpression, -) -> bool { +fn should_spawn_structural_batch_decode_tasks(requested_rows: &RequestedRows) -> bool { should_spawn_structural_batch_decode_tasks_with_mode( structural_batch_decode_spawn_mode(), requested_rows, - filter, ) } @@ -2062,7 +2046,7 @@ fn create_scheduler_decoder( let is_structural = column_infos[0].is_structural(); let spawn_structural_batch_decode_tasks = - should_spawn_structural_batch_decode_tasks(&requested_rows, &filter); + should_spawn_structural_batch_decode_tasks(&requested_rows); let (tx, rx) = mpsc::unbounded_channel(); @@ -2774,7 +2758,7 @@ pub async fn decode_batch( let is_structural = version >= LanceFileVersion::V2_1; let requested_rows = RequestedRows::Ranges(vec![0..batch.num_rows]); let spawn_structural_batch_decode_tasks = - should_spawn_structural_batch_decode_tasks(&requested_rows, filter); + should_spawn_structural_batch_decode_tasks(&requested_rows); let mut decode_stream = create_decode_stream_with_spawn( &batch.schema, batch.num_rows, @@ -2791,7 +2775,6 @@ pub async fn decode_batch( // test coalesce indices to ranges mod tests { use super::*; - use bytes::Bytes; #[test] fn test_coalesce_indices_to_ranges_with_single_index() { @@ -2814,57 +2797,30 @@ mod tests { assert_eq!(ranges, vec![1..4, 5..8, 9..10]); } - #[test] - fn test_spawn_policy_auto_small_take() { - let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); - assert!(!should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Auto, - &requested_rows, - &FilterExpression::no_filter() - )); - } - - #[test] - fn test_spawn_policy_auto_large_take() { - let requested_rows = RequestedRows::Indices( - (0..=(DEFAULT_STRUCTURAL_POINT_LOOKUP_MAX_INDICES as u64)).collect(), - ); - assert!(should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Auto, - &requested_rows, - &FilterExpression::no_filter() - )); - } - #[test] fn test_spawn_policy_auto_ranges() { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - &requested_rows, - &FilterExpression::no_filter() + &requested_rows )); } #[test] - fn test_spawn_policy_auto_filter_disables_spawn() { - let requested_rows = RequestedRows::Ranges(vec![0..100]); - let filter = FilterExpression(Bytes::from_static(b"has-filter")); + fn test_spawn_policy_auto_indices_never_spawn() { + let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Auto, - &requested_rows, - &filter + &requested_rows )); } #[test] fn test_spawn_policy_always() { let requested_rows = RequestedRows::Indices(vec![1]); - let filter = FilterExpression(Bytes::from_static(b"has-filter")); assert!(should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Always, - &requested_rows, - &filter + &requested_rows )); } @@ -2873,8 +2829,7 @@ mod tests { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( StructuralBatchDecodeSpawnMode::Never, - &requested_rows, - &FilterExpression::no_filter() + &requested_rows )); } } From 37c9728bbcec3e563f49bea689ec6e70045995a7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:19:41 +0800 Subject: [PATCH 07/11] refactor: simplify structural spawn mode parsing --- rust/lance-encoding/src/decoder.rs | 74 +++++++++++------------------- 1 file changed, 27 insertions(+), 47 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index d36b8d2d63d..3bf95462420 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1874,51 +1874,25 @@ pub struct SchedulerDecoderConfig { pub decoder_config: DecoderConfig, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum StructuralBatchDecodeSpawnMode { - Auto, - Always, - Never, -} - -fn structural_batch_decode_spawn_mode() -> StructuralBatchDecodeSpawnMode { - let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); - match mode - .ok() - .as_deref() - .map(|value| value.trim().to_ascii_lowercase()) - .as_deref() - { - None | Some("") | Some("auto") => StructuralBatchDecodeSpawnMode::Auto, - Some("always") => StructuralBatchDecodeSpawnMode::Always, - Some("never") => StructuralBatchDecodeSpawnMode::Never, - Some(other) => { - warn!( - "Unknown value '{}' for {}. Valid values are auto|always|never. Falling back to auto.", - other, - ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE - ); - StructuralBatchDecodeSpawnMode::Auto - } - } -} - fn should_spawn_structural_batch_decode_tasks_with_mode( - mode: StructuralBatchDecodeSpawnMode, + mode: Option<&str>, requested_rows: &RequestedRows, ) -> bool { match mode { - StructuralBatchDecodeSpawnMode::Always => true, - StructuralBatchDecodeSpawnMode::Never => false, - StructuralBatchDecodeSpawnMode::Auto => matches!(requested_rows, RequestedRows::Ranges(_)), + Some("always") => true, + Some("never") => false, + _ => matches!(requested_rows, RequestedRows::Ranges(_)), } } fn should_spawn_structural_batch_decode_tasks(requested_rows: &RequestedRows) -> bool { - should_spawn_structural_batch_decode_tasks_with_mode( - structural_batch_decode_spawn_mode(), - requested_rows, - ) + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + should_spawn_structural_batch_decode_tasks_with_mode(mode.ok().as_deref(), requested_rows) +} + +fn should_spawn_structural_batch_decode_tasks_without_request() -> bool { + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + !matches!(mode.ok().as_deref(), Some("never")) } fn check_scheduler_on_drop( @@ -1949,12 +1923,9 @@ pub fn create_decode_stream( should_validate: bool, rx: mpsc::UnboundedReceiver>, ) -> Result> { - let spawn_structural_batch_decode_tasks = match structural_batch_decode_spawn_mode() { - StructuralBatchDecodeSpawnMode::Always => true, - StructuralBatchDecodeSpawnMode::Never => false, - // Keep existing behavior for contextless call sites (historically we spawned). - StructuralBatchDecodeSpawnMode::Auto => true, - }; + // Keep existing behavior for contextless call sites (historically we spawned). + let spawn_structural_batch_decode_tasks = + should_spawn_structural_batch_decode_tasks_without_request(); create_decode_stream_with_spawn( schema, num_rows, @@ -2801,7 +2772,7 @@ mod tests { fn test_spawn_policy_auto_ranges() { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Auto, + Some("auto"), &requested_rows )); } @@ -2810,7 +2781,7 @@ mod tests { fn test_spawn_policy_auto_indices_never_spawn() { let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Auto, + Some("auto"), &requested_rows )); } @@ -2819,7 +2790,7 @@ mod tests { fn test_spawn_policy_always() { let requested_rows = RequestedRows::Indices(vec![1]); assert!(should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Always, + Some("always"), &requested_rows )); } @@ -2828,7 +2799,16 @@ mod tests { fn test_spawn_policy_never() { let requested_rows = RequestedRows::Ranges(vec![0..100]); assert!(!should_spawn_structural_batch_decode_tasks_with_mode( - StructuralBatchDecodeSpawnMode::Never, + Some("never"), + &requested_rows + )); + } + + #[test] + fn test_spawn_policy_unknown_defaults_to_auto() { + let requested_rows = RequestedRows::Ranges(vec![0..100]); + assert!(should_spawn_structural_batch_decode_tasks_with_mode( + Some("something-else"), &requested_rows )); } From 41d077a6e29b359fd1f1599fecceda3e194712ff Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 16:34:32 +0800 Subject: [PATCH 08/11] refactor: inline structural spawn selection logic --- rust/lance-encoding/src/decoder.rs | 105 +++-------------------------- rust/lance-encoding/src/testing.rs | 1 + 2 files changed, 11 insertions(+), 95 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 3bf95462420..51baa3f5bd6 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1874,27 +1874,6 @@ pub struct SchedulerDecoderConfig { pub decoder_config: DecoderConfig, } -fn should_spawn_structural_batch_decode_tasks_with_mode( - mode: Option<&str>, - requested_rows: &RequestedRows, -) -> bool { - match mode { - Some("always") => true, - Some("never") => false, - _ => matches!(requested_rows, RequestedRows::Ranges(_)), - } -} - -fn should_spawn_structural_batch_decode_tasks(requested_rows: &RequestedRows) -> bool { - let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); - should_spawn_structural_batch_decode_tasks_with_mode(mode.ok().as_deref(), requested_rows) -} - -fn should_spawn_structural_batch_decode_tasks_without_request() -> bool { - let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); - !matches!(mode.ok().as_deref(), Some("never")) -} - fn check_scheduler_on_drop( stream: BoxStream<'static, ReadBatchTask>, scheduler_handle: tokio::task::JoinHandle<()>, @@ -1916,28 +1895,6 @@ fn check_scheduler_on_drop( } pub fn create_decode_stream( - schema: &Schema, - num_rows: u64, - batch_size: u32, - is_structural: bool, - should_validate: bool, - rx: mpsc::UnboundedReceiver>, -) -> Result> { - // Keep existing behavior for contextless call sites (historically we spawned). - let spawn_structural_batch_decode_tasks = - should_spawn_structural_batch_decode_tasks_without_request(); - create_decode_stream_with_spawn( - schema, - num_rows, - batch_size, - is_structural, - should_validate, - spawn_structural_batch_decode_tasks, - rx, - ) -} - -fn create_decode_stream_with_spawn( schema: &Schema, num_rows: u64, batch_size: u32, @@ -2016,12 +1973,16 @@ fn create_scheduler_decoder( let num_rows = requested_rows.num_rows(); let is_structural = column_infos[0].is_structural(); - let spawn_structural_batch_decode_tasks = - should_spawn_structural_batch_decode_tasks(&requested_rows); + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() { + Some("always") => true, + Some("never") => false, + _ => matches!(requested_rows, RequestedRows::Ranges(_)), + }; let (tx, rx) = mpsc::unbounded_channel(); - let decode_stream = create_decode_stream_with_spawn( + let decode_stream = create_decode_stream( &target_schema, num_rows, config.batch_size, @@ -2727,10 +2688,9 @@ pub async fn decode_batch( let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); let is_structural = version >= LanceFileVersion::V2_1; - let requested_rows = RequestedRows::Ranges(vec![0..batch.num_rows]); - let spawn_structural_batch_decode_tasks = - should_spawn_structural_batch_decode_tasks(&requested_rows); - let mut decode_stream = create_decode_stream_with_spawn( + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never")); + let mut decode_stream = create_decode_stream( &batch.schema, batch.num_rows, batch.num_rows as u32, @@ -2767,49 +2727,4 @@ mod tests { let ranges = DecodeBatchScheduler::indices_to_ranges(&indices); assert_eq!(ranges, vec![1..4, 5..8, 9..10]); } - - #[test] - fn test_spawn_policy_auto_ranges() { - let requested_rows = RequestedRows::Ranges(vec![0..100]); - assert!(should_spawn_structural_batch_decode_tasks_with_mode( - Some("auto"), - &requested_rows - )); - } - - #[test] - fn test_spawn_policy_auto_indices_never_spawn() { - let requested_rows = RequestedRows::Indices(vec![1, 3, 5]); - assert!(!should_spawn_structural_batch_decode_tasks_with_mode( - Some("auto"), - &requested_rows - )); - } - - #[test] - fn test_spawn_policy_always() { - let requested_rows = RequestedRows::Indices(vec![1]); - assert!(should_spawn_structural_batch_decode_tasks_with_mode( - Some("always"), - &requested_rows - )); - } - - #[test] - fn test_spawn_policy_never() { - let requested_rows = RequestedRows::Ranges(vec![0..100]); - assert!(!should_spawn_structural_batch_decode_tasks_with_mode( - Some("never"), - &requested_rows - )); - } - - #[test] - fn test_spawn_policy_unknown_defaults_to_auto() { - let requested_rows = RequestedRows::Ranges(vec![0..100]); - assert!(should_spawn_structural_batch_decode_tasks_with_mode( - Some("something-else"), - &requested_rows - )); - } } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 37df889035f..8bed74506df 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -216,6 +216,7 @@ async fn test_decode( batch_size, is_structural_encoding, /*should_validate=*/ true, + /*spawn_structural_batch_decode_tasks=*/ is_structural_encoding, rx, ) .unwrap(); From 62df58a938e5e650f61c97cb753cba2fd3cb6001 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 24 Feb 2026 17:03:46 +0800 Subject: [PATCH 09/11] refactor: minimize structural decode spawn policy logic --- rust/lance-encoding/src/decoder.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 51baa3f5bd6..251f2c99770 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1691,6 +1691,13 @@ pub struct StructuralBatchDecodeStream { rows_drained: u64, scheduler_exhausted: bool, emitted_batch_size_warning: Arc, + // Decode scheduling policy selected at planning time. + // + // Performance tradeoff: + // - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing + // more decode parallelism. + // - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is + // typically better for point lookups / small takes. spawn_batch_decode_tasks: bool, } @@ -1798,13 +1805,12 @@ impl StructuralBatchDecodeStream { let next_task = next_task.transpose().map(|next_task| { let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0); let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); + // Capture the per-stream policy once so every emitted batch task follows the + // same throughput-vs-overhead choice made by the scheduler. let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks; let task = async move { let next_task = next_task?; if spawn_batch_decode_tasks { - // Real decode work happens inside into_batch, which can block the current - // thread for a long time. By spawning it as a new task, we allow Tokio's - // worker threads to keep making progress. tokio::spawn( async move { next_task.into_batch(emitted_batch_size_warning) }, ) From 0a9c2507a51fbdd8dd9e9681ee224aeaa8637c4f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 26 Feb 2026 01:29:09 +0800 Subject: [PATCH 10/11] fix: pass spawn flag to decode stream benchmark --- rust/lance-encoding/benches/decoder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index c59ef23820a..1b64f8fe711 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -535,6 +535,7 @@ fn bench_decode_compressed_parallel(c: &mut Criterion) { BATCH_SIZE, true, // is_structural for V2_2 false, + false, rx, ) .unwrap(); From 7beb3473fce6ef7daaf05e67bf74bb2c1c6028f7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 26 Feb 2026 02:12:48 +0800 Subject: [PATCH 11/11] test: tolerate clock skew in credential cache ttl --- .../src/credentials/cache.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/rust/lance-namespace-impls/src/credentials/cache.rs b/rust/lance-namespace-impls/src/credentials/cache.rs index 6e7c6c4dcf7..4f59bd14f80 100644 --- a/rust/lance-namespace-impls/src/credentials/cache.rs +++ b/rust/lance-namespace-impls/src/credentials/cache.rs @@ -333,6 +333,21 @@ mod tests { #[test] fn test_calculate_cache_ttl() { + const CLOCK_SKEW_TOLERANCE_SECS: u64 = 2; + + fn assert_ttl_close_to(ttl: Option, expected_secs: u64) { + let actual_secs = ttl.map(|duration| duration.as_secs()); + assert!( + matches!( + actual_secs, + Some(actual) + if actual <= expected_secs + && expected_secs.saturating_sub(actual) <= CLOCK_SKEW_TOLERANCE_SECS + ), + "expected ttl close to {expected_secs}s, got {actual_secs:?}" + ); + } + let now_millis = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() @@ -341,12 +356,12 @@ mod tests { // Credentials with 1 hour remaining -> TTL should be 30 minutes (capped) let creds_1h = VendedCredentials::new(HashMap::new(), now_millis + 3600 * 1000); let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_1h); - assert_eq!(ttl, Some(Duration::from_secs(MAX_CACHE_TTL_SECS))); + assert_ttl_close_to(ttl, MAX_CACHE_TTL_SECS); // Credentials with 10 minutes remaining -> TTL should be 5 minutes let creds_10m = VendedCredentials::new(HashMap::new(), now_millis + 10 * 60 * 1000); let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_10m); - assert_eq!(ttl, Some(Duration::from_secs(5 * 60))); + assert_ttl_close_to(ttl, 5 * 60); // Credentials with 1 minute remaining -> TTL should be None (too short) let creds_1m = VendedCredentials::new(HashMap::new(), now_millis + 60 * 1000);