Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 109 additions & 12 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,65 @@ use lance_datafusion::substrait::parse_substrait;
use snafu::location;

/// Fallback batch size. Its string form is handed to `parse_env_var` as the default
/// value reported in the warning when `LANCE_DEFAULT_BATCH_SIZE` cannot be parsed.
// NOTE(review): presumably also the batch size used when nothing else is configured —
// confirm at the use sites, which are outside this view.
pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192;

/// Parse an environment variable as a specific type, logging a warning on parse failure.
fn parse_env_var<T: std::str::FromStr>(env_var_name: &str, default_val: &str) -> Option<T>
where
T::Err: std::fmt::Display,
{
std::env::var(env_var_name)
.ok()
.and_then(|val| match val.parse() {
Ok(value) => Some(value),
Err(e) => {
log::warn!(
"Failed to parse the environment variable {}='{}': {}, the default value is: {}.",
env_var_name,
val,
e,
default_val
);
None
}
})
}

/// Returns the batch size configured through the `LANCE_DEFAULT_BATCH_SIZE` environment
/// variable, or `None` when the variable is unset or cannot be parsed as a `usize`
/// (in which case `parse_env_var` logs a warning instead of panicking).
///
/// For backwards compatibility / historical reasons we re-calculate the default batch
/// size on each call rather than caching it.
pub fn get_default_batch_size() -> Option<usize> {
    // The second argument is only used in the warning text shown on a parse failure.
    parse_env_var("LANCE_DEFAULT_BATCH_SIZE", &BATCH_SIZE_FALLBACK.to_string())
}

pub const LEGACY_DEFAULT_FRAGMENT_READAHEAD: usize = 4;

pub static DEFAULT_FRAGMENT_READAHEAD: LazyLock<Option<usize>> = LazyLock::new(|| {
std::env::var("LANCE_DEFAULT_FRAGMENT_READAHEAD")
.map(|val| Some(val.parse().unwrap()))
.unwrap_or(None)
parse_env_var(
"LANCE_DEFAULT_FRAGMENT_READAHEAD",
&LEGACY_DEFAULT_FRAGMENT_READAHEAD.to_string(),
)
});

const DEFAULT_XTR_OVERFETCH_VALUE: u32 = 10;

pub static DEFAULT_XTR_OVERFETCH: LazyLock<u32> = LazyLock::new(|| {
std::env::var("LANCE_XTR_OVERFETCH")
.map(|val| val.parse().unwrap())
.unwrap_or(10)
parse_env_var(
"LANCE_XTR_OVERFETCH",
&DEFAULT_XTR_OVERFETCH_VALUE.to_string(),
)
.unwrap_or(DEFAULT_XTR_OVERFETCH_VALUE)
});

// We want to support ~256 concurrent reads to maximize throughput on cloud storage systems
// Our typical page size is 8MiB (though not all reads are this large yet due to offset buffers, validity buffers, etc.)
// So we want to support 256 * 8MiB ~= 2GiB of queued reads
const DEFAULT_IO_BUFFER_SIZE_VALUE: u64 = 2 * 1024 * 1024 * 1024;

pub static DEFAULT_IO_BUFFER_SIZE: LazyLock<u64> = LazyLock::new(|| {
std::env::var("LANCE_DEFAULT_IO_BUFFER_SIZE")
.map(|val| val.parse().unwrap())
.unwrap_or(2 * 1024 * 1024 * 1024)
parse_env_var(
"LANCE_DEFAULT_IO_BUFFER_SIZE",
&DEFAULT_IO_BUFFER_SIZE_VALUE.to_string(),
)
.unwrap_or(DEFAULT_IO_BUFFER_SIZE_VALUE)
});

/// Defines an ordering for a single column
Expand Down Expand Up @@ -4247,6 +4277,73 @@ mod test {
assert_plan_node_equals, DatagenExt, FragmentCount, FragmentRowCount, ThrottledStoreWrapper,
};

/// Verifies that `get_default_batch_size` never panics: an invalid value and an unset
/// variable both yield `None`, and a valid value is parsed.
#[test]
fn test_env_var_parsing() {
    const VAR: &str = "LANCE_DEFAULT_BATCH_SIZE";

    /// Puts the variable back to its pre-test state when dropped, so a value configured
    /// by the surrounding environment is not lost and a failing assertion does not leave
    /// a bogus value behind for later tests.
    struct Restore(Option<std::ffi::OsString>);

    impl Drop for Restore {
        fn drop(&mut self) {
            match self.0.take() {
                Some(previous) => std::env::set_var(VAR, previous),
                None => std::env::remove_var(VAR),
            }
        }
    }

    let _restore = Restore(std::env::var_os(VAR));

    // Invalid value: must not panic, must yield None.
    std::env::set_var(VAR, "not_a_number");
    let result = get_default_batch_size();
    assert_eq!(result, None, "Should return None for invalid batch size");

    // Valid value: parsed as-is.
    std::env::set_var(VAR, "2048");
    let result = get_default_batch_size();
    assert_eq!(result, Some(2048), "Should parse valid batch size");

    // Unset variable: None.
    std::env::remove_var(VAR);
    let result = get_default_batch_size();
    assert_eq!(result, None, "Should return None when env var is not set");
}

/// Exercises `parse_env_var` for `usize`, `u32` and `u64` targets. Each section uses its
/// own dedicated variable name so it cannot collide with real configuration.
#[test]
fn test_parse_env_var() {
    // --- usize: valid value, invalid value (logs a warning), then unset ---
    let usize_key = "LANCE_TEST_PARSE_ENV_VAR_USIZE";

    std::env::set_var(usize_key, "12345");
    assert_eq!(
        parse_env_var::<usize>(usize_key, "Using default."),
        Some(12345)
    );

    std::env::set_var(usize_key, "not_a_number");
    assert_eq!(parse_env_var::<usize>(usize_key, "Using default."), None);

    std::env::remove_var(usize_key);
    assert_eq!(parse_env_var::<usize>(usize_key, "Using default."), None);

    // --- u32: valid value, then invalid value ---
    let u32_key = "LANCE_TEST_PARSE_ENV_VAR_U32";

    std::env::set_var(u32_key, "42");
    assert_eq!(
        parse_env_var::<u32>(u32_key, "Using default value."),
        Some(42)
    );

    std::env::set_var(u32_key, "invalid");
    assert_eq!(parse_env_var::<u32>(u32_key, "Using default value."), None);

    std::env::remove_var(u32_key);

    // --- u64: a value beyond u32 range, then a negative number (rejected) ---
    let u64_key = "LANCE_TEST_PARSE_ENV_VAR_U64";

    std::env::set_var(u64_key, "9999999999");
    assert_eq!(
        parse_env_var::<u64>(u64_key, "Using default value."),
        Some(9999999999)
    );

    std::env::set_var(u64_key, "-1");
    assert_eq!(parse_env_var::<u64>(u64_key, "Using default value."), None);

    std::env::remove_var(u64_key);
}

async fn make_binary_vector_dataset() -> Result<(TempStrDir, Dataset)> {
let tmp_dir = TempStrDir::default();
let dim = 4;
Expand Down
Loading