Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-encoding/benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ fn bench_decode_compressed_parallel(c: &mut Criterion) {
BATCH_SIZE,
true, // is_structural for V2_2
false,
false,
rx,
)
.unwrap();
Expand Down
49 changes: 45 additions & 4 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ use crate::{BufferScheduler, EncodingsIo};

// If users are getting batches larger than 10MiB then it's time to reduce the batch size
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
"LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";

/// Top-level encoding message for a page. Wraps both the
/// legacy pb::ArrayEncoding and the newer pb::PageLayout
Expand Down Expand Up @@ -1689,6 +1691,14 @@ pub struct StructuralBatchDecodeStream {
rows_drained: u64,
scheduler_exhausted: bool,
emitted_batch_size_warning: Arc<Once>,
// Decode scheduling policy selected at planning time.
//
// Performance tradeoff:
// - true: spawn `into_batch` onto Tokio, which improves scan throughput by allowing
// more decode parallelism.
// - false: run `into_batch` inline, which avoids Tokio scheduling overhead and is
// typically better for point lookups / small takes.
spawn_batch_decode_tasks: bool,
}

impl StructuralBatchDecodeStream {
Expand All @@ -1706,6 +1716,7 @@ impl StructuralBatchDecodeStream {
rows_per_batch: u32,
num_rows: u64,
root_decoder: StructuralStructDecoder,
spawn_batch_decode_tasks: bool,
) -> Self {
Self {
context: DecoderContext::new(scheduled),
Expand All @@ -1716,6 +1727,7 @@ impl StructuralBatchDecodeStream {
rows_drained: 0,
scheduler_exhausted: false,
emitted_batch_size_warning: Arc::new(Once::new()),
spawn_batch_decode_tasks,
}
}

Expand Down Expand Up @@ -1793,9 +1805,23 @@ impl StructuralBatchDecodeStream {
let next_task = next_task.transpose().map(|next_task| {
let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
// Capture the per-stream policy once so every emitted batch task follows the
// same throughput-vs-overhead choice made by the scheduler.
let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
let task = async move {
let next_task = next_task?;
async move { next_task.into_batch(emitted_batch_size_warning) }.await
if spawn_batch_decode_tasks {
tokio::spawn(
async move { next_task.into_batch(emitted_batch_size_warning) },
)
.await
.map_err(|err| Error::Wrapped {
error: err.into(),
location: location!(),
})?
} else {
next_task.into_batch(emitted_batch_size_warning)
}
};
(task, num_rows)
});
Expand Down Expand Up @@ -1880,6 +1906,7 @@ pub fn create_decode_stream(
batch_size: u32,
is_structural: bool,
should_validate: bool,
spawn_structural_batch_decode_tasks: bool,
rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
) -> Result<BoxStream<'static, ReadBatchTask>> {
if is_structural {
Expand All @@ -1889,10 +1916,14 @@ pub fn create_decode_stream(
should_validate,
/*is_root=*/ true,
)?;
Ok(
StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder)
.into_stream(),
Ok(StructuralBatchDecodeStream::new(
rx,
batch_size,
num_rows,
structural_decoder,
spawn_structural_batch_decode_tasks,
)
.into_stream())
} else {
let arrow_schema = ArrowSchema::from(schema);
let root_fields = arrow_schema.fields;
Expand Down Expand Up @@ -1948,6 +1979,12 @@ fn create_scheduler_decoder(
let num_rows = requested_rows.num_rows();

let is_structural = column_infos[0].is_structural();
let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
Some("always") => true,
Some("never") => false,
_ => matches!(requested_rows, RequestedRows::Ranges(_)),
};

let (tx, rx) = mpsc::unbounded_channel();

Expand All @@ -1957,6 +1994,7 @@ fn create_scheduler_decoder(
config.batch_size,
is_structural,
config.decoder_config.validate_on_decode,
spawn_structural_batch_decode_tasks,
rx,
)?;

Expand Down Expand Up @@ -2656,12 +2694,15 @@ pub async fn decode_batch(
let (tx, rx) = unbounded_channel();
decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
let is_structural = version >= LanceFileVersion::V2_1;
let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
let mut decode_stream = create_decode_stream(
&batch.schema,
batch.num_rows,
batch.num_rows as u32,
is_structural,
should_validate,
spawn_structural_batch_decode_tasks,
rx,
)?;
decode_stream.next().await.unwrap().task.await
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ async fn test_decode(
batch_size,
is_structural_encoding,
/*should_validate=*/ true,
/*spawn_structural_batch_decode_tasks=*/ is_structural_encoding,
rx,
)
.unwrap();
Expand Down
19 changes: 17 additions & 2 deletions rust/lance-namespace-impls/src/credentials/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,21 @@ mod tests {

#[test]
fn test_calculate_cache_ttl() {
// Allow the computed TTL to lag the expected value by up to this many
// seconds, since wall-clock time advances between credential creation
// and the TTL calculation under test.
const CLOCK_SKEW_TOLERANCE_SECS: u64 = 2;

// Assert that `ttl` is Some and within CLOCK_SKEW_TOLERANCE_SECS below
// `expected_secs` (never above it). Panics with a descriptive message
// otherwise.
fn assert_ttl_close_to(ttl: Option<Duration>, expected_secs: u64) {
    let actual_secs = ttl.map(|duration| duration.as_secs());
    // A TTL is acceptable when it does not exceed the expectation and
    // falls short of it by no more than the skew tolerance.
    let within_tolerance = match actual_secs {
        Some(actual) => {
            actual <= expected_secs
                && expected_secs.saturating_sub(actual) <= CLOCK_SKEW_TOLERANCE_SECS
        }
        None => false,
    };
    assert!(
        within_tolerance,
        "expected ttl close to {expected_secs}s, got {actual_secs:?}"
    );
}

let now_millis = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
Expand All @@ -341,12 +356,12 @@ mod tests {
// Credentials with 1 hour remaining -> TTL should be 30 minutes (capped)
let creds_1h = VendedCredentials::new(HashMap::new(), now_millis + 3600 * 1000);
let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_1h);
assert_eq!(ttl, Some(Duration::from_secs(MAX_CACHE_TTL_SECS)));
assert_ttl_close_to(ttl, MAX_CACHE_TTL_SECS);

// Credentials with 10 minutes remaining -> TTL should be 5 minutes
let creds_10m = VendedCredentials::new(HashMap::new(), now_millis + 10 * 60 * 1000);
let ttl = CachingCredentialVendor::calculate_cache_ttl(&creds_10m);
assert_eq!(ttl, Some(Duration::from_secs(5 * 60)));
assert_ttl_close_to(ttl, 5 * 60);

// Credentials with 1 minute remaining -> TTL should be None (too short)
let creds_1m = VendedCredentials::new(HashMap::new(), now_millis + 60 * 1000);
Expand Down