20 changes: 20 additions & 0 deletions src/cleaner/mod.rs
@@ -1,4 +1,5 @@
use crate::kvstorage::KVStorage;
use crate::locks::{self, LocksStorage};
use crate::s3storage::S3Storage;
use anyhow::Result;
use serde::Deserialize;
@@ -46,6 +47,7 @@ pub struct Cleaner {
bucket_name: String,
kvstorage: Arc<Mutex<Box<KVStorage>>>,
s3storage: Arc<Mutex<Box<S3Storage>>>,
locks: Arc<LocksStorage>,
config: CleanerConfig,
}

@@ -54,12 +56,14 @@ impl Cleaner {
bucket_name: String,
kvstorage: Arc<Mutex<Box<KVStorage>>>,
s3storage: Arc<Mutex<Box<S3Storage>>>,
locks: Arc<LocksStorage>,
config: CleanerConfig,
) -> Self {
Self {
bucket_name,
kvstorage,
s3storage,
locks,
config,
}
}
@@ -308,6 +312,18 @@ impl Cleaner {
}

for key in keys {
// Acquire hash lock before checking refcount and deleting
// This prevents a race with PUT operations that might be incrementing the refcount
let hash_lock_key = locks::hash_lock(&self.bucket_name, &key);
let hash_lock = self.locks.prepare_lock(hash_lock_key).await;
let hash_guard = match hash_lock.acquire_exclusive().await {
Ok(g) => g,
Err(e) => {
error!("Failed to acquire hash lock for {}: {}", key, e);
continue;
}
};

let refcount = self
.kvstorage
.lock()
@@ -321,15 +337,19 @@
// Delete the S3 object
if let Err(e) = self.s3storage.lock().await.delete_object(&key).await {
error!("Failed to delete S3 object {}: {}", key, e);
let _ = hash_guard.release().await;
continue;
}

deleted_count += 1;

if deleted_count >= self.config.max_deletes_per_run {
let _ = hash_guard.release().await;
return Ok(deleted_count);
}
}

let _ = hash_guard.release().await;
}

continuation_token = next_token;
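
One observation on the cleaner loop above: the same hash_guard now has three manual release points (delete failure, per-run cap, end of iteration). Below is a minimal sketch of funneling those exits through a single release; LockGuard and its release() are hypothetical stand-ins modeled on the calls visible in this diff, not the crate's real API.

// Hypothetical stand-in for the guard type used in the diff above.
struct LockGuard;

impl LockGuard {
    async fn release(self) -> anyhow::Result<()> {
        Ok(())
    }
}

// Funnel every exit (delete failed, cap hit, normal fall-through) past one
// release call instead of repeating `hash_guard.release()` on each branch.
async fn delete_under_guard(
    guard: LockGuard,
    refcount: i64,
    delete_blob: impl std::future::Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<bool> {
    let deleted = if refcount <= 0 {
        delete_blob.await.is_ok()
    } else {
        false
    };
    let _ = guard.release().await; // single release point regardless of branch
    Ok(deleted)
}
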
6 changes: 3 additions & 3 deletions src/lib.rs
@@ -31,7 +31,7 @@ pub struct HealthChecks {
pub struct AppState {
pub bucket_name: String,
pub kvstorage: Arc<Mutex<Box<kvstorage::KVStorage>>>,
pub locks: Box<locks::LocksStorage>,
pub locks: Arc<locks::LocksStorage>,
pub s3storage: Arc<Mutex<Box<s3storage::S3Storage>>>,
pub filetracker_client: Option<Arc<filetracker_client::FiletrackerClient>>,
pub metrics: Arc<metrics::Metrics>,
@@ -46,7 +46,7 @@ impl AppState {
Ok(Arc::new(Self {
bucket_name: config.bucket.name.clone(),
kvstorage: Arc::new(Mutex::new(kvstorage)),
locks,
locks: Arc::new(*locks),
s3storage: Arc::new(Mutex::new(s3storage)),
filetracker_client: None,
metrics,
@@ -69,7 +69,7 @@ impl AppState {
Ok(Arc::new(Self {
bucket_name: config.bucket.name.clone(),
kvstorage: Arc::new(Mutex::new(kvstorage)),
locks,
locks: Arc::new(*locks),
s3storage: Arc::new(Mutex::new(s3storage)),
filetracker_client: Some(Arc::new(filetracker_client)),
metrics,
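
Context for the lib.rs change: the locks field moves from Box<locks::LocksStorage> to Arc<locks::LocksStorage> so the cleaner's background task can hold its own handle. Arc::new(*locks) moves the value out of the box and re-wraps it. A standalone illustration, with a stub standing in for the real type:

use std::sync::Arc;

struct LocksStorage; // stand-in for the real type

fn share(locks: Box<LocksStorage>) -> Arc<LocksStorage> {
    // `*locks` moves the LocksStorage out of its Box; Arc::new re-wraps it
    // so request handlers and the cleaner task can each clone a handle.
    Arc::new(*locks)
}

fn main() {
    let shared = share(Box::new(LocksStorage));
    let for_cleaner = Arc::clone(&shared); // cheap pointer copy, no deep clone
    let _ = (shared, for_cleaner);
}
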
3 changes: 1 addition & 2 deletions src/locks/mod.rs
@@ -15,8 +15,7 @@ pub(crate) fn file_lock(bucket: &str, path: &str) -> String {
/**
* Get key for lock on hash
*/
#[allow(dead_code)]
fn hash_lock(bucket: &str, hash: &str) -> String {
pub(crate) fn hash_lock(bucket: &str, hash: &str) -> String {
format!("hash:{}:{}", bucket, hash)
}

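
With hash_lock now pub(crate), the cleaner and migration code build the same lock key for a given blob. The format shown in the diff namespaces hash locks apart from file locks; a small usage example with made-up bucket and hash values:

fn hash_lock(bucket: &str, hash: &str) -> String {
    format!("hash:{}:{}", bucket, hash)
}

fn main() {
    // The "hash:" prefix keeps blob locks distinct from per-path file locks
    // in the same bucket.
    assert_eq!(hash_lock("media", "d41d8cd9"), "hash:media:d41d8cd9");
}
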
1 change: 1 addition & 0 deletions src/main.rs
@@ -142,6 +142,7 @@ fn start_background_tasks(app_state: Arc<AppState>, bucket_config: &config::Buck
bucket_config.name.clone(),
app_state.kvstorage.clone(),
app_state.s3storage.clone(),
app_state.locks.clone(),
bucket_config.cleaner.clone(),
));
cleaner.start();
66 changes: 66 additions & 0 deletions src/migration/mod.rs
@@ -193,6 +193,14 @@ pub async fn migrate_single_file_from_metadata(
return Ok(());
}

// Acquire hash lock for S3/refcount operations
let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest);
let hash_lock = locks.prepare_lock(hash_lock_key).await;
let hash_guard = hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire hash lock for migration")?;

// Check if blob already exists in S3
let blob_exists = app_state
.s3storage
@@ -227,6 +235,9 @@
.increment_ref_count(&app_state.bucket_name, &digest)
.await?;

// Release new hash lock
let _ = hash_guard.release().await;

// Handle overwriting existing file
if current_modified > 0 {
let old_hash = app_state
@@ -237,6 +248,14 @@
.await?;

if !old_hash.is_empty() && old_hash != digest {
// Acquire lock on old hash before decrement
let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash);
let old_hash_lock = locks.prepare_lock(old_hash_lock_key).await;
let old_hash_guard = old_hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire old hash lock for migration")?;

// Decrement old reference count atomically and get new count
let old_ref_count = app_state
.kvstorage
@@ -259,6 +278,9 @@
old_hash, e
);
}

// Release old hash lock
let _ = old_hash_guard.release().await;
}
}

@@ -340,6 +362,14 @@ async fn migrate_single_file(
return Ok(false);
}

// Acquire hash lock for S3/refcount operations
let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest);
let hash_lock = locks_storage.prepare_lock(hash_lock_key).await;
let hash_guard = hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire hash lock for migration")?;

// Check if blob already exists in S3
let blob_exists = app_state
.s3storage
@@ -374,6 +404,9 @@
.increment_ref_count(&app_state.bucket_name, &digest)
.await?;

// Release new hash lock
let _ = hash_guard.release().await;

// Handle overwriting existing file
if current_modified > 0 {
let old_hash = app_state
@@ -384,6 +417,14 @@
.await?;

if !old_hash.is_empty() && old_hash != digest {
// Acquire lock on old hash before decrement
let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash);
let old_hash_lock = locks_storage.prepare_lock(old_hash_lock_key).await;
let old_hash_guard = old_hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire old hash lock for migration")?;

// Decrement old reference count atomically and get new count
let old_ref_count = app_state
.kvstorage
@@ -406,6 +447,9 @@
old_hash, e
);
}

// Release old hash lock
let _ = old_hash_guard.release().await;
}
}

@@ -684,6 +728,14 @@ async fn migrate_single_file_from_v1_fs(
return Ok(false);
}

// Acquire hash lock for S3/refcount operations
let hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &digest);
let hash_lock = locks_storage.prepare_lock(hash_lock_key).await;
let hash_guard = hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire hash lock for migration")?;

// Check if blob already exists in S3
let blob_exists = app_state
.s3storage
@@ -726,6 +778,9 @@
.increment_ref_count(&app_state.bucket_name, &digest)
.await?;

// Release new hash lock
let _ = hash_guard.release().await;

// Handle overwriting existing file
if current_modified > 0 {
let old_hash = app_state
@@ -736,6 +791,14 @@
.await?;

if !old_hash.is_empty() && old_hash != digest {
// Acquire lock on old hash before decrement
let old_hash_lock_key = crate::locks::hash_lock(&app_state.bucket_name, &old_hash);
let old_hash_lock = locks_storage.prepare_lock(old_hash_lock_key).await;
let old_hash_guard = old_hash_lock
.acquire_exclusive()
.await
.context("Failed to acquire old hash lock for migration")?;

// Decrement old reference count atomically and get new count
let old_ref_count = app_state
.kvstorage
@@ -758,6 +821,9 @@
old_hash, e
);
}

// Release old hash lock
let _ = old_hash_guard.release().await;
}
}

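
The acquire/operate/release block is now repeated nearly verbatim in migrate_single_file_from_metadata, migrate_single_file, and migrate_single_file_from_v1_fs. A sketch of a helper that would centralize the pattern; the Lock, LockGuard, and LocksStorage stubs below are hypothetical stand-ins, since the real crate's signatures may differ:

// Stubs standing in for the crate's real lock types.
struct LockGuard;
impl LockGuard {
    async fn release(self) {}
}

struct Lock;
impl Lock {
    async fn acquire_exclusive(&self) -> anyhow::Result<LockGuard> {
        Ok(LockGuard)
    }
}

struct LocksStorage;
impl LocksStorage {
    async fn prepare_lock(&self, _key: String) -> Lock {
        Lock
    }
}

// Run `work` (S3 existence check, upload, refcount bump) while holding the
// exclusive hash lock; the guard is released on success and error alike.
async fn with_hash_lock<T>(
    locks: &LocksStorage,
    key: String,
    work: impl std::future::Future<Output = anyhow::Result<T>>,
) -> anyhow::Result<T> {
    let guard = locks.prepare_lock(key).await.acquire_exclusive().await?;
    let result = work.await;
    guard.release().await;
    result
}

An async block passed as work only runs when polled, i.e. after the guard is acquired, so the refcount and S3 calls would still happen under the lock.
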
26 changes: 23 additions & 3 deletions src/routes/ft/delete_file.rs
@@ -134,7 +134,23 @@ pub async fn ft_delete_file(
.unwrap();
}

// 6. Decrement reference count atomically and get new count
// 6. Acquire hash lock before refcount operations
let hash_lock_key = locks::hash_lock(&state.bucket_name, &hash);
let hash_lock = locks_storage.prepare_lock(hash_lock_key).await;
let hash_guard = match hash_lock.acquire_exclusive().await {
Ok(g) => g,
Err(e) => {
error!("Failed to acquire hash lock: {}", e);
record_metrics("500");
let _ = guard.release().await;
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body("Failed to acquire hash lock".to_string())
.unwrap();
}
};

// 7. Decrement reference count atomically and get new count
let ref_count = match state
.kvstorage
.lock()
@@ -146,6 +162,7 @@
Err(e) => {
error!("Failed to decrement ref count: {}", e);
record_metrics("500");
let _ = hash_guard.release().await;
let _ = guard.release().await;
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
Expand All @@ -154,7 +171,7 @@ pub async fn ft_delete_file(
}
};

// 7. Check if we should delete the blob (ref count is now 0)
// 8. Check if we should delete the blob (ref count is now 0)
if ref_count <= 0 {
debug!("Deleting blob with hash: {}", hash);
// Delete blob from S3
@@ -172,7 +189,10 @@
.await;
}

// 8. Delete file metadata (path -> hash mapping and timestamp)
// Release hash lock - done with refcount/S3 operations
let _ = hash_guard.release().await;

// 9. Delete file metadata (path -> hash mapping and timestamp)
if let Err(e) = state
.kvstorage
.lock()
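
One property of ft_delete_file worth noting: it holds two locks at once, acquiring the file lock (guard) first and the hash lock (hash_guard) second, then releasing in reverse order. A sketch of that ordering discipline, again with hypothetical stub types; keeping the same acquisition order on every code path is what prevents two requests from deadlocking on each other's second lock:

// Stub lock types again, standing in for the real API.
struct LockGuard;
impl LockGuard {
    async fn release(self) {}
}

struct Lock;
impl Lock {
    async fn acquire_exclusive(&self) -> anyhow::Result<LockGuard> {
        Ok(LockGuard)
    }
}

struct LocksStorage;
impl LocksStorage {
    async fn prepare_lock(&self, _key: String) -> Lock {
        Lock
    }
}

// Always file lock first, then hash lock, released innermost-first, as the
// handler above does. If any path acquired hash-then-file instead, two
// concurrent requests could each hold one lock and wait forever on the other.
async fn delete_ordered(
    locks: &LocksStorage,
    file_key: String,
    hash_key: String,
) -> anyhow::Result<()> {
    let file_guard = locks.prepare_lock(file_key).await.acquire_exclusive().await?;
    let hash_guard = locks.prepare_lock(hash_key).await.acquire_exclusive().await?;
    // ... decrement refcount, delete the blob when it reaches zero ...
    hash_guard.release().await;
    file_guard.release().await;
    Ok(())
}
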