diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index 0a5adaa..27ac7f0 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -4,10 +4,16 @@ use crate::routes::ft::storage_helpers;
 use anyhow::{Context, Result};
 use futures_util::future::join_all;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
 
+/// Default number of retries for failed file migrations
+const DEFAULT_MAX_RETRIES: u32 = 3;
+
+/// Base delay for exponential backoff (in milliseconds)
+const RETRY_BASE_DELAY_MS: u64 = 1000;
+
 pub mod v1_filesystem;
 
 pub struct MigrationStats {
@@ -17,7 +23,19 @@ pub struct MigrationStats {
     pub skipped: usize,
 }
 
+/// Result of a single file migration attempt with retries
+enum MigrationResult {
+    /// File was successfully migrated
+    Migrated,
+    /// File was skipped (already exists with same or newer version)
+    Skipped,
+    /// File failed permanently after all retries
+    PermanentFailure(String, anyhow::Error),
+}
+
 /// Migrate all files from filetracker to s3dedup
+///
+/// If any file fails to migrate after retries, the entire migration is aborted.
 pub async fn migrate_all_files(
     filetracker_client: Arc<FiletrackerClient>,
     app_state: Arc<AppState>,
@@ -25,6 +43,10 @@
 ) -> Result<MigrationStats> {
     info!("Starting offline migration from filetracker to s3dedup");
     info!("Max concurrency: {}", max_concurrency);
+    info!(
+        "Retry policy: {} retries with exponential backoff",
+        DEFAULT_MAX_RETRIES
+    );
 
     // List all files from filetracker
     // Don't pass timestamp parameter to avoid triggering a bug in the original Filetracker server
@@ -44,9 +66,14 @@
     }
 
     // Track stats
-    let migrated = Arc::new(tokio::sync::Mutex::new(0usize));
-    let failed = Arc::new(tokio::sync::Mutex::new(0usize));
-    let skipped = Arc::new(tokio::sync::Mutex::new(0usize));
+    let migrated = Arc::new(AtomicUsize::new(0));
+    let skipped = Arc::new(AtomicUsize::new(0));
+
+    // Abort flag - set to true if any file permanently fails
+    let abort_flag = Arc::new(AtomicBool::new(false));
+    // Store the first permanent failure for error reporting
+    let first_failure: Arc<tokio::sync::Mutex<Option<(String, String)>>> =
+        Arc::new(tokio::sync::Mutex::new(None));
 
     // Process files in batches to avoid spawning millions of tasks
     // Use batch size = max_concurrency * 10 to keep task overhead reasonable
@@ -54,6 +81,12 @@
     let semaphore = Arc::new(Semaphore::new(max_concurrency));
 
     for (batch_idx, batch) in files.chunks(batch_size).enumerate() {
+        // Check if we should abort before starting a new batch
+        if abort_flag.load(Ordering::SeqCst) {
+            info!("Migration aborted due to permanent failure, stopping batch processing");
+            break;
+        }
+
         let batch_start = batch_idx * batch_size;
         info!(
             "Processing batch {}/{} (files {}-{})",
@@ -69,45 +102,67 @@
             let filetracker_client = filetracker_client.clone();
             let app_state = app_state.clone();
             let migrated = migrated.clone();
-            let failed = failed.clone();
             let skipped = skipped.clone();
             let semaphore = semaphore.clone();
+            let abort_flag = abort_flag.clone();
+            let first_failure = first_failure.clone();
            let path = path.clone();
             let file_idx = batch_start + idx_in_batch;
 
             let handle = tokio::spawn(async move {
+                // Check abort flag before starting
+                if abort_flag.load(Ordering::SeqCst) {
+                    return;
+                }
+
                 // Acquire semaphore permit
                 let _permit = match semaphore.acquire().await {
                     Ok(permit) => permit,
                     Err(_) => {
error!("Semaphore closed unexpectedly for file: {}", path); - *failed.lock().await += 1; return; } }; + // Check abort flag again after acquiring permit + if abort_flag.load(Ordering::SeqCst) { + return; + } + // Log progress every 100 files if file_idx.is_multiple_of(100) && file_idx > 0 { - let current_migrated = *migrated.lock().await; - let current_failed = *failed.lock().await; - let current_skipped = *skipped.lock().await; + let current_migrated = migrated.load(Ordering::Relaxed); + let current_skipped = skipped.load(Ordering::Relaxed); info!( - "Progress: {}/{} (migrated: {}, skipped: {}, failed: {})", - file_idx, total_files, current_migrated, current_skipped, current_failed + "Progress: {}/{} (migrated: {}, skipped: {})", + file_idx, total_files, current_migrated, current_skipped ); } - // Migrate the file - match migrate_single_file(&filetracker_client, app_state, &path).await { - Ok(true) => { - *migrated.lock().await += 1; + // Migrate the file with retries + match migrate_single_file_with_retry( + &filetracker_client, + app_state, + &path, + DEFAULT_MAX_RETRIES, + ) + .await + { + MigrationResult::Migrated => { + migrated.fetch_add(1, Ordering::Relaxed); } - Ok(false) => { - *skipped.lock().await += 1; + MigrationResult::Skipped => { + skipped.fetch_add(1, Ordering::Relaxed); } - Err(e) => { - error!("Failed to migrate file {}: {}", path, e); - *failed.lock().await += 1; + MigrationResult::PermanentFailure(failed_path, err) => { + // Signal abort to all other tasks + abort_flag.store(true, Ordering::SeqCst); + + // Store the first failure + let mut failure_guard = first_failure.lock().await; + if failure_guard.is_none() { + *failure_guard = Some((failed_path, err.to_string())); + } } } }); @@ -116,28 +171,145 @@ pub async fn migrate_all_files( } // Wait for this batch to complete before moving to next batch - // TODO: Propagate errors? (Tokio returns `Err`, when thread from `JoinHandle` panicked). let _ = join_all(handles).await; } - let migrated_count = *migrated.lock().await; - let failed_count = *failed.lock().await; - let skipped_count = *skipped.lock().await; + let migrated_count = migrated.load(Ordering::Relaxed); + let skipped_count = skipped.load(Ordering::Relaxed); + + // Check if migration was aborted + if abort_flag.load(Ordering::SeqCst) { + let failure_info = first_failure.lock().await; + if let Some((path, error_msg)) = failure_info.as_ref() { + error!( + "Migration aborted due to permanent failure of file '{}': {}", + path, error_msg + ); + anyhow::bail!( + "Migration failed: file '{}' failed after {} retries: {}", + path, + DEFAULT_MAX_RETRIES, + error_msg + ); + } else { + anyhow::bail!("Migration failed: unknown permanent failure"); + } + } info!("Migration complete:"); info!(" Total files: {}", total_files); info!(" Migrated: {}", migrated_count); info!(" Skipped: {}", skipped_count); - info!(" Failed: {}", failed_count); Ok(MigrationStats { total_files, migrated: migrated_count, - failed: failed_count, + failed: 0, skipped: skipped_count, }) } +/// Migrate a single file with retry logic and exponential backoff +async fn migrate_single_file_with_retry( + filetracker_client: &FiletrackerClient, + app_state: Arc, + path: &str, + max_retries: u32, +) -> MigrationResult { + let mut last_error = None; + + for attempt in 0..=max_retries { + if attempt > 0 { + // Exponential backoff: 1s, 2s, 4s, ... 
+            let delay_ms = RETRY_BASE_DELAY_MS * (1 << (attempt - 1));
+            warn!(
+                "Retrying migration of '{}' (attempt {}/{}) after {}ms delay",
+                path,
+                attempt + 1,
+                max_retries + 1,
+                delay_ms
+            );
+            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
+        }
+
+        match migrate_single_file(filetracker_client, app_state.clone(), path).await {
+            Ok(true) => return MigrationResult::Migrated,
+            Ok(false) => return MigrationResult::Skipped,
+            Err(e) => {
+                if attempt < max_retries {
+                    warn!(
+                        "Migration attempt {} for '{}' failed: {}",
+                        attempt + 1,
+                        path,
+                        e
+                    );
+                }
+                last_error = Some(e);
+            }
+        }
+    }
+
+    // All retries exhausted
+    let err = last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error"));
+    error!(
+        "Permanent failure migrating '{}' after {} attempts: {}",
+        path,
+        max_retries + 1,
+        err
+    );
+    MigrationResult::PermanentFailure(path.to_string(), err)
+}
+
+/// Migrate a single V1 file with retry logic and exponential backoff
+async fn migrate_single_file_from_v1_fs_with_retry(
+    app_state: Arc<AppState>,
+    file_info: &v1_filesystem::V1FileInfo,
+    max_retries: u32,
+) -> MigrationResult {
+    let mut last_error = None;
+
+    for attempt in 0..=max_retries {
+        if attempt > 0 {
+            // Exponential backoff: 1s, 2s, 4s, ...
+            let delay_ms = RETRY_BASE_DELAY_MS * (1 << (attempt - 1));
+            warn!(
+                "Retrying V1 migration of '{}' (attempt {}/{}) after {}ms delay",
+                file_info.relative_path,
+                attempt + 1,
+                max_retries + 1,
+                delay_ms
+            );
+            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
+        }
+
+        match migrate_single_file_from_v1_fs(app_state.clone(), file_info).await {
+            Ok(true) => return MigrationResult::Migrated,
+            Ok(false) => return MigrationResult::Skipped,
+            Err(e) => {
+                if attempt < max_retries {
+                    warn!(
+                        "V1 migration attempt {} for '{}' failed: {}",
+                        attempt + 1,
+                        file_info.relative_path,
+                        e
+                    );
+                }
+                last_error = Some(e);
+            }
+        }
+    }
+
+    // All retries exhausted
+    let err = last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error"));
+    error!(
+        "Permanent failure migrating V1 file '{}' after {} attempts: {}",
+        file_info.relative_path,
+        max_retries + 1,
+        err
+    );
+    MigrationResult::PermanentFailure(file_info.relative_path.clone(), err)
+}
+
 /// Migrate a single file from filetracker metadata (for on-the-fly migration during GET)
 pub async fn migrate_single_file_from_metadata(
     app_state: &AppState,
@@ -509,6 +681,8 @@ pub async fn live_migration_worker(
 ///
 /// This function uses chunked processing to avoid loading all file metadata into memory,
 /// making it suitable for directories with millions of files.
+///
+/// If any file fails to migrate after retries, the entire migration is aborted.
 pub async fn migrate_all_files_from_v1_fs(
     v1_dir: &str,
     app_state: Arc<AppState>,
@@ -519,15 +693,24 @@
 ) -> Result<MigrationStats> {
     info!(
         "Starting V1 filesystem migration from directory: {}",
         v1_dir
     );
     info!("Max concurrency: {}", max_concurrency);
+    info!(
+        "Retry policy: {} retries with exponential backoff",
+        DEFAULT_MAX_RETRIES
+    );
     info!("Processing directory in chunks to handle large file counts efficiently");
 
     // Track stats across all chunks using atomics to avoid async locks in sync context
     let total_files = Arc::new(AtomicUsize::new(0));
     let migrated = Arc::new(AtomicUsize::new(0));
-    let failed = Arc::new(AtomicUsize::new(0));
     let skipped = Arc::new(AtomicUsize::new(0));
     let semaphore = Arc::new(Semaphore::new(max_concurrency));
 
+    // Abort flag - set to true if any file permanently fails
+    let abort_flag = Arc::new(AtomicBool::new(false));
+    // Store the first permanent failure for error reporting
+    let first_failure: Arc<tokio::sync::Mutex<Option<(String, String)>>> =
+        Arc::new(tokio::sync::Mutex::new(None));
+
     // Chunk size for filesystem walking: 10,000 files per chunk
     // This keeps memory usage reasonable while still being efficient
     let filesystem_chunk_size = 10_000;
@@ -560,6 +743,12 @@
     // Process chunks as they arrive
     let mut chunk_count = 0;
     while let Some(file_chunk) = chunk_rx.recv().await {
+        // Check if we should abort before processing a new chunk
+        if abort_flag.load(Ordering::SeqCst) {
+            info!("V1 migration aborted due to permanent failure, stopping chunk processing");
+            break;
+        }
+
         chunk_count += 1;
         let chunk_size = file_chunk.len();
         total_files.fetch_add(chunk_size, Ordering::Relaxed);
@@ -573,17 +762,29 @@
         // Process this chunk in task batches
         let total_batches = file_chunk.chunks(task_batch_size).len();
         for (batch_idx, batch) in file_chunk.chunks(task_batch_size).enumerate() {
+            // Check if we should abort before starting a new batch
+            if abort_flag.load(Ordering::SeqCst) {
+                info!("V1 migration aborted due to permanent failure, stopping batch processing");
+                break;
+            }
+
             let mut handles = vec![];
 
             for file_info in batch.iter() {
                 let app_state = app_state.clone();
                 let migrated = migrated.clone();
-                let failed = failed.clone();
                 let skipped = skipped.clone();
                 let semaphore = semaphore.clone();
+                let abort_flag = abort_flag.clone();
+                let first_failure = first_failure.clone();
                 let file_info = file_info.clone();
 
                 let handle = tokio::spawn(async move {
+                    // Check abort flag before starting
+                    if abort_flag.load(Ordering::SeqCst) {
+                        return;
+                    }
+
                     // Acquire semaphore permit
                     let _permit = match semaphore.acquire().await {
                         Ok(permit) => permit,
@@ -592,22 +793,38 @@
                                 "Semaphore closed unexpectedly for file: {}",
                                 file_info.relative_path
                             );
-                            failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                             return;
                         }
                     };
 
-                    // Migrate the file
-                    match migrate_single_file_from_v1_fs(app_state, &file_info).await {
-                        Ok(true) => {
+                    // Check abort flag again after acquiring permit
+                    if abort_flag.load(Ordering::SeqCst) {
+                        return;
+                    }
+
+                    // Migrate the file with retries
+                    match migrate_single_file_from_v1_fs_with_retry(
+                        app_state,
+                        &file_info,
+                        DEFAULT_MAX_RETRIES,
+                    )
+                    .await
+                    {
+                        MigrationResult::Migrated => {
                             migrated.fetch_add(1, Ordering::Relaxed);
                         }
-                        Ok(false) => {
+                        MigrationResult::Skipped => {
                             skipped.fetch_add(1, Ordering::Relaxed);
                         }
-                        Err(e) => {
-                            error!("Failed to migrate file {}: {}", file_info.relative_path, e);
-                            failed.fetch_add(1, Ordering::Relaxed);
+                        MigrationResult::PermanentFailure(failed_path, err) => {
+                            // Signal abort to all other tasks
+                            abort_flag.store(true, Ordering::SeqCst);
+
+                            // Store the first failure
+                            let mut failure_guard = first_failure.lock().await;
+                            if failure_guard.is_none() {
+                                *failure_guard = Some((failed_path, err.to_string()));
+                            }
                         }
                     }
                 });
@@ -621,44 +838,66 @@
             // Log progress periodically
             if batch_idx % 10 == 0 || batch_idx == total_batches - 1 {
                 let current_migrated = migrated.load(Ordering::Relaxed);
-                let current_failed = failed.load(Ordering::Relaxed);
                 let current_skipped = skipped.load(Ordering::Relaxed);
                 info!(
-                    "Progress: {} files discovered (migrated: {}, skipped: {}, failed: {})",
-                    total_so_far, current_migrated, current_skipped, current_failed
+                    "Progress: {} files discovered (migrated: {}, skipped: {})",
+                    total_so_far, current_migrated, current_skipped
                 );
             }
         }
     }
 
-    // Wait for walker to complete
+    // Wait for walker to complete (ignore errors if we're aborting)
     match walker_handle.await {
         Ok(Ok(())) => {}
         Ok(Err(e)) => {
-            error!("Filesystem walker failed: {}", e);
-            anyhow::bail!("Filesystem walker failed: {}", e);
+            // Only report walker errors if we're not already aborting
+            if !abort_flag.load(Ordering::SeqCst) {
+                error!("Filesystem walker failed: {}", e);
+                anyhow::bail!("Filesystem walker failed: {}", e);
+            }
        }
         Err(e) => {
-            error!("Walker task panicked: {}", e);
-            anyhow::bail!("Walker task panicked: {}", e);
+            // Only report walker panics if we're not already aborting
+            if !abort_flag.load(Ordering::SeqCst) {
+                error!("Walker task panicked: {}", e);
+                anyhow::bail!("Walker task panicked: {}", e);
+            }
         }
     }
 
     let total_count = total_files.load(Ordering::Relaxed);
     let migrated_count = migrated.load(Ordering::Relaxed);
-    let failed_count = failed.load(Ordering::Relaxed);
     let skipped_count = skipped.load(Ordering::Relaxed);
 
+    // Check if migration was aborted
+    if abort_flag.load(Ordering::SeqCst) {
+        let failure_info = first_failure.lock().await;
+        if let Some((path, error_msg)) = failure_info.as_ref() {
+            error!(
+                "V1 migration aborted due to permanent failure of file '{}': {}",
+                path, error_msg
+            );
+            anyhow::bail!(
+                "V1 migration failed: file '{}' failed after {} retries: {}",
+                path,
+                DEFAULT_MAX_RETRIES,
+                error_msg
+            );
+        } else {
+            anyhow::bail!("V1 migration failed: unknown permanent failure");
+        }
+    }
+
     info!("V1 filesystem migration complete:");
     info!(" Total files: {}", total_count);
     info!(" Migrated: {}", migrated_count);
     info!(" Skipped: {}", skipped_count);
-    info!(" Failed: {}", failed_count);
 
     Ok(MigrationStats {
         total_files: total_count,
         migrated: migrated_count,
-        failed: failed_count,
+        failed: 0,
         skipped: skipped_count,
     })
 }
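
Note (not part of the patch): a minimal standalone sketch of the retry schedule the two *_with_retry helpers use, assuming the constants from this patch (DEFAULT_MAX_RETRIES = 3, RETRY_BASE_DELAY_MS = 1000). The backoff_delay_ms helper and the main harness below are hypothetical names added only for illustration.

// Illustrative sketch only -- mirrors the backoff arithmetic used by
// migrate_single_file_with_retry / migrate_single_file_from_v1_fs_with_retry.
const DEFAULT_MAX_RETRIES: u32 = 3;
const RETRY_BASE_DELAY_MS: u64 = 1000;

// Delay before retry `attempt` (1-based); attempt 0 is the initial try and
// sleeps for no delay at all.
fn backoff_delay_ms(attempt: u32) -> u64 {
    RETRY_BASE_DELAY_MS * (1u64 << (attempt - 1))
}

fn main() {
    // With the defaults this prints 1000, 2000 and 4000 ms, matching the
    // "1s, 2s, 4s, ..." comment in the patch.
    for attempt in 1..=DEFAULT_MAX_RETRIES {
        println!("retry {} sleeps {} ms", attempt, backoff_delay_ms(attempt));
    }
}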