Merged
4 changes: 2 additions & 2 deletions docs/src/guide/object_store.md
@@ -38,8 +38,8 @@ These options apply to all object stores.
| `proxy_url` | URL of a proxy server to use for requests. Default, `None`. |
| `proxy_ca_certificate` | PEM-formatted CA certificate for proxy connections |
| `proxy_excludes` | List of hosts that bypass proxy. This is a comma separated list of domains and IP masks. Any subdomain of the provided domain will be bypassed. For example, `example.com, 192.168.1.0/24` would bypass `https://api.example.com`, `https://www.example.com`, and any IP in the range `192.168.1.0/24`. |
| `client_max_retries` | Number of times for a s3 client to retry the request. Default, `10`. |
| `client_retry_timeout` | Timeout for a s3 client to retry the request in seconds. Default, `180`. |
| `client_max_retries` | Number of times the object store client retries a request. Default, `3`. |
| `client_retry_timeout` | Total timeout, in seconds, for the object store client's retries of a request. Default, `180`. |
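As a rough sketch of how these retry options are resolved: the client settings are read case-insensitively from the storage-options map, falling back to the defaults above. The helper below is a simplified standalone illustration, not the actual `StorageOptions` implementation.

```rust
use std::collections::HashMap;

/// Simplified stand-in for how `client_max_retries` is resolved from a
/// case-insensitive storage-options map, defaulting to 3.
fn client_max_retries(options: &HashMap<String, String>) -> usize {
    options
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
        .and_then(|(_, value)| value.parse::<usize>().ok())
        .unwrap_or(3)
}

fn main() {
    let mut opts = HashMap::new();
    // No key set: fall back to the default of 3 retries.
    assert_eq!(client_max_retries(&opts), 3);
    // Keys are matched case-insensitively.
    opts.insert("CLIENT_MAX_RETRIES".to_string(), "7".to_string());
    assert_eq!(client_max_retries(&opts), 7);
    println!("ok");
}
```

A non-numeric value fails to parse and silently falls back to the default, matching the `.ok()`/`.unwrap_or` chain.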

## S3 Configuration

30 changes: 25 additions & 5 deletions docs/src/guide/performance.md
@@ -163,11 +163,31 @@ In summary, scans could use up to `(2 * io_buffer_size) + (batch_size * num_comp
Keep in mind that `io_buffer_size` is a soft limit (e.g. we cannot read less than one page at a time right now)
and so it is not necessarily a bug if you see memory usage exceed this limit by a small margin.

The above limits refer to limits per-scan. There is an additional limit on the number of IOPS that is applied
across the entire process. This limit is specified by the `LANCE_PROCESS_IO_THREADS_LIMIT` environment variable.
The default is 128 which is more than enough for most workloads. You can increase this limit if you are working
with a high-throughput workload. You can even disable this limit entirely by setting it to zero. Note that this
can often lead to issues with excessive retries and timeouts from the object store.
### Cloud Store Throttling

Cloud object stores (S3, GCS, Azure) are automatically wrapped with an AIMD (Additive Increase / Multiplicative
Decrease) rate limiter. When the store returns throttle errors (HTTP 429/503), the request rate decreases
multiplicatively. During sustained success, the rate increases additively. This applies to all operations
(reads, writes, deletes, lists) and replaces the old `LANCE_PROCESS_IO_THREADS_LIMIT` process-wide cap.

Local and in-memory stores are **not** throttled.

The AIMD throttle can be tuned via storage options or environment variables. Storage options take precedence
over environment variables:

| Setting | Storage Option Key | Env Var | Default |
| ------------------ | ------------------------------- | ------------------------------- | ------- |
| Initial rate | `lance_aimd_initial_rate` | `LANCE_AIMD_INITIAL_RATE` | 2000 |
| Min rate | `lance_aimd_min_rate` | `LANCE_AIMD_MIN_RATE` | 1 |
| Max rate | `lance_aimd_max_rate` | `LANCE_AIMD_MAX_RATE` | 5000 |
| Decrease factor | `lance_aimd_decrease_factor` | `LANCE_AIMD_DECREASE_FACTOR` | 0.5 |
| Additive increment | `lance_aimd_additive_increment` | `LANCE_AIMD_ADDITIVE_INCREMENT` | 300 |
| Burst capacity | `lance_aimd_burst_capacity` | `LANCE_AIMD_BURST_CAPACITY` | 100 |

These defaults are balanced and should work for most use cases. For example, S3 can typically sustain up to 5000 req/s, and with the default initial rate and additive increment the throttle reaches that ceiling in about 10 seconds.
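To make the dynamics concrete, here is a minimal sketch of the AIMD update rule using the documented defaults. This is an illustration of the policy, not the actual Lance implementation.

```rust
/// One AIMD step with the documented default settings: on a throttle
/// error the rate is cut multiplicatively; on success it grows additively,
/// clamped to [min_rate, max_rate].
fn aimd_step(rate: f64, throttled: bool) -> f64 {
    const MIN_RATE: f64 = 1.0;
    const MAX_RATE: f64 = 5000.0;
    const DECREASE_FACTOR: f64 = 0.5;
    const ADDITIVE_INCREMENT: f64 = 300.0;
    if throttled {
        (rate * DECREASE_FACTOR).max(MIN_RATE)
    } else {
        (rate + ADDITIVE_INCREMENT).min(MAX_RATE)
    }
}

fn main() {
    // From the default initial rate of 2000 req/s, sustained success
    // reaches the 5000 req/s cap in (5000 - 2000) / 300 = 10 steps.
    let mut rate = 2000.0;
    let mut steps = 0;
    while rate < 5000.0 {
        rate = aimd_step(rate, false);
        steps += 1;
    }
    assert_eq!(steps, 10);
    // A single throttle error (HTTP 429/503) halves the rate.
    assert_eq!(aimd_step(5000.0, true), 2500.0);
    println!("reached max in {steps} steps");
}
```

With roughly one additive step per second, 10 steps matches the "about 10 seconds" estimate above.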

## Indexes

2 changes: 1 addition & 1 deletion rust/lance-io/src/object_store.rs
@@ -874,7 +874,7 @@ impl StorageOptions {
.iter()
.find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
.and_then(|(_, value)| value.parse::<usize>().ok())
.unwrap_or(10)
.unwrap_or(3)
}

/// Seconds of timeout to set in RetryConfig for object store client
22 changes: 18 additions & 4 deletions rust/lance-io/src/object_store/providers/aws.rs
Collaborator:
It seems the multipart upload process is not covered. Is that expected?

Member Author:
Good catch. I've added multipart to the retry handling. I still can't apply the outer retry loop to the delete stream or list stream methods. These return a stream of items and there is no way of mapping results to underlying object store requests so there is nothing I can retry there. Since these operations are hopefully rare I think it will be ok.

@@ -30,6 +30,7 @@ use crate::object_store::{
DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
StorageOptionsProvider,
throttle::{AimdThrottleConfig, AimdThrottledStore},
};
use lance_core::error::{Error, Result};

@@ -44,12 +45,12 @@ impl AwsStoreProvider {
storage_options: &StorageOptions,
is_s3_express: bool,
) -> Result<Arc<dyn OSObjectStore>> {
let max_retries = storage_options.client_max_retries();
Collaborator:
`client_max_retries` is not useful anymore; do we need to remove it?

Member Author:
It should still be used. We actually rely on object_store's retries to make the AIMD throttle work. The object_store errors have no "is_temporary" so we have no other way of knowing if an error is a temporary error except to see if object_store applied retries to it.

So by default we should do 3 object store retries now. Each time those fail we apply an AIMD retry (longer backoff, cut throttle). So in total we get 9 retries instead of 10.
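The interaction described above can be sketched as a generic outer retry loop: each outer attempt goes through the object store client (which applies its own `client_max_retries`), and when the client surfaces a retry error, the throttle rate is cut and the request is retried again. Names and structure here are illustrative, not the actual `AimdThrottledStore` code.

```rust
/// Illustrative outer AIMD retry loop. `attempt` stands in for a request
/// through the object_store client (which internally retries on its own);
/// `on_throttle` stands in for the multiplicative rate decrease.
fn with_aimd_retries<T, E>(
    max_aimd_retries: usize,
    mut attempt: impl FnMut() -> Result<T, E>,
    mut on_throttle: impl FnMut(),
) -> Result<T, E> {
    let mut last_err = None;
    for _ in 0..max_aimd_retries {
        match attempt() {
            Ok(v) => return Ok(v),
            Err(e) => {
                on_throttle(); // cut the throttle rate, then retry with backoff
                last_err = Some(e);
            }
        }
    }
    Err(last_err.expect("max_aimd_retries must be > 0"))
}

fn main() {
    let mut inner_calls = 0;
    // Simulate the object_store client exhausting its own 3 retries on
    // every outer attempt: 3 AIMD retries x 3 client retries = 9 total.
    let result: Result<(), &str> = with_aimd_retries(
        3,
        || {
            inner_calls += 3;
            Err("throttled")
        },
        || {},
    );
    assert!(result.is_err());
    assert_eq!(inner_calls, 9);
    println!("ok");
}
```

This matches the arithmetic in the comment: with the new default of 3 client retries per AIMD attempt, the worst case is 9 underlying retries rather than the old flat 10.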

let retry_timeout = storage_options.client_retry_timeout();
// Use a low retry count since the AIMD throttle layer handles
// throttle recovery with its own retry loop.
let retry_config = RetryConfig {
backoff: Default::default(),
max_retries,
retry_timeout: Duration::from_secs(retry_timeout),
max_retries: storage_options.client_max_retries(),
retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
};

let mut s3_storage_options = storage_options.as_s3_options();
@@ -159,6 +160,19 @@ impl ObjectStoreProvider for AwsStoreProvider {
self.build_amazon_s3_store(&mut base_path, params, &storage_options, is_s3_express)
.await?
};
let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
let inner = if throttle_config.is_disabled() {
inner
} else if storage_options.client_max_retries() == 0 {
log::warn!(
"AIMD throttle disabled: the current implementation relies on the object store \
client surfacing retry errors, which requires client_max_retries > 0. \
No throttle or retry layer will be applied."
);
inner
} else {
Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
};

Ok(ObjectStore {
inner,
22 changes: 18 additions & 4 deletions rust/lance-io/src/object_store/providers/azure.rs
@@ -21,6 +21,7 @@ use url::Url;
use crate::object_store::{
DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
ObjectStoreParams, ObjectStoreProvider, StorageOptions,
throttle::{AimdThrottleConfig, AimdThrottledStore},
};
use lance_core::error::{Error, Result};

@@ -113,12 +114,12 @@ impl AzureBlobStoreProvider {
base_path: &Url,
storage_options: &StorageOptions,
) -> Result<Arc<dyn OSObjectStore>> {
let max_retries = storage_options.client_max_retries();
let retry_timeout = storage_options.client_retry_timeout();
// Use a low retry count since the AIMD throttle layer handles
// throttle recovery with its own retry loop.
let retry_config = RetryConfig {
backoff: Default::default(),
max_retries,
retry_timeout: Duration::from_secs(retry_timeout),
max_retries: storage_options.client_max_retries(),
retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
};

let mut builder = MicrosoftAzureBuilder::new()
@@ -163,6 +164,19 @@ impl ObjectStoreProvider for AzureBlobStoreProvider {
self.build_microsoft_azure_store(&base_path, &storage_options)
.await?
};
let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
let inner = if throttle_config.is_disabled() {
inner
} else if storage_options.client_max_retries() == 0 {
log::warn!(
"AIMD throttle disabled: the current implementation relies on the object store \
client surfacing retry errors, which requires client_max_retries > 0. \
No throttle or retry layer will be applied."
);
inner
} else {
Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
};

Ok(ObjectStore {
inner,
22 changes: 18 additions & 4 deletions rust/lance-io/src/object_store/providers/gcp.rs
@@ -16,6 +16,7 @@ use url::Url;
use crate::object_store::{
DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
ObjectStoreParams, ObjectStoreProvider, StorageOptions,
throttle::{AimdThrottleConfig, AimdThrottledStore},
};
use lance_core::error::{Error, Result};

@@ -58,12 +59,12 @@ impl GcsStoreProvider {
base_path: &Url,
storage_options: &StorageOptions,
) -> Result<Arc<dyn OSObjectStore>> {
let max_retries = storage_options.client_max_retries();
let retry_timeout = storage_options.client_retry_timeout();
// Use a low retry count since the AIMD throttle layer handles
// throttle recovery with its own retry loop.
let retry_config = RetryConfig {
backoff: Default::default(),
max_retries,
retry_timeout: Duration::from_secs(retry_timeout),
max_retries: storage_options.client_max_retries(),
retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
};

let mut builder = GoogleCloudStorageBuilder::new()
@@ -108,6 +109,19 @@ impl ObjectStoreProvider for GcsStoreProvider {
self.build_google_cloud_store(&base_path, &storage_options)
.await?
};
let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
let inner = if throttle_config.is_disabled() {
inner
} else if storage_options.client_max_retries() == 0 {
log::warn!(
"AIMD throttle disabled: the current implementation relies on the object store \
client surfacing retry errors, which requires client_max_retries > 0. \
No throttle or retry layer will be applied."
);
inner
} else {
Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
};

Ok(ObjectStore {
inner,