diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index fc5f636f5b1..78079ac30d0 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -101,35 +101,83 @@ use lance_datafusion::substrait::parse_substrait;
 use snafu::location;
 
 pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192;
+
+/// Parse an environment variable as a specific type, logging a warning on failure.
+///
+/// Returns `None` when the variable is unset, is not valid Unicode, or cannot be parsed as
+/// `T`; the latter two cases log a warning that mentions `default_val`.
+///
+/// `default_val` is only interpolated into the warning text. It is never parsed or returned,
+/// so callers apply their own fallback (for example with `unwrap_or`).
+fn parse_env_var<T: std::str::FromStr>(env_var_name: &str, default_val: &str) -> Option<T>
+where
+    T::Err: std::fmt::Display,
+{
+    let val = match std::env::var(env_var_name) {
+        Ok(val) => val,
+        Err(std::env::VarError::NotPresent) => return None,
+        // Set but not valid Unicode: report it instead of silently ignoring it.
+        Err(e) => {
+            log::warn!(
+                "Failed to read the environment variable {}: {}, the default value is: {}.",
+                env_var_name,
+                e,
+                default_val
+            );
+            return None;
+        }
+    };
+    match val.parse() {
+        Ok(value) => Some(value),
+        Err(e) => {
+            log::warn!(
+                "Failed to parse the environment variable {}='{}': {}, the default value is: {}.",
+                env_var_name,
+                val,
+                e,
+                default_val
+            );
+            None
+        }
+    }
+}
+
 // For backwards compatibility / historical reasons we re-calculate the default batch size
 // on each call
 pub fn get_default_batch_size() -> Option<usize> {
-    std::env::var("LANCE_DEFAULT_BATCH_SIZE")
-        .map(|val| Some(val.parse().unwrap()))
-        .unwrap_or(None)
+    parse_env_var("LANCE_DEFAULT_BATCH_SIZE", &BATCH_SIZE_FALLBACK.to_string())
 }
 
 pub const LEGACY_DEFAULT_FRAGMENT_READAHEAD: usize = 4;
 
 pub static DEFAULT_FRAGMENT_READAHEAD: LazyLock<Option<usize>> = LazyLock::new(|| {
-    std::env::var("LANCE_DEFAULT_FRAGMENT_READAHEAD")
-        .map(|val| Some(val.parse().unwrap()))
-        .unwrap_or(None)
+    parse_env_var(
+        "LANCE_DEFAULT_FRAGMENT_READAHEAD",
+        &LEGACY_DEFAULT_FRAGMENT_READAHEAD.to_string(),
+    )
 });
 
+const DEFAULT_XTR_OVERFETCH_VALUE: u32 = 10;
+
 pub static DEFAULT_XTR_OVERFETCH: LazyLock<u32> = LazyLock::new(|| {
-    std::env::var("LANCE_XTR_OVERFETCH")
-        .map(|val| val.parse().unwrap())
-        .unwrap_or(10)
+    parse_env_var(
+        "LANCE_XTR_OVERFETCH",
+        &DEFAULT_XTR_OVERFETCH_VALUE.to_string(),
+    )
+    .unwrap_or(DEFAULT_XTR_OVERFETCH_VALUE)
 });
 
 // We want to support ~256 concurrent reads to maximize throughput on cloud storage systems
 // Our typical page size is 8MiB (though not all reads are this large yet due to offset buffers, validity buffers, etc.)
 // So we want to support 256 * 8MiB ~= 2GiB of queued reads
+const DEFAULT_IO_BUFFER_SIZE_VALUE: u64 = 2 * 1024 * 1024 * 1024;
+
 pub static DEFAULT_IO_BUFFER_SIZE: LazyLock<u64> = LazyLock::new(|| {
-    std::env::var("LANCE_DEFAULT_IO_BUFFER_SIZE")
-        .map(|val| val.parse().unwrap())
-        .unwrap_or(2 * 1024 * 1024 * 1024)
+    parse_env_var(
+        "LANCE_DEFAULT_IO_BUFFER_SIZE",
+        &DEFAULT_IO_BUFFER_SIZE_VALUE.to_string(),
+    )
+    .unwrap_or(DEFAULT_IO_BUFFER_SIZE_VALUE)
 });
 
 /// Defines an ordering for a single column
@@ -4247,6 +4295,75 @@ mod test {
         assert_plan_node_equals, DatagenExt, FragmentCount, FragmentRowCount, ThrottledStoreWrapper,
     };
 
+    #[test]
+    fn test_env_var_parsing() {
+        // Test that invalid environment variable values don't panic
+        // NOTE(review): env vars are process-global and tests run on multiple threads by
+        // default, so concurrent tests calling `get_default_batch_size` may see these values.
+
+        // Test invalid LANCE_DEFAULT_BATCH_SIZE
+        std::env::set_var("LANCE_DEFAULT_BATCH_SIZE", "not_a_number");
+        let result = get_default_batch_size();
+        assert_eq!(result, None, "Should return None for invalid batch size");
+
+        // Test valid LANCE_DEFAULT_BATCH_SIZE
+        std::env::set_var("LANCE_DEFAULT_BATCH_SIZE", "2048");
+        let result = get_default_batch_size();
+        assert_eq!(result, Some(2048), "Should parse valid batch size");
+
+        // Test unset LANCE_DEFAULT_BATCH_SIZE
+        std::env::remove_var("LANCE_DEFAULT_BATCH_SIZE");
+        let result = get_default_batch_size();
+        assert_eq!(result, None, "Should return None when env var is not set");
+    }
+
+    #[test]
+    fn test_parse_env_var() {
+        // Test parse_env_var with different types to ensure full coverage
+
+        // Test with a unique env var name to avoid conflicts
+        let test_var = "LANCE_TEST_PARSE_ENV_VAR_USIZE";
+
+        // Test valid usize parsing
+        std::env::set_var(test_var, "12345");
+        let result: Option<usize> = parse_env_var(test_var, "Using default.");
+        assert_eq!(result, Some(12345));
+
+        // Test invalid usize parsing (triggers warning log)
+        std::env::set_var(test_var, "not_a_number");
+        let result: Option<usize> = parse_env_var(test_var, "Using default.");
+        assert_eq!(result, None);
+
+        // Test unset env var
+        std::env::remove_var(test_var);
+        let result: Option<usize> = parse_env_var(test_var, "Using default.");
+        assert_eq!(result, None);
+
+        // Test with u32 type
+        let test_var_u32 = "LANCE_TEST_PARSE_ENV_VAR_U32";
+        std::env::set_var(test_var_u32, "42");
+        let result: Option<u32> = parse_env_var(test_var_u32, "Using default value.");
+        assert_eq!(result, Some(42));
+
+        std::env::set_var(test_var_u32, "invalid");
+        let result: Option<u32> = parse_env_var(test_var_u32, "Using default value.");
+        assert_eq!(result, None);
+
+        std::env::remove_var(test_var_u32);
+
+        // Test with u64 type
+        let test_var_u64 = "LANCE_TEST_PARSE_ENV_VAR_U64";
+        std::env::set_var(test_var_u64, "9999999999");
+        let result: Option<u64> = parse_env_var(test_var_u64, "Using default value.");
+        assert_eq!(result, Some(9999999999));
+
+        std::env::set_var(test_var_u64, "-1");
+        let result: Option<u64> = parse_env_var(test_var_u64, "Using default value.");
+        assert_eq!(result, None);
+
+        std::env::remove_var(test_var_u64);
+    }
+
     async fn make_binary_vector_dataset() -> Result<(TempStrDir, Dataset)> {
         let tmp_dir = TempStrDir::default();
         let dim = 4;