diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index c59ef23820a..1b64f8fe711 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -535,6 +535,7 @@ fn bench_decode_compressed_parallel(c: &mut Criterion) { BATCH_SIZE, true, // is_structural for V2_2 false, + false, rx, ) .unwrap(); diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 67996c41f3c..251f2c99770 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -259,6 +259,8 @@ use crate::{BufferScheduler, EncodingsIo}; // If users are getting batches over 10MiB large then it's time to reduce the batch size const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; +const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str = + "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE"; /// Top-level encoding message for a page. Wraps both the /// legacy pb::ArrayEncoding and the newer pb::PageLayout @@ -1689,6 +1691,14 @@ pub struct StructuralBatchDecodeStream { rows_drained: u64, scheduler_exhausted: bool, emitted_batch_size_warning: Arc<Once>, + // Decode scheduling policy selected at planning time. + // + // Performance tradeoff: + // - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing + // more decode parallelism. + // - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is + // typically better for point lookups / small takes. 
+ spawn_batch_decode_tasks: bool, } impl StructuralBatchDecodeStream { @@ -1706,6 +1716,7 @@ impl StructuralBatchDecodeStream { rows_per_batch: u32, num_rows: u64, root_decoder: StructuralStructDecoder, + spawn_batch_decode_tasks: bool, ) -> Self { Self { context: DecoderContext::new(scheduled), @@ -1716,6 +1727,7 @@ impl StructuralBatchDecodeStream { rows_drained: 0, scheduler_exhausted: false, emitted_batch_size_warning: Arc::new(Once::new()), + spawn_batch_decode_tasks, } } @@ -1793,9 +1805,23 @@ impl StructuralBatchDecodeStream { let next_task = next_task.transpose().map(|next_task| { let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0); let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); + // Capture the per-stream policy once so every emitted batch task follows the + // same throughput-vs-overhead choice made by the scheduler. + let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks; let task = async move { let next_task = next_task?; - async move { next_task.into_batch(emitted_batch_size_warning) }.await + if spawn_batch_decode_tasks { + tokio::spawn( + async move { next_task.into_batch(emitted_batch_size_warning) }, + ) + .await + .map_err(|err| Error::Wrapped { + error: err.into(), + location: location!(), + })? 
+ } else { + next_task.into_batch(emitted_batch_size_warning) + } }; (task, num_rows) }); @@ -1880,6 +1906,7 @@ pub fn create_decode_stream( batch_size: u32, is_structural: bool, should_validate: bool, + spawn_structural_batch_decode_tasks: bool, rx: mpsc::UnboundedReceiver>, ) -> Result> { if is_structural { @@ -1889,10 +1916,14 @@ pub fn create_decode_stream( should_validate, /*is_root=*/ true, )?; - Ok( - StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder) - .into_stream(), + Ok(StructuralBatchDecodeStream::new( + rx, + batch_size, + num_rows, + structural_decoder, + spawn_structural_batch_decode_tasks, ) + .into_stream()) } else { let arrow_schema = ArrowSchema::from(schema); let root_fields = arrow_schema.fields; @@ -1948,6 +1979,12 @@ fn create_scheduler_decoder( let num_rows = requested_rows.num_rows(); let is_structural = column_infos[0].is_structural(); + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() { + Some("always") => true, + Some("never") => false, + _ => matches!(requested_rows, RequestedRows::Ranges(_)), + }; let (tx, rx) = mpsc::unbounded_channel(); @@ -1957,6 +1994,7 @@ fn create_scheduler_decoder( config.batch_size, is_structural, config.decoder_config.validate_on_decode, + spawn_structural_batch_decode_tasks, rx, )?; @@ -2656,12 +2694,15 @@ pub async fn decode_batch( let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); let is_structural = version >= LanceFileVersion::V2_1; + let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE); + let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never")); let mut decode_stream = create_decode_stream( &batch.schema, batch.num_rows, batch.num_rows as u32, is_structural, should_validate, + spawn_structural_batch_decode_tasks, rx, )?; decode_stream.next().await.unwrap().task.await diff 
--git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 37df889035f..8bed74506df 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -216,6 +216,7 @@ async fn test_decode( batch_size, is_structural_encoding, /*should_validate=*/ true, + /*spawn_structural_batch_decode_tasks=*/ is_structural_encoding, rx, ) .unwrap(); diff --git a/rust/lance-namespace-impls/src/credentials/cache.rs b/rust/lance-namespace-impls/src/credentials/cache.rs index 6e7c6c4dcf7..4f59bd14f80 100644 --- a/rust/lance-namespace-impls/src/credentials/cache.rs +++ b/rust/lance-namespace-impls/src/credentials/cache.rs @@ -333,6 +333,21 @@ mod tests { #[test] fn test_calculate_cache_ttl() { + const CLOCK_SKEW_TOLERANCE_SECS: u64 = 2; + + fn assert_ttl_close_to(ttl: Option<Duration>, expected_secs: u64) { + let actual_secs = ttl.map(|duration| duration.as_secs()); + assert!( + matches!( + actual_secs, + Some(actual) + if actual <= expected_secs + && expected_secs.saturating_sub(actual) <= CLOCK_SKEW_TOLERANCE_SECS + ), + "expected ttl close to {expected_secs}s, got {actual_secs:?}" + ); + } + let now_millis = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() @@ -341,12 +356,12 @@ mod tests { // Credentials with 1 hour remaining -> TTL should be 30 minutes (capped) let creds_1h = VendedCredentials::new(HashMap::new(), now_millis + 3600 * 1000); let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_1h); - assert_eq!(ttl, Some(Duration::from_secs(MAX_CACHE_TTL_SECS))); + assert_ttl_close_to(ttl, MAX_CACHE_TTL_SECS); // Credentials with 10 minutes remaining -> TTL should be 5 minutes let creds_10m = VendedCredentials::new(HashMap::new(), now_millis + 10 * 60 * 1000); let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_10m); - assert_eq!(ttl, Some(Duration::from_secs(5 * 60))); + assert_ttl_close_to(ttl, 5 * 60); // Credentials with 1 minute remaining -> TTL should be None (too short) let creds_1m = 
VendedCredentials::new(HashMap::new(), now_millis + 60 * 1000);