21 changes: 21 additions & 0 deletions src/config.rs
@@ -7,6 +7,7 @@ pub use crate::kvstorage::KVStorageType;
pub use crate::kvstorage::postgres::PostgresConfig;
pub use crate::kvstorage::sqlite::SQLiteConfig;
pub use crate::locks::LocksType;
pub use crate::s3storage::KeyShardingConfig;
pub use crate::s3storage::S3StorageType;
pub use crate::s3storage::minio::MinIOConfig;

@@ -203,6 +204,16 @@ impl BucketConfig {
if let Ok(val) = std::env::var("S3_SECRET_KEY") {
minio.secret_key = val;
}
if let Ok(val) = std::env::var("S3_KEY_SHARDING_ENABLED")
&& let Ok(enabled) = val.parse()
{
minio.key_sharding.enabled = enabled;
}
if let Ok(val) = std::env::var("S3_KEY_SHARDING_DEPTH")
&& let Ok(depth) = val.parse()
{
minio.key_sharding.depth = depth;
}
}

// Filetracker URL for live migration
@@ -229,6 +240,16 @@ impl BucketConfig {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(true),
key_sharding: KeyShardingConfig {
enabled: std::env::var("S3_KEY_SHARDING_ENABLED")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(true),
depth: std::env::var("S3_KEY_SHARDING_DEPTH")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(2),
},
})
} else {
None
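For context, a minimal sketch of what the KeyShardingConfig imported above might look like, assuming an `enabled` flag and an integer `depth` with the same defaults as the env-var fallbacks (true and 2); the real definition lives in src/s3storage and is not part of this diff. With such a config, sharding could be disabled or tuned by exporting e.g. S3_KEY_SHARDING_ENABLED=false or S3_KEY_SHARDING_DEPTH=3.

// Hypothetical sketch only; the actual struct in src/s3storage may differ.
#[derive(Debug, Clone)]
pub struct KeyShardingConfig {
    /// Whether object keys are sharded under prefix directories.
    pub enabled: bool,
    /// Number of prefix levels derived from the key (integer type assumed).
    pub depth: usize,
}

impl Default for KeyShardingConfig {
    fn default() -> Self {
        Self { enabled: true, depth: 2 }
    }
}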
64 changes: 56 additions & 8 deletions src/filetracker_client/mod.rs
@@ -53,8 +53,19 @@ impl FiletrackerClient {
let response = self.client.get(&url).send().await?;

if !response.status().is_success() {
error!("Failed to list files: HTTP {}", response.status());
bail!("HTTP {}", response.status())
let status = response.status();
let x_exception = response
.headers()
.get("X-Exception")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let body = response.text().await.unwrap_or_default();
error!(
"Failed to list files: HTTP {} - X-Exception: {} - Body: {}",
status, x_exception, body
);
bail!("HTTP {} - {}", status, x_exception)
}

let body = response.text().await?;
@@ -80,8 +91,19 @@ impl FiletrackerClient {
}

if !response.status().is_success() {
error!("Failed to get file: HTTP {}", response.status());
bail!("HTTP {}", response.status())
let status = response.status();
let x_exception = response
.headers()
.get("X-Exception")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let body = response.text().await.unwrap_or_default();
error!(
"Failed to get file: HTTP {} - X-Exception: {} - Body: {}",
status, x_exception, body
);
bail!("HTTP {} - {}", status, x_exception)
}

// Extract headers
@@ -150,6 +172,10 @@ impl FiletrackerClient {

let mut request = self.client.put(&url).body(data);

// V1 filetracker reads timestamp from Last-Modified header, not query param
// We send both for compatibility with V1 and V2
request = request.header("Last-Modified", &timestamp_rfc2822);

if is_compressed {
request = request.header("Content-Encoding", "gzip");
}
@@ -161,8 +187,19 @@ impl FiletrackerClient {
let response = request.send().await?;

if !response.status().is_success() {
error!("Failed to put file: HTTP {}", response.status());
bail!("HTTP {}", response.status())
let status = response.status();
let x_exception = response
.headers()
.get("X-Exception")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let body = response.text().await.unwrap_or_default();
error!(
"Failed to put file: HTTP {} - X-Exception: {} - Body: {}",
status, x_exception, body
);
bail!("HTTP {} - {}", status, x_exception)
}

debug!("Put file to filetracker successfully");
@@ -192,8 +229,19 @@ impl FiletrackerClient {
}

if !response.status().is_success() {
error!("Failed to delete file: HTTP {}", response.status());
bail!("HTTP {}", response.status())
let status = response.status();
let x_exception = response
.headers()
.get("X-Exception")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let body = response.text().await.unwrap_or_default();
error!(
"Failed to delete file: HTTP {} - X-Exception: {} - Body: {}",
status, x_exception, body
);
bail!("HTTP {} - {}", status, x_exception)
}

debug!("Deleted file from filetracker successfully");
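The status / X-Exception / body extraction added here is repeated verbatim in all four error paths (list, get, put, delete). A hedged sketch of how that duplication could be factored into a shared helper; this is not part of the PR, and the name is illustrative only:

// Hypothetical helper; placement and naming are illustrative.
async fn error_details(response: reqwest::Response) -> (reqwest::StatusCode, String, String) {
    let status = response.status();
    let x_exception = response
        .headers()
        .get("X-Exception")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("unknown")
        .to_string();
    let body = response.text().await.unwrap_or_default();
    (status, x_exception, body)
}

// Each call site would then reduce to:
// let (status, x_exception, body) = error_details(response).await;
// error!("Failed to get file: HTTP {} - X-Exception: {} - Body: {}", status, x_exception, body);
// bail!("HTTP {} - {}", status, x_exception)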
10 changes: 6 additions & 4 deletions src/routes/ft/get_file.rs
@@ -266,15 +266,17 @@ pub async fn ft_get_file(
.observe(start.elapsed().as_secs_f64());

// 7. Return file with appropriate headers (matching original filetracker)
let last_modified_header = utils::format_rfc2822_timestamp(modified_time);
debug!(
"GET {} returning Last-Modified: {} (unix: {})",
path, last_modified_header, modified_time
);
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/octet-stream")
.header("Content-Length", blob_data.len().to_string())
.header("Content-Encoding", "gzip")
.header(
"Last-Modified",
utils::format_rfc2822_timestamp(modified_time),
)
.header("Last-Modified", last_modified_header)
.header("Logical-Size", logical_size.to_string())
.body(Body::from(blob_data))
.unwrap()
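The new debug line logs both the formatted Last-Modified value and the raw unix timestamp it came from. For illustration, a sketch of what a helper along the lines of utils::format_rfc2822_timestamp could look like using chrono, assuming modified_time is unix seconds; the actual helper lives in src/utils and is not shown in this diff.

// Hypothetical sketch; the repo's own helper may differ.
use chrono::{DateTime, Utc};

pub fn format_rfc2822_timestamp(unix_secs: i64) -> String {
    DateTime::<Utc>::from_timestamp(unix_secs, 0)
        .map(|dt| dt.to_rfc2822())
        .unwrap_or_default()
}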
40 changes: 34 additions & 6 deletions src/routes/ft/put_file.rs
@@ -41,8 +41,21 @@ pub async fn ft_put_file(
};

// 1. Parse and validate timestamp (required for PUT)
let raw_timestamp = headers
.get("last-modified")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.or_else(|| query.last_modified.clone());
let timestamp = match utils::extract_timestamp(&headers, query.last_modified.as_ref(), true) {
Ok(ts) => ts,
Ok(ts) => {
debug!(
"PUT {} timestamp: raw='{}' -> parsed={}",
path,
raw_timestamp.as_deref().unwrap_or("none"),
ts
);
ts
}
Err(e) => {
error!("Failed to extract timestamp: {}", e);
record_metrics("400");
@@ -475,16 +488,26 @@ pub async fn ft_put_file(
if let Some(filetracker_client) = &state.filetracker_client {
debug!("Live migration mode: also writing to filetracker");

// Reconstruct the data that needs to be sent to filetracker
// We need to use the final_data (compressed) that was stored
// V1 filetracker doesn't understand compression - it stores files uncompressed.
// We need to decompress before sending; otherwise V1 stores the gzip bytes
// and later returns them without a Content-Encoding header, causing corruption.
let uncompressed_for_v1 = match storage_helpers::decompress_gzip(&final_data) {
Ok(data) => data,
Err(e) => {
warn!("Failed to decompress for V1 filetracker dual-write: {}", e);
// Fall back to the stored (compressed) data if decompression fails
final_data.clone()
}
};

let result = filetracker_client
.put_file(
path,
final_data.clone(),
uncompressed_for_v1,
timestamp,
logical_size,
&digest,
true, // Always compressed in storage
false, // V1 filetracker stores uncompressed
)
.await;

@@ -500,10 +523,15 @@
}

record_metrics("200");
let last_modified_header = utils::format_rfc2822_timestamp(timestamp);
debug!(
"PUT {} complete, returning Last-Modified: {} (unix: {})",
path, last_modified_header, timestamp
);
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain")
.header("Last-Modified", utils::format_rfc2822_timestamp(timestamp))
.header("Last-Modified", last_modified_header)
.body("".to_string())
.unwrap()
}
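The dual-write path above relies on storage_helpers::decompress_gzip to recover the original bytes before sending them to the V1 filetracker. A minimal sketch of such a helper using flate2, assuming that is roughly what the repo's helper does; the real implementation is elsewhere in the repo and may differ:

// Hypothetical sketch; the repo's storage_helpers::decompress_gzip may differ.
use std::io::Read;

use anyhow::Result;
use flate2::read::GzDecoder;

pub fn decompress_gzip(data: &[u8]) -> Result<Vec<u8>> {
    let mut decoder = GzDecoder::new(data);
    let mut out = Vec::new();
    decoder.read_to_end(&mut out)?;
    Ok(out)
}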