diff --git a/docs/src/guide/object_store.md b/docs/src/guide/object_store.md index a1f0c2341ef..1710e3b5100 100644 --- a/docs/src/guide/object_store.md +++ b/docs/src/guide/object_store.md @@ -38,8 +38,8 @@ These options apply to all object stores. | `proxy_url` | URL of a proxy server to use for requests. Default, `None`. | | `proxy_ca_certificate` | PEM-formatted CA certificate for proxy connections | | `proxy_excludes` | List of hosts that bypass proxy. This is a comma separated list of domains and IP masks. Any subdomain of the provided domain will be bypassed. For example, `example.com, 192.168.1.0/24` would bypass `https://api.example.com`, `https://www.example.com`, and any IP in the range `192.168.1.0/24`. | -| `client_max_retries` | Number of times for a s3 client to retry the request. Default, `10`. | -| `client_retry_timeout` | Timeout for a s3 client to retry the request in seconds. Default, `180`. | +| `client_max_retries` | Number of times for the object store client to retry the request. Default, `3`. | +| `client_retry_timeout` | Timeout for the object store client to retry the request in seconds. Default, `180`. | ## S3 Configuration diff --git a/docs/src/guide/performance.md b/docs/src/guide/performance.md index 11ca2e23b72..fb94b94e7ed 100644 --- a/docs/src/guide/performance.md +++ b/docs/src/guide/performance.md @@ -163,11 +163,31 @@ In summary, scans could use up to `(2 * io_buffer_size) + (batch_size * num_comp Keep in mind that `io_buffer_size` is a soft limit (e.g. we cannot read less than one page at a time right now) and so it is not necessarily a bug if you see memory usage exceed this limit by a small margin. -The above limits refer to limits per-scan. There is an additional limit on the number of IOPS that is applied -across the entire process. This limit is specified by the `LANCE_PROCESS_IO_THREADS_LIMIT` environment variable. -The default is 128 which is more than enough for most workloads. 
You can increase this limit if you are working -with a high-throughput workload. You can even disable this limit entirely by setting it to zero. Note that this -can often lead to issues with excessive retries and timeouts from the object store. +### Cloud Store Throttling + +Cloud object stores (S3, GCS, Azure) are automatically wrapped with an AIMD (Additive Increase / Multiplicative +Decrease) rate limiter. When the store returns throttle errors (HTTP 429/503), the request rate decreases +multiplicatively. During sustained success, the rate increases additively. This applies to all operations +(reads, writes, deletes, lists) and replaces the old `LANCE_PROCESS_IO_THREADS_LIMIT` process-wide cap. + +Local and in-memory stores are **not** throttled. + +The AIMD throttle can be tuned via storage options or environment variables. Storage options take precedence +over environment variables: + +| Setting | Storage Option Key | Env Var | Default | +| ------------------ | ------------------------------- | ------------------------------- | ------- | +| Initial rate | `lance_aimd_initial_rate` | `LANCE_AIMD_INITIAL_RATE` | 2000 | +| Min rate | `lance_aimd_min_rate` | `LANCE_AIMD_MIN_RATE` | 1 | +| Max rate | `lance_aimd_max_rate` | `LANCE_AIMD_MAX_RATE` | 5000 | +| Decrease factor | `lance_aimd_decrease_factor` | `LANCE_AIMD_DECREASE_FACTOR` | 0.5 | +| Additive increment | `lance_aimd_additive_increment` | `LANCE_AIMD_ADDITIVE_INCREMENT` | 300 | +| Burst capacity | `lance_aimd_burst_capacity` | `LANCE_AIMD_BURST_CAPACITY` | 100 | + +These initial settings are balanced and should work for most +use cases. For example, S3 can typically get up to 5000 +req/s and with these settings we should get there in about +10 seconds. 
## Indexes diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index c0d0acf51a2..4eb1e4ff403 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -874,7 +874,7 @@ impl StorageOptions { .iter() .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries")) .and_then(|(_, value)| value.parse::().ok()) - .unwrap_or(10) + .unwrap_or(3) } /// Seconds of timeout to set in RetryConfig for object store client diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 2055a7144d2..a06fcaef140 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -30,6 +30,7 @@ use crate::object_store::{ DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor, StorageOptionsProvider, + throttle::{AimdThrottleConfig, AimdThrottledStore}, }; use lance_core::error::{Error, Result}; @@ -44,12 +45,12 @@ impl AwsStoreProvider { storage_options: &StorageOptions, is_s3_express: bool, ) -> Result> { - let max_retries = storage_options.client_max_retries(); - let retry_timeout = storage_options.client_retry_timeout(); + // Use a low retry count since the AIMD throttle layer handles + // throttle recovery with its own retry loop. let retry_config = RetryConfig { backoff: Default::default(), - max_retries, - retry_timeout: Duration::from_secs(retry_timeout), + max_retries: storage_options.client_max_retries(), + retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()), }; let mut s3_storage_options = storage_options.as_s3_options(); @@ -159,6 +160,19 @@ impl ObjectStoreProvider for AwsStoreProvider { self.build_amazon_s3_store(&mut base_path, params, &storage_options, is_s3_express) .await? 
}; + let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?; + let inner = if throttle_config.is_disabled() { + inner + } else if storage_options.client_max_retries() == 0 { + log::warn!( + "AIMD throttle disabled: the current implementation relies on the object store \ + client surfacing retry errors, which requires client_max_retries > 0. \ + No throttle or retry layer will be applied." + ); + inner + } else { + Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc + }; Ok(ObjectStore { inner, diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index ba9c8a34831..647d324bd3e 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -21,6 +21,7 @@ use url::Url; use crate::object_store::{ DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, + throttle::{AimdThrottleConfig, AimdThrottledStore}, }; use lance_core::error::{Error, Result}; @@ -113,12 +114,12 @@ impl AzureBlobStoreProvider { base_path: &Url, storage_options: &StorageOptions, ) -> Result> { - let max_retries = storage_options.client_max_retries(); - let retry_timeout = storage_options.client_retry_timeout(); + // Use a low retry count since the AIMD throttle layer handles + // throttle recovery with its own retry loop. let retry_config = RetryConfig { backoff: Default::default(), - max_retries, - retry_timeout: Duration::from_secs(retry_timeout), + max_retries: storage_options.client_max_retries(), + retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()), }; let mut builder = MicrosoftAzureBuilder::new() @@ -163,6 +164,19 @@ impl ObjectStoreProvider for AzureBlobStoreProvider { self.build_microsoft_azure_store(&base_path, &storage_options) .await? 
}; + let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?; + let inner = if throttle_config.is_disabled() { + inner + } else if storage_options.client_max_retries() == 0 { + log::warn!( + "AIMD throttle disabled: the current implementation relies on the object store \ + client surfacing retry errors, which requires client_max_retries > 0. \ + No throttle or retry layer will be applied." + ); + inner + } else { + Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc + }; Ok(ObjectStore { inner, diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs index 7bfe3715566..f234a8e31ee 100644 --- a/rust/lance-io/src/object_store/providers/gcp.rs +++ b/rust/lance-io/src/object_store/providers/gcp.rs @@ -16,6 +16,7 @@ use url::Url; use crate::object_store::{ DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, + throttle::{AimdThrottleConfig, AimdThrottledStore}, }; use lance_core::error::{Error, Result}; @@ -58,12 +59,12 @@ impl GcsStoreProvider { base_path: &Url, storage_options: &StorageOptions, ) -> Result> { - let max_retries = storage_options.client_max_retries(); - let retry_timeout = storage_options.client_retry_timeout(); + // Use a low retry count since the AIMD throttle layer handles + // throttle recovery with its own retry loop. let retry_config = RetryConfig { backoff: Default::default(), - max_retries, - retry_timeout: Duration::from_secs(retry_timeout), + max_retries: storage_options.client_max_retries(), + retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()), }; let mut builder = GoogleCloudStorageBuilder::new() @@ -108,6 +109,19 @@ impl ObjectStoreProvider for GcsStoreProvider { self.build_google_cloud_store(&base_path, &storage_options) .await? 
}; + let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?; + let inner = if throttle_config.is_disabled() { + inner + } else if storage_options.client_max_retries() == 0 { + log::warn!( + "AIMD throttle disabled: the current implementation relies on the object store \ + client surfacing retry errors, which requires client_max_retries > 0. \ + No throttle or retry layer will be applied." + ); + inner + } else { + Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc + }; Ok(ObjectStore { inner, diff --git a/rust/lance-io/src/object_store/throttle.rs b/rust/lance-io/src/object_store/throttle.rs index 378bcff4c7d..a632ddda7e0 100644 --- a/rust/lance-io/src/object_store/throttle.rs +++ b/rust/lance-io/src/object_store/throttle.rs @@ -20,6 +20,7 @@ //! let throttled = AimdThrottledStore::new(target, AimdThrottleConfig::default()).unwrap(); //! ``` +use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; use std::ops::Range; use std::sync::Arc; @@ -34,8 +35,9 @@ use object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart, }; +use rand::Rng; use tokio::sync::Mutex; -use tracing::debug; +use tracing::{debug, warn}; /// Check whether an `object_store::Error` represents a throttle response /// (HTTP 429 / 503) from a cloud object store. @@ -81,6 +83,12 @@ pub struct AimdThrottleConfig { pub list: AimdConfig, /// Maximum tokens that can accumulate for bursts (shared across all categories). pub burst_capacity: u32, + /// Maximum number of retries for throttle errors within the AIMD layer. + pub max_retries: usize, + /// Minimum backoff in milliseconds between retry attempts. + pub min_backoff_ms: u64, + /// Maximum backoff in milliseconds between retry attempts. 
+ pub max_backoff_ms: u64, } impl Default for AimdThrottleConfig { @@ -92,6 +100,9 @@ impl Default for AimdThrottleConfig { delete: aimd.clone(), list: aimd, burst_capacity: 100, + max_retries: 3, + min_backoff_ms: 100, + max_backoff_ms: 300, } } } @@ -134,12 +145,157 @@ impl AimdThrottleConfig { Self { list: aimd, ..self } } + /// Returns `true` when the AIMD throttle layer should be bypassed entirely. + pub fn is_disabled(&self) -> bool { + self.max_retries == 0 + } + pub fn with_burst_capacity(self, burst_capacity: u32) -> Self { Self { burst_capacity, ..self } } + + /// Build an `AimdThrottleConfig` from storage options and environment variables. + /// + /// Storage options take precedence over environment variables, which take + /// precedence over defaults. A single AIMD config is applied to all four + /// operation categories (read/write/delete/list). + /// + /// | Setting | Storage Option Key | Env Var | Default | + /// |----------------------|----------------------------------|----------------------------------|---------| + /// | Initial rate | `lance_aimd_initial_rate` | `LANCE_AIMD_INITIAL_RATE` | 2000 | + /// | Min rate | `lance_aimd_min_rate` | `LANCE_AIMD_MIN_RATE` | 1 | + /// | Max rate | `lance_aimd_max_rate` | `LANCE_AIMD_MAX_RATE` | 5000 | + /// | Decrease factor | `lance_aimd_decrease_factor` | `LANCE_AIMD_DECREASE_FACTOR` | 0.5 | + /// | Additive increment | `lance_aimd_additive_increment` | `LANCE_AIMD_ADDITIVE_INCREMENT` | 300 | + /// | Burst capacity | `lance_aimd_burst_capacity` | `LANCE_AIMD_BURST_CAPACITY` | 100 | + /// | Max retries | `lance_aimd_max_retries` | `LANCE_AIMD_MAX_RETRIES` | 3 | + /// | Min backoff ms | `lance_aimd_min_backoff_ms` | `LANCE_AIMD_MIN_BACKOFF_MS` | 100 | + /// | Max backoff ms | `lance_aimd_max_backoff_ms` | `LANCE_AIMD_MAX_BACKOFF_MS` | 300 | + pub fn from_storage_options( + storage_options: Option<&HashMap>, + ) -> lance_core::Result { + fn resolve_f64( + key: &str, + storage_options: Option<&HashMap>, + 
default: f64, + ) -> lance_core::Result { + let env_key = key.to_ascii_uppercase(); + if let Some(val) = storage_options.and_then(|opts| opts.get(key)) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for storage option '{key}': '{val}'" + )) + }) + } else if let Ok(val) = std::env::var(&env_key) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for env var '{env_key}': '{val}'" + )) + }) + } else { + Ok(default) + } + } + + fn resolve_u32( + key: &str, + storage_options: Option<&HashMap>, + default: u32, + ) -> lance_core::Result { + let env_key = key.to_ascii_uppercase(); + if let Some(val) = storage_options.and_then(|opts| opts.get(key)) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for storage option '{key}': '{val}'" + )) + }) + } else if let Ok(val) = std::env::var(&env_key) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for env var '{env_key}': '{val}'" + )) + }) + } else { + Ok(default) + } + } + + fn resolve_usize( + key: &str, + storage_options: Option<&HashMap>, + default: usize, + ) -> lance_core::Result { + let env_key = key.to_ascii_uppercase(); + if let Some(val) = storage_options.and_then(|opts| opts.get(key)) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for storage option '{key}': '{val}'" + )) + }) + } else if let Ok(val) = std::env::var(&env_key) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for env var '{env_key}': '{val}'" + )) + }) + } else { + Ok(default) + } + } + + fn resolve_u64( + key: &str, + storage_options: Option<&HashMap>, + default: u64, + ) -> lance_core::Result { + let env_key = key.to_ascii_uppercase(); + if let Some(val) = storage_options.and_then(|opts| opts.get(key)) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid 
value for storage option '{key}': '{val}'" + )) + }) + } else if let Ok(val) = std::env::var(&env_key) { + val.parse::().map_err(|_| { + lance_core::Error::invalid_input(format!( + "Invalid value for env var '{env_key}': '{val}'" + )) + }) + } else { + Ok(default) + } + } + + let initial_rate = resolve_f64("lance_aimd_initial_rate", storage_options, 2000.0)?; + let min_rate = resolve_f64("lance_aimd_min_rate", storage_options, 1.0)?; + let max_rate = resolve_f64("lance_aimd_max_rate", storage_options, 5000.0)?; + let decrease_factor = resolve_f64("lance_aimd_decrease_factor", storage_options, 0.5)?; + let additive_increment = + resolve_f64("lance_aimd_additive_increment", storage_options, 300.0)?; + let burst_capacity = resolve_u32("lance_aimd_burst_capacity", storage_options, 100)?; + let max_retries = resolve_usize("lance_aimd_max_retries", storage_options, 3)?; + let min_backoff_ms = resolve_u64("lance_aimd_min_backoff_ms", storage_options, 100)?; + let max_backoff_ms = resolve_u64("lance_aimd_max_backoff_ms", storage_options, 300)?; + + let aimd = AimdConfig::default() + .with_initial_rate(initial_rate) + .with_min_rate(min_rate) + .with_max_rate(max_rate) + .with_decrease_factor(decrease_factor) + .with_additive_increment(additive_increment); + + Ok(Self { + max_retries, + min_backoff_ms, + max_backoff_ms, + ..Self::default() + .with_aimd(aimd) + .with_burst_capacity(burst_capacity) + }) + } } struct TokenBucketState { @@ -153,10 +309,19 @@ struct OperationThrottle { controller: AimdController, bucket: Mutex, burst_capacity: f64, + max_retries: usize, + min_backoff_ms: u64, + max_backoff_ms: u64, } impl OperationThrottle { - fn new(aimd_config: AimdConfig, burst_capacity: f64) -> lance_core::Result { + fn new( + aimd_config: AimdConfig, + burst_capacity: f64, + max_retries: usize, + min_backoff_ms: u64, + max_backoff_ms: u64, + ) -> lance_core::Result { let initial_rate = aimd_config.initial_rate; let controller = AimdController::new(aimd_config)?; Ok(Self { 
@@ -167,6 +332,9 @@ impl OperationThrottle { rate: initial_rate, }), burst_capacity, + max_retries, + min_backoff_ms, + max_backoff_ms, }) } @@ -212,36 +380,72 @@ impl OperationThrottle { let outcome = match result { Ok(_) => RequestOutcome::Success, Err(err) if is_throttle_error(err) => { - debug!("Throttle error detected in stream, decreasing rate"); + debug!("Throttle error detected in stream"); RequestOutcome::Throttled } Err(_) => RequestOutcome::Success, }; + let prev_rate = self.controller.current_rate(); let new_rate = self.controller.record_outcome(outcome); + if new_rate < prev_rate { + warn!( + previous_rate = format!("{prev_rate:.1}"), + new_rate = format!("{new_rate:.1}"), + "AIMD throttle: rate reduced due to throttle errors" + ); + } if let Ok(mut bucket) = self.bucket.try_lock() { bucket.rate = new_rate; } } /// Execute an operation with throttling: acquire token, run, classify result. + /// On throttle errors, retries up to `max_retries` times with a random + /// backoff between `min_backoff_ms` and `max_backoff_ms` between attempts. 
async fn throttled(&self, f: F) -> OSResult where - F: FnOnce() -> Fut, + F: Fn() -> Fut, Fut: std::future::Future>, { - self.acquire_token().await; - let result = f().await; - let outcome = match &result { - Ok(_) => RequestOutcome::Success, - Err(err) if is_throttle_error(err) => { - debug!("Throttle error detected, decreasing rate"); - RequestOutcome::Throttled + for attempt in 0..=self.max_retries { + self.acquire_token().await; + let result = f().await; + let outcome = match &result { + Ok(_) => RequestOutcome::Success, + Err(err) if is_throttle_error(err) => { + debug!("Throttle error detected"); + RequestOutcome::Throttled + } + Err(_) => RequestOutcome::Success, // Non-throttle errors don't indicate capacity problems + }; + let prev_rate = self.controller.current_rate(); + let new_rate = self.controller.record_outcome(outcome); + if new_rate < prev_rate { + warn!( + previous_rate = format!("{prev_rate:.1}"), + new_rate = format!("{new_rate:.1}"), + "AIMD throttle: rate reduced due to throttle errors" + ); } - Err(_) => RequestOutcome::Success, // Non-throttle errors don't indicate capacity problems - }; - let new_rate = self.controller.record_outcome(outcome); - self.update_bucket_rate(new_rate).await; - result + self.update_bucket_rate(new_rate).await; + + match &result { + Err(err) if is_throttle_error(err) && attempt < self.max_retries => { + let backoff_ms = + rand::rng().random_range(self.min_backoff_ms..=self.max_backoff_ms); + debug!( + attempt = attempt + 1, + max_retries = self.max_retries, + backoff_ms, + "Retrying after throttle error" + ); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + continue; + } + _ => return result, + } + } + unreachable!() } } @@ -254,11 +458,17 @@ impl Debug for OperationThrottle { } } -/// A [`MultipartUpload`] wrapper that throttles `put_part` and observes -/// outcomes from `put_part` and `complete`, feeding them back to the write -/// AIMD controller. 
+/// A [`MultipartUpload`] wrapper that throttles and retries `put_part`, +/// `complete`, and `abort`, feeding outcomes back to the write AIMD +/// controller. +/// +/// Uses a `std::sync::Mutex` (not `tokio::sync::Mutex`) so that aborted +/// futures cannot cause deadlocks — the guard is always dropped +/// deterministically. The lock is held only briefly for the sync +/// `put_part` dispatch; `complete`/`abort` hold it across their await but +/// are never called concurrently with part uploads. struct ThrottledMultipartUpload { - target: Box, + target: Arc>>, write: Arc, } @@ -272,23 +482,66 @@ impl Debug for ThrottledMultipartUpload { impl MultipartUpload for ThrottledMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { let write = Arc::clone(&self.write); - let fut = self.target.put_part(data); + let target = Arc::clone(&self.target); Box::pin(async move { - write.acquire_token().await; - let result = fut.await; - write.observe_outcome(&result); - result + write + .throttled(|| { + // The let binding is intentional: it ensures the + // MutexGuard is dropped before the future is awaited. + #[allow(clippy::let_and_return)] + let fut = target.lock().unwrap().put_part(data.clone()); + fut + }) + .await }) } async fn complete(&mut self) -> OSResult { - let result = self.target.complete().await; - self.write.observe_outcome(&result); - result + // &mut self guarantees no concurrent put_part futures are alive, + // so get_mut always succeeds (Arc refcount == 1). 
+ let target = Arc::get_mut(&mut self.target) + .expect("complete called while put_part futures are still alive") + .get_mut() + .unwrap(); + for attempt in 0..=self.write.max_retries { + self.write.acquire_token().await; + let result = target.complete().await; + self.write.observe_outcome(&result); + + match &result { + Err(err) if is_throttle_error(err) && attempt < self.write.max_retries => { + let backoff_ms = rand::rng() + .random_range(self.write.min_backoff_ms..=self.write.max_backoff_ms); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + continue; + } + _ => return result, + } + } + unreachable!() } async fn abort(&mut self) -> OSResult<()> { - self.target.abort().await + let target = Arc::get_mut(&mut self.target) + .expect("abort called while put_part futures are still alive") + .get_mut() + .unwrap(); + for attempt in 0..=self.write.max_retries { + self.write.acquire_token().await; + let result = target.abort().await; + self.write.observe_outcome(&result); + + match &result { + Err(err) if is_throttle_error(err) && attempt < self.write.max_retries => { + let backoff_ms = rand::rng() + .random_range(self.write.min_backoff_ms..=self.write.max_backoff_ms); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + continue; + } + _ => return result, + } + } + unreachable!() } } @@ -339,12 +592,39 @@ impl AimdThrottledStore { config: AimdThrottleConfig, ) -> lance_core::Result { let burst = config.burst_capacity as f64; + let max_retries = config.max_retries; + let min_backoff_ms = config.min_backoff_ms; + let max_backoff_ms = config.max_backoff_ms; Ok(Self { target, - read: Arc::new(OperationThrottle::new(config.read, burst)?), - write: Arc::new(OperationThrottle::new(config.write, burst)?), - delete: Arc::new(OperationThrottle::new(config.delete, burst)?), - list: Arc::new(OperationThrottle::new(config.list, burst)?), + read: Arc::new(OperationThrottle::new( + config.read, + burst, + max_retries, + min_backoff_ms, 
+ max_backoff_ms, + )?), + write: Arc::new(OperationThrottle::new( + config.write, + burst, + max_retries, + min_backoff_ms, + max_backoff_ms, + )?), + delete: Arc::new(OperationThrottle::new( + config.delete, + burst, + max_retries, + min_backoff_ms, + max_backoff_ms, + )?), + list: Arc::new(OperationThrottle::new( + config.list, + burst, + max_retries, + min_backoff_ms, + max_backoff_ms, + )?), }) } } @@ -354,7 +634,7 @@ impl AimdThrottledStore { impl ObjectStore for AimdThrottledStore { async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { self.write - .throttled(|| self.target.put(location, bytes)) + .throttled(|| self.target.put(location, bytes.clone())) .await } @@ -365,7 +645,7 @@ impl ObjectStore for AimdThrottledStore { opts: PutOptions, ) -> OSResult { self.write - .throttled(|| self.target.put_opts(location, bytes, opts)) + .throttled(|| self.target.put_opts(location, bytes.clone(), opts.clone())) .await } @@ -375,7 +655,7 @@ impl ObjectStore for AimdThrottledStore { .throttled(|| self.target.put_multipart(location)) .await?; Ok(Box::new(ThrottledMultipartUpload { - target, + target: Arc::new(std::sync::Mutex::new(target)), write: Arc::clone(&self.write), })) } @@ -387,10 +667,10 @@ impl ObjectStore for AimdThrottledStore { ) -> OSResult> { let target = self .write - .throttled(|| self.target.put_multipart_opts(location, opts)) + .throttled(|| self.target.put_multipart_opts(location, opts.clone())) .await?; Ok(Box::new(ThrottledMultipartUpload { - target, + target: Arc::new(std::sync::Mutex::new(target)), write: Arc::clone(&self.write), })) } @@ -401,7 +681,7 @@ impl ObjectStore for AimdThrottledStore { async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { self.read - .throttled(|| self.target.get_opts(location, options)) + .throttled(|| self.target.get_opts(location, options.clone())) .await } @@ -1133,10 +1413,11 @@ mod tests { let throttled = mock.throttle_count.load(Ordering::Relaxed); let total_mock = successes 
+ throttled; - // Reader-side count must match mock-side count. - assert_eq!( - total_reader_requests, total_mock, - "Reader-side count ({total_reader_requests}) != mock-side count ({total_mock})" + // Mock-side count >= reader-side count because the AIMD layer retries + // throttle errors internally, causing multiple mock calls per reader call. + assert!( + total_mock >= total_reader_requests, + "Mock-side count ({total_mock}) should be >= reader-side count ({total_reader_requests})" ); // Mock capacity is 30/100ms = 300 req/s. Over 2s the theoretical max is @@ -1160,4 +1441,174 @@ mod tests { "AIMD should limit total requests, got {total_mock}" ); } + + /// A mock store that returns a configurable number of throttle errors + /// before succeeding on `get` operations. Used to test the retry logic + /// inside `OperationThrottle::throttled()`. + struct RetryTestMockStore { + inner: InMemory, + /// Number of throttle errors remaining before success. + errors_remaining: std::sync::Mutex, + /// Total number of `get` calls observed. 
+ get_call_count: AtomicU64, + } + + impl RetryTestMockStore { + fn new(errors_before_success: usize) -> Self { + Self { + inner: InMemory::new(), + errors_remaining: std::sync::Mutex::new(errors_before_success), + get_call_count: AtomicU64::new(0), + } + } + } + + impl Display for RetryTestMockStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RetryTestMockStore") + } + } + + impl Debug for RetryTestMockStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RetryTestMockStore").finish() + } + } + + #[async_trait] + impl ObjectStore for RetryTestMockStore { + async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { + self.inner.put(location, bytes).await + } + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> OSResult { + self.inner.put_opts(location, bytes, opts).await + } + async fn put_multipart(&self, location: &Path) -> OSResult> { + self.inner.put_multipart(location).await + } + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> OSResult> { + self.inner.put_multipart_opts(location, opts).await + } + async fn get(&self, location: &Path) -> OSResult { + self.get_call_count.fetch_add(1, Ordering::Relaxed); + let should_error = { + let mut remaining = self.errors_remaining.lock().unwrap(); + if *remaining > 0 { + *remaining -= 1; + true + } else { + false + } + }; + if should_error { + Err(object_store::Error::Generic { + store: "RetryTestMock", + source: "request failed, after 3 retries, max_retries: 3, retry_timeout: 30s" + .into(), + }) + } else { + self.inner.get(location).await + } + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { + self.inner.get_opts(location, options).await + } + async fn get_range(&self, location: &Path, range: Range) -> OSResult { + self.inner.get_range(location, range).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) 
-> OSResult> { + self.inner.get_ranges(location, ranges).await + } + async fn head(&self, location: &Path) -> OSResult { + self.inner.head(location).await + } + async fn delete(&self, location: &Path) -> OSResult<()> { + self.inner.delete(location).await + } + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, OSResult>, + ) -> BoxStream<'a, OSResult> { + self.inner.delete_stream(locations) + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { + self.inner.list(prefix) + } + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, OSResult> { + self.inner.list_with_offset(prefix, offset) + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.inner.list_with_delimiter(prefix).await + } + async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { + self.inner.copy(from, to).await + } + async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { + self.inner.rename(from, to).await + } + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { + self.inner.rename_if_not_exists(from, to).await + } + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + } + + #[tokio::test] + async fn test_throttled_retries_on_throttle_error_then_succeeds() { + // Mock returns 2 throttle errors then succeeds (within MAX_RETRIES=3) + let mock = Arc::new(RetryTestMockStore::new(2)); + let path = Path::from("test/retry.txt"); + mock.put(&path, PutPayload::from_static(b"retry data")) + .await + .unwrap(); + + let config = AimdThrottleConfig::default(); + let throttled = + AimdThrottledStore::new(mock.clone() as Arc, config).unwrap(); + + let result = throttled.get(&path).await; + assert!(result.is_ok(), "Expected success after retries"); + + let bytes = result.unwrap().bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), b"retry data"); + + // Should have called get 3 times total: 
2 failures + 1 success + assert_eq!(mock.get_call_count.load(Ordering::Relaxed), 3); + } + + #[tokio::test] + async fn test_throttled_fails_after_max_retries_exceeded() { + // Mock is configured with 10 throttle errors (more than the 3 retries allowed), + // so all 4 attempts (initial + 3 retries) will fail. + let mock = Arc::new(RetryTestMockStore::new(10)); + let path = Path::from("test/fail.txt"); + mock.put(&path, PutPayload::from_static(b"fail data")) + .await + .unwrap(); + + let config = AimdThrottleConfig::default(); + let throttled = + AimdThrottledStore::new(mock.clone() as Arc, config).unwrap(); + + let result = throttled.get(&path).await; + assert!(result.is_err(), "Expected error after max retries"); + assert!(is_throttle_error(&result.unwrap_err())); + + // Should have called get 4 times: initial attempt + 3 retries + assert_eq!(mock.get_call_count.load(Ordering::Relaxed), 4); + } } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index e27e9519425..7199c4d1b91 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -13,7 +13,7 @@ use std::ops::Range; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; -use tokio::sync::{Notify, Semaphore, SemaphorePermit}; +use tokio::sync::Notify; use lance_core::{Error, Result}; @@ -32,19 +32,6 @@ const BACKPRESSURE_DEBOUNCE: u64 = 60; static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0); // Global counter of how many bytes were read by the scheduler static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0); -// By default, we limit the number of IOPS across the entire process to 128 -// -// In theory this is enough for ~10GBps on S3 following the guidelines to issue -// 1 IOP per 80MBps. In practice, I have noticed slightly better performance going -// up to 256. -// -// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many -// concurrent IOPS. 
For safety, we set the default to 128 and let the user override -// this if needed. -// -// Note: this only limits things that run through the scheduler. It does not limit -// IOPS from other sources like writing or commits. -static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128; pub fn iops_counter() -> u64 { IOPS_COUNTER.load(Ordering::Acquire) @@ -54,97 +41,6 @@ pub fn bytes_read_counter() -> u64 { BYTES_READ_COUNTER.load(Ordering::Acquire) } -// There are two structures that control the I/O scheduler concurrency. First, -// we have a hard limit on the number of IOPS that can be issued concurrently. -// This limit is process-wide. -// -// Second, we try and limit how many I/O requests can be buffered in memory without -// being consumed by a decoder of some kind. This limit is per-scheduler. We cannot -// make this limit process wide without introducing deadlock (because the decoder for -// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1) -// and vice-versa. -// -// There is also a per-scan limit on the number of IOPS that can be issued concurrently. -// -// The process-wide limit exists when users need a hard limit on the number of parallel -// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating -// the network. (Note: a process-wide limit of X will not necessarily limit the number of -// open TCP connections to exactly X. The underlying object store may open more connections -// anyways) -// -// However, it can be too tough in some cases, e.g. when some scans are reading from -// cloud storage and other scans are reading from local disk. In these cases users don't -// need to set a process-limit and can rely on the per-scan limits. - -// The IopsQuota enforces the first of the above limits, it is the per-process hard cap -// on the number of IOPS that can be issued concurrently. 
-// -// The per-scan limits are enforced by IoQueue -struct IopsQuota { - // An Option is used here to avoid mutex overhead if no limit is set - iops_avail: Option, -} - -/// A reservation on the global IOPS quota -/// -/// When the reservation is dropped, the IOPS quota is released unless -/// [`Self::forget`] is called. -struct IopsReservation<'a> { - value: Option>, -} - -impl IopsReservation<'_> { - // Forget the reservation, so it won't be released on drop - fn forget(&mut self) { - if let Some(value) = self.value.take() { - value.forget(); - } - } -} - -impl IopsQuota { - // By default, we throttle the number of scan IOPS across the entire process - // - // However, the user can disable this by setting the environment variable - // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer). - fn new() -> Self { - let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT") - .map(|s| { - s.parse::().unwrap_or_else(|_| { - log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s); - DEFAULT_PROCESS_IOPS_LIMIT - }) - }) - .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT); - let iops_avail = if initial_capacity <= 0 { - None - } else { - Some(Semaphore::new(initial_capacity as usize)) - }; - Self { iops_avail } - } - - // Return a reservation on the global IOPS quota - fn release(&self) { - if let Some(iops_avail) = self.iops_avail.as_ref() { - iops_avail.add_permits(1); - } - } - - // Acquire a reservation on the global IOPS quota - async fn acquire(&self) -> IopsReservation<'_> { - if let Some(iops_avail) = self.iops_avail.as_ref() { - IopsReservation { - value: Some(iops_avail.acquire().await.unwrap()), - } - } else { - IopsReservation { value: None } - } - } -} - -static IOPS_QUOTA: std::sync::LazyLock = std::sync::LazyLock::new(IopsQuota::new); - // We want to allow requests that have a lower priority than any // currently in-flight request. This helps avoid potential deadlocks // related to backpressure. 
Unfortunately, it is quite expensive to @@ -303,17 +199,8 @@ impl IoQueue { async fn pop(&self) -> Option { loop { { - // First, grab a reservation on the global IOPS quota - // If we then get a task to run, transfer the reservation - // to the task. Otherwise, the reservation will be released - // when iop_res is dropped. - let mut iop_res = IOPS_QUOTA.acquire().await; - // Next, try and grab a reservation from the queue let mut state = self.state.lock().unwrap(); if let Some(task) = state.next_task() { - // Reservation successfully acquired, we will release the global - // global reservation after task has run. - iop_res.forget(); return Some(task); } @@ -501,7 +388,6 @@ impl IoTask { range_end = self.to_read.end, "File I/O completed" ); - IOPS_QUOTA.release(); (self.when_done)(bytes); } }