From 0280ba28262414a1f4bc21a62e590285a0e15be3 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Tue, 2 Dec 2025 19:23:41 +0100 Subject: [PATCH 1/2] Add more debug, v1 migration fixes --- src/filetracker_client/mod.rs | 64 ++++++++++++++++++++++++++++++----- src/routes/ft/get_file.rs | 10 +++--- src/routes/ft/put_file.rs | 40 ++++++++++++++++++---- 3 files changed, 96 insertions(+), 18 deletions(-) diff --git a/src/filetracker_client/mod.rs b/src/filetracker_client/mod.rs index 1d4d86b..c57b2c2 100644 --- a/src/filetracker_client/mod.rs +++ b/src/filetracker_client/mod.rs @@ -53,8 +53,19 @@ impl FiletrackerClient { let response = self.client.get(&url).send().await?; if !response.status().is_success() { - error!("Failed to list files: HTTP {}", response.status()); - bail!("HTTP {}", response.status()) + let status = response.status(); + let x_exception = response + .headers() + .get("X-Exception") + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown") + .to_string(); + let body = response.text().await.unwrap_or_default(); + error!( + "Failed to list files: HTTP {} - X-Exception: {} - Body: {}", + status, x_exception, body + ); + bail!("HTTP {} - {}", status, x_exception) } let body = response.text().await?; @@ -80,8 +91,19 @@ impl FiletrackerClient { } if !response.status().is_success() { - error!("Failed to get file: HTTP {}", response.status()); - bail!("HTTP {}", response.status()) + let status = response.status(); + let x_exception = response + .headers() + .get("X-Exception") + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown") + .to_string(); + let body = response.text().await.unwrap_or_default(); + error!( + "Failed to get file: HTTP {} - X-Exception: {} - Body: {}", + status, x_exception, body + ); + bail!("HTTP {} - {}", status, x_exception) } // Extract headers @@ -150,6 +172,10 @@ impl FiletrackerClient { let mut request = self.client.put(&url).body(data); + // V1 filetracker reads timestamp from Last-Modified header, not query param + // We send 
both for compatibility with V1 and V2 + request = request.header("Last-Modified", &timestamp_rfc2822); + if is_compressed { request = request.header("Content-Encoding", "gzip"); } @@ -161,8 +187,19 @@ impl FiletrackerClient { let response = request.send().await?; if !response.status().is_success() { - error!("Failed to put file: HTTP {}", response.status()); - bail!("HTTP {}", response.status()) + let status = response.status(); + let x_exception = response + .headers() + .get("X-Exception") + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown") + .to_string(); + let body = response.text().await.unwrap_or_default(); + error!( + "Failed to put file: HTTP {} - X-Exception: {} - Body: {}", + status, x_exception, body + ); + bail!("HTTP {} - {}", status, x_exception) } debug!("Put file to filetracker successfully"); @@ -192,8 +229,19 @@ impl FiletrackerClient { } if !response.status().is_success() { - error!("Failed to delete file: HTTP {}", response.status()); - bail!("HTTP {}", response.status()) + let status = response.status(); + let x_exception = response + .headers() + .get("X-Exception") + .and_then(|v| v.to_str().ok()) + .unwrap_or("unknown") + .to_string(); + let body = response.text().await.unwrap_or_default(); + error!( + "Failed to delete file: HTTP {} - X-Exception: {} - Body: {}", + status, x_exception, body + ); + bail!("HTTP {} - {}", status, x_exception) } debug!("Deleted file from filetracker successfully"); diff --git a/src/routes/ft/get_file.rs b/src/routes/ft/get_file.rs index 39e7fcb..28decce 100644 --- a/src/routes/ft/get_file.rs +++ b/src/routes/ft/get_file.rs @@ -266,15 +266,17 @@ pub async fn ft_get_file( .observe(start.elapsed().as_secs_f64()); // 7.
Return file with appropriate headers (matching original filetracker) + let last_modified_header = utils::format_rfc2822_timestamp(modified_time); + debug!( + "GET {} returning Last-Modified: {} (unix: {})", + path, last_modified_header, modified_time + ); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/octet-stream") .header("Content-Length", blob_data.len().to_string()) .header("Content-Encoding", "gzip") - .header( - "Last-Modified", - utils::format_rfc2822_timestamp(modified_time), - ) + .header("Last-Modified", last_modified_header) .header("Logical-Size", logical_size.to_string()) .body(Body::from(blob_data)) .unwrap() diff --git a/src/routes/ft/put_file.rs b/src/routes/ft/put_file.rs index 1c22866..4f21221 100644 --- a/src/routes/ft/put_file.rs +++ b/src/routes/ft/put_file.rs @@ -41,8 +41,21 @@ pub async fn ft_put_file( }; // 1. Parse and validate timestamp (required for PUT) + let raw_timestamp = headers + .get("last-modified") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + .or_else(|| query.last_modified.clone()); let timestamp = match utils::extract_timestamp(&headers, query.last_modified.as_ref(), true) { - Ok(ts) => ts, + Ok(ts) => { + debug!( + "PUT {} timestamp: raw='{}' -> parsed={}", + path, + raw_timestamp.as_deref().unwrap_or("none"), + ts + ); + ts + } Err(e) => { error!("Failed to extract timestamp: {}", e); record_metrics("400"); @@ -475,16 +488,26 @@ pub async fn ft_put_file( if let Some(filetracker_client) = &state.filetracker_client { debug!("Live migration mode: also writing to filetracker"); - // Reconstruct the data that needs to be sent to filetracker - // We need to use the final_data (compressed) that was stored + // V1 filetracker doesn't understand compression - it stores files uncompressed. + // We need to decompress before sending, otherwise V1 stores gzip bytes + // and later returns them without Content-Encoding header, causing corruption. 
+ let uncompressed_for_v1 = match storage_helpers::decompress_gzip(&final_data) { + Ok(data) => data, + Err(e) => { + warn!("Failed to decompress for V1 filetracker dual-write: {}", e); + // Skip dual-write if decompression fails + final_data.clone() + } + }; + let result = filetracker_client .put_file( path, - final_data.clone(), + uncompressed_for_v1, timestamp, logical_size, &digest, - true, // Always compressed in storage + false, // V1 filetracker stores uncompressed ) .await; @@ -500,10 +523,15 @@ pub async fn ft_put_file( } record_metrics("200"); + let last_modified_header = utils::format_rfc2822_timestamp(timestamp); + debug!( + "PUT {} complete, returning Last-Modified: {} (unix: {})", + path, last_modified_header, timestamp + ); Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/plain") - .header("Last-Modified", utils::format_rfc2822_timestamp(timestamp)) + .header("Last-Modified", last_modified_header) .body("".to_string()) .unwrap() } From 1bf917f4c690bdacecdfb8c5d0b10f336fec9119 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Tue, 2 Dec 2025 19:57:09 +0100 Subject: [PATCH 2/2] Add s3 key sharding --- src/config.rs | 21 +++ src/s3storage/minio.rs | 231 ++++++++++++++++++++++++++++---- src/s3storage/mod.rs | 30 +++++ tests/concurrent_stress_test.rs | 2 + tests/integration_test.rs | 1 + tests/metrics_test.rs | 2 + tests/migration_test.rs | 1 + tests/postgres_locks_test.rs | 1 + 8 files changed, 261 insertions(+), 28 deletions(-) diff --git a/src/config.rs b/src/config.rs index bb06970..898ec74 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,7 @@ pub use crate::kvstorage::KVStorageType; pub use crate::kvstorage::postgres::PostgresConfig; pub use crate::kvstorage::sqlite::SQLiteConfig; pub use crate::locks::LocksType; +pub use crate::s3storage::KeyShardingConfig; pub use crate::s3storage::S3StorageType; pub use crate::s3storage::minio::MinIOConfig; @@ -203,6 +204,16 @@ impl BucketConfig { if let Ok(val) = 
std::env::var("S3_SECRET_KEY") { minio.secret_key = val; } + if let Ok(val) = std::env::var("S3_KEY_SHARDING_ENABLED") + && let Ok(enabled) = val.parse() + { + minio.key_sharding.enabled = enabled; + } + if let Ok(val) = std::env::var("S3_KEY_SHARDING_DEPTH") + && let Ok(depth) = val.parse() + { + minio.key_sharding.depth = depth; + } } // Filetracker URL for live migration @@ -229,6 +240,16 @@ impl BucketConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(true), + key_sharding: KeyShardingConfig { + enabled: std::env::var("S3_KEY_SHARDING_ENABLED") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(true), + depth: std::env::var("S3_KEY_SHARDING_DEPTH") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(2), + }, }) } else { None diff --git a/src/s3storage/minio.rs b/src/s3storage/minio.rs index 7c59acd..a8ce2b4 100644 --- a/src/s3storage/minio.rs +++ b/src/s3storage/minio.rs @@ -1,5 +1,5 @@ use crate::config::BucketConfig; -use crate::s3storage::S3StorageTrait; +use crate::s3storage::{KeyShardingConfig, S3StorageTrait}; use anyhow::Result; use anyhow::anyhow; use anyhow::bail; @@ -18,6 +18,8 @@ pub struct MinIOConfig { pub secret_key: String, #[serde(default = "default_force_path_style")] pub force_path_style: bool, + #[serde(default)] + pub key_sharding: KeyShardingConfig, } fn default_force_path_style() -> bool { @@ -28,6 +30,7 @@ fn default_force_path_style() -> bool { pub struct MinIOClient { client: Client, bucket: String, + sharding: KeyShardingConfig, } #[async_trait] @@ -63,10 +66,15 @@ impl S3StorageTrait for MinIOClient { let client = Client::from_conf(s3_config); debug!("MinIO client initialized for bucket: {}", config.name); + debug!( + "Key sharding: enabled={}, depth={}", + minio_config.key_sharding.enabled, minio_config.key_sharding.depth + ); let minio_client = MinIOClient { client, bucket: config.name.clone(), + sharding: minio_config.key_sharding.clone(), }; // Create bucket if it doesn't exist @@ -76,94 +84,95 @@ impl S3StorageTrait for 
MinIOClient { } async fn put_object(&self, key: &str, data: Vec<u8>) -> Result<()> { - debug!("Putting object: {} (size: {} bytes)", key, data.len()); + let s3_key = self.hash_to_s3_key(key); + debug!( + "Putting object: {} -> {} (size: {} bytes)", + key, + s3_key, + data.len() + ); self.client .put_object() .bucket(&self.bucket) - .key(key) + .key(&s3_key) .body(ByteStream::from(data)) .content_type("application/octet-stream") .send() .await?; - debug!("Successfully put object: {}", key); + debug!("Successfully put object: {}", s3_key); Ok(()) } async fn get_object(&self, key: &str) -> Result<Vec<u8>> { - debug!("Getting object: {}", key); + let s3_key = self.hash_to_s3_key(key); + debug!("Getting object: {} -> {}", key, s3_key); let resp = self .client .get_object() .bucket(&self.bucket) - .key(key) + .key(&s3_key) .send() .await?; let data = resp.body.collect().await?.into_bytes().to_vec(); debug!( "Successfully got object: {} (size: {} bytes)", - key, + s3_key, data.len() ); Ok(data) } async fn delete_object(&self, key: &str) -> Result<()> { - debug!("Deleting object: {}", key); + let s3_key = self.hash_to_s3_key(key); + debug!("Deleting object: {} -> {}", key, s3_key); let delete_future = self .client .delete_object() .bucket(&self.bucket) - .key(key) + .key(&s3_key) .send(); // Add 10 second timeout to prevent indefinite hanging match tokio::time::timeout(std::time::Duration::from_secs(10), delete_future).await { Ok(Ok(_)) => { - debug!("Successfully deleted object: {}", key); + debug!("Successfully deleted object: {}", s3_key); Ok(()) } Ok(Err(e)) => { - tracing::error!("Failed to delete object {}: {}", key, e); - bail!(e) + error!("Failed to delete object {}: {}", s3_key, e); + Err(anyhow::Error::from(e)) } Err(_) => { - tracing::error!("Timeout deleting object {}", key); + error!("Timeout deleting object {}", s3_key); bail!("Timeout deleting object") } } } async fn object_exists(&self, key: &str) -> Result<bool> { - debug!("Checking if object exists: {}", key); + let s3_key
= self.hash_to_s3_key(key); + debug!("Checking if object exists: {} -> {}", key, s3_key); match self .client .head_object() .bucket(&self.bucket) - .key(key) + .key(&s3_key) .send() .await { Ok(_) => { - debug!("Object exists: {}", key); + debug!("Object exists: {}", s3_key); Ok(true) } Err(err) => { - let err_string = err.to_string(); - debug!("Head object error: {}", err_string); - - if err_string.contains("NotFound") - || err_string.contains("404") - || err_string.contains("NoSuchKey") - || format!("{:?}", err).contains("NotFound") - || format!("{:?}", err).contains("NoSuchKey") - { - debug!("Object does not exist: {}", key); + if Self::is_not_found_error(&err) { + debug!("Object does not exist: {}", s3_key); Ok(false) } else { debug!("Error checking object existence: {}", err); @@ -190,16 +199,17 @@ impl S3StorageTrait for MinIOClient { let resp = request.send().await?; + // Strip sharding prefixes to return raw hashes let keys: Vec<String> = resp .contents() .iter() - .filter_map(|obj| obj.key().map(|k| k.to_string())) + .filter_map(|obj| obj.key().map(|k| self.s3_key_to_hash(k))) .collect(); let next_token = resp.next_continuation_token().map(|t| t.to_string()); debug!( - "Listed {} objects, has more: {}", + "Listed {} objects (hashes), has more: {}", keys.len(), next_token.is_some() ); @@ -220,6 +230,46 @@ impl S3StorageTrait for MinIOClient { } impl MinIOClient { + /// Transform a raw hash to S3 key with sharding prefix + /// "abcdef..." -> "ab/cd/abcdef..." + fn hash_to_s3_key(&self, hash: &str) -> String { + if !self.sharding.enabled { + return hash.to_string(); + } + + // Need at least depth*2 characters for sharding + if hash.len() < self.sharding.depth * 2 { + return hash.to_string(); + } + + let mut parts = Vec::with_capacity(self.sharding.depth + 1); + for i in 0..self.sharding.depth { + parts.push(&hash[i * 2..(i + 1) * 2]); + } + parts.push(hash); + parts.join("/") + } + + /// Extract raw hash from S3 key (strip sharding prefix) + /// "ab/cd/abcdef..."
-> "abcdef..." + fn s3_key_to_hash(&self, key: &str) -> String { + if !self.sharding.enabled { + return key.to_string(); + } + // The hash is always the last component + key.rsplit('/').next().unwrap_or(key).to_string() + } + + /// Check if an error indicates object not found + fn is_not_found_error<E: std::fmt::Debug>(err: &aws_sdk_s3::error::SdkError<E>) -> bool { + let err_string = err.to_string(); + err_string.contains("NotFound") + || err_string.contains("404") + || err_string.contains("NoSuchKey") + || format!("{:?}", err).contains("NotFound") + || format!("{:?}", err).contains("NoSuchKey") + } + /// Ensures the bucket exists, creating it if necessary async fn ensure_bucket_exists(&self) -> Result<()> { debug!("Checking if bucket exists: {}", self.bucket); @@ -279,3 +329,128 @@ impl MinIOClient { } } } + +#[cfg(test)] +mod tests { + use super::*; + + // Helper to create a test sharding config + fn make_sharding(enabled: bool, depth: usize) -> KeyShardingConfig { + KeyShardingConfig { enabled, depth } + } + + // Helper struct to test key transformation without needing full MinIOClient + struct TestClient { + sharding: KeyShardingConfig, + } + + impl TestClient { + fn hash_to_s3_key(&self, hash: &str) -> String { + if !self.sharding.enabled { + return hash.to_string(); + } + + if hash.len() < self.sharding.depth * 2 { + return hash.to_string(); + } + + let mut parts = Vec::with_capacity(self.sharding.depth + 1); + for i in 0..self.sharding.depth { + parts.push(&hash[i * 2..(i + 1) * 2]); + } + parts.push(hash); + parts.join("/") + } + + fn s3_key_to_hash(&self, key: &str) -> String { + if !self.sharding.enabled { + return key.to_string(); + } + key.rsplit('/').next().unwrap_or(key).to_string() + } + } + + #[test] + fn test_hash_to_s3_key_disabled() { + let client = TestClient { + sharding: make_sharding(false, 2), + }; + let hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + assert_eq!(client.hash_to_s3_key(hash), hash); + } + + #[test] + fn
test_hash_to_s3_key_depth_2() { + let client = TestClient { + sharding: make_sharding(true, 2), + }; + let hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + assert_eq!( + client.hash_to_s3_key(hash), + "ab/cd/abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + ); + } + + #[test] + fn test_hash_to_s3_key_depth_3() { + let client = TestClient { + sharding: make_sharding(true, 3), + }; + let hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + assert_eq!( + client.hash_to_s3_key(hash), + "ab/cd/ef/abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + ); + } + + #[test] + fn test_hash_to_s3_key_short_hash() { + let client = TestClient { + sharding: make_sharding(true, 2), + }; + // Hash too short for depth 2 (needs at least 4 chars) + let hash = "abc"; + assert_eq!(client.hash_to_s3_key(hash), hash); + } + + #[test] + fn test_s3_key_to_hash_disabled() { + let client = TestClient { + sharding: make_sharding(false, 2), + }; + let key = "ab/cd/abcdef123"; + assert_eq!(client.s3_key_to_hash(key), key); + } + + #[test] + fn test_s3_key_to_hash_enabled() { + let client = TestClient { + sharding: make_sharding(true, 2), + }; + let key = "ab/cd/abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + assert_eq!( + client.s3_key_to_hash(key), + "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + ); + } + + #[test] + fn test_s3_key_to_hash_no_slashes() { + let client = TestClient { + sharding: make_sharding(true, 2), + }; + let key = "abcdef1234567890"; + assert_eq!(client.s3_key_to_hash(key), key); + } + + #[test] + fn test_roundtrip() { + let client = TestClient { + sharding: make_sharding(true, 2), + }; + let hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; + let s3_key = client.hash_to_s3_key(hash); + let recovered = client.s3_key_to_hash(&s3_key); + assert_eq!(recovered, hash); + } +} diff --git a/src/s3storage/mod.rs b/src/s3storage/mod.rs 
index 8e241f6..70070d2 100644 --- a/src/s3storage/mod.rs +++ b/src/s3storage/mod.rs @@ -6,6 +6,36 @@ use tracing::{debug, info}; pub mod minio; +/// Configuration for S3 key sharding +/// When enabled, transforms keys like "abcdef..." to "ab/cd/abcdef..." +/// to distribute objects across directories and avoid ext4 performance issues +#[derive(Debug, Deserialize, Clone)] +pub struct KeyShardingConfig { + /// Whether key sharding is enabled + #[serde(default = "default_sharding_enabled")] + pub enabled: bool, + /// Number of prefix levels (2 = ab/cd/, 3 = ab/cd/ef/) + #[serde(default = "default_sharding_depth")] + pub depth: usize, +} + +fn default_sharding_enabled() -> bool { + true +} + +fn default_sharding_depth() -> usize { + 2 +} + +impl Default for KeyShardingConfig { + fn default() -> Self { + Self { + enabled: default_sharding_enabled(), + depth: default_sharding_depth(), + } + } +} + #[derive(Debug, Deserialize, Clone)] pub enum S3StorageType { #[serde(rename = "minio")] diff --git a/tests/concurrent_stress_test.rs b/tests/concurrent_stress_test.rs index 4250a5f..4bf7a82 100644 --- a/tests/concurrent_stress_test.rs +++ b/tests/concurrent_stress_test.rs @@ -72,6 +72,7 @@ async fn create_test_app_with_state() -> (Router, Arc) { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, @@ -135,6 +136,7 @@ async fn is_minio_available() -> bool { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 72c1399..3590986 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -79,6 +79,7 @@ async fn create_test_app_with_state() -> (Router, Arc) { access_key: 
"minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, diff --git a/tests/metrics_test.rs b/tests/metrics_test.rs index 3ea8974..7a00919 100644 --- a/tests/metrics_test.rs +++ b/tests/metrics_test.rs @@ -32,6 +32,7 @@ async fn create_test_app_state() -> Arc { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: Default::default(), filetracker_url: None, @@ -274,6 +275,7 @@ async fn test_migration_active_metric() { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: Default::default(), filetracker_url: Some("http://localhost:8000".to_string()), diff --git a/tests/migration_test.rs b/tests/migration_test.rs index f70e488..366b76f 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -194,6 +194,7 @@ async fn create_test_app_state() -> Arc { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None, diff --git a/tests/postgres_locks_test.rs b/tests/postgres_locks_test.rs index 7ebaaf0..84dcc30 100644 --- a/tests/postgres_locks_test.rs +++ b/tests/postgres_locks_test.rs @@ -27,6 +27,7 @@ mod postgres_locks_tests { access_key: "minioadmin".to_string(), secret_key: "minioadmin".to_string(), force_path_style: true, + key_sharding: Default::default(), }), cleaner: s3dedup::cleaner::CleanerConfig::default(), filetracker_url: None,