diff --git a/Cargo.lock b/Cargo.lock
index a61df6c..25d4917 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2788,7 +2788,7 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
 
 [[package]]
 name = "s3dedup"
-version = "1.1.0"
+version = "1.1.1"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index c628141..6880dd9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "s3dedup"
-version = "1.1.0"
+version = "1.1.1"
 edition = "2024"
 
 [lib]
diff --git a/scripts/metrics.sh b/scripts/metrics.sh
index ac30958..8e3a19f 100755
--- a/scripts/metrics.sh
+++ b/scripts/metrics.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 # Get metrics URL from first argument or use default
-METRICS_URL="${1:-localhost:9999/metrics}"
+METRICS_URL="${1:-localhost:8080/metrics}"
 
 while true; do
     clear
@@ -103,4 +103,4 @@ while true; do
     echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
 
     sleep 5
-done
\ No newline at end of file
+done
diff --git a/src/cleaner/mod.rs b/src/cleaner/mod.rs
index 52073cc..3fbc0e3 100644
--- a/src/cleaner/mod.rs
+++ b/src/cleaner/mod.rs
@@ -1,9 +1,14 @@
 use crate::kvstorage::KVStorage;
 use crate::locks::{self, LocksStorage};
+use crate::metrics::{
+    CLEANER_DELETED_BLOBS_TOTAL, CLEANER_ERRORS_TOTAL, CLEANER_FREED_BYTES_TOTAL,
+    CLEANER_LAST_RUN_TIMESTAMP, CLEANER_TOTAL_RUNS,
+};
 use crate::s3storage::S3Storage;
 use anyhow::Result;
 use serde::Deserialize;
 use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::Mutex;
 use tracing::{debug, error, info, warn};
 
@@ -89,6 +94,9 @@ impl Cleaner {
             info!("Running cleanup cycle for bucket: {}", self.bucket_name);
 
             if let Err(e) = self.run_cleanup().await {
+                CLEANER_ERRORS_TOTAL
+                    .with_label_values(&[&self.bucket_name])
+                    .inc();
                 error!(
                     "Cleanup cycle failed for bucket {}: {}",
                     self.bucket_name, e
@@ -100,7 +108,21 @@ impl Cleaner {
 
     /// Run a full cleanup cycle
    pub async fn run_cleanup(&self) -> Result<()> {
+        // Increment run counter and update timestamp at start
+        // This ensures both metrics are consistent even if the run fails
+        CLEANER_TOTAL_RUNS
+            .with_label_values(&[&self.bucket_name])
+            .inc();
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map(|d| d.as_secs() as i64)
+            .unwrap_or(0);
+        CLEANER_LAST_RUN_TIMESTAMP
+            .with_label_values(&[&self.bucket_name])
+            .set(timestamp);
+
         let mut total_deletes = 0;
+        let mut total_bytes_freed: u64 = 0;
 
         // Case 1: Clean ref_files pointing to non-existent hashes
         info!("Phase 1: Cleaning ref_files with missing hashes");
@@ -111,6 +133,7 @@ impl Cleaner {
                 "Reached max deletes limit ({}) in phase 1, stopping cleanup cycle",
                 self.config.max_deletes_per_run
             );
+            self.update_cleanup_metrics(total_deletes, total_bytes_freed);
             return Ok(());
         }
 
@@ -123,18 +146,22 @@ impl Cleaner {
                 "Reached max deletes limit ({}) in phase 2, stopping cleanup cycle",
                 self.config.max_deletes_per_run
             );
+            self.update_cleanup_metrics(total_deletes, total_bytes_freed);
             return Ok(());
         }
 
         // Case 3: Clean S3 objects with no refcount or refcount = 0
         info!("Phase 3: Cleaning S3 objects with no refcount or refcount = 0");
-        total_deletes += self.clean_unused_s3_objects().await?;
+        let (s3_deletes, s3_bytes_freed) = self.clean_unused_s3_objects().await?;
+        total_deletes += s3_deletes;
+        total_bytes_freed += s3_bytes_freed;
 
         if total_deletes >= self.config.max_deletes_per_run {
             warn!(
                 "Reached max deletes limit ({}) in phase 3, stopping cleanup cycle",
                 self.config.max_deletes_per_run
             );
+            self.update_cleanup_metrics(total_deletes, total_bytes_freed);
             return Ok(());
         }
 
@@ -143,13 +170,36 @@ impl Cleaner {
         total_deletes += self.clean_orphaned_logical_sizes().await?;
 
         info!(
-            "Cleanup cycle complete for bucket: {} (total items deleted: {})",
-            self.bucket_name, total_deletes
+            "Cleanup cycle complete for bucket: {} (total items deleted: {}, bytes freed: {})",
+            self.bucket_name, total_deletes, total_bytes_freed
         );
 
+        self.update_cleanup_metrics(total_deletes, total_bytes_freed);
         Ok(())
     }
 
+    /// Update cleanup metrics after a run
+    fn update_cleanup_metrics(&self, total_deletes: usize, total_bytes_freed: u64) {
+        // Update timestamp (use unwrap_or to handle edge case of system time before UNIX_EPOCH)
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map(|d| d.as_secs() as i64)
+            .unwrap_or(0);
+        CLEANER_LAST_RUN_TIMESTAMP
+            .with_label_values(&[&self.bucket_name])
+            .set(timestamp);
+
+        // Update deleted blobs counter
+        CLEANER_DELETED_BLOBS_TOTAL
+            .with_label_values(&[&self.bucket_name])
+            .inc_by(total_deletes as u64);
+
+        // Update freed bytes counter
+        CLEANER_FREED_BYTES_TOTAL
+            .with_label_values(&[&self.bucket_name])
+            .inc_by(total_bytes_freed);
+    }
+
     /// Clean ref_files that point to non-existent hashes in refcount table
     async fn clean_orphaned_ref_files(&self) -> Result<usize> {
         let mut deleted_count = 0;
@@ -327,8 +377,10 @@ impl Cleaner {
     }
 
     /// Clean S3 objects that have no refcount or refcount = 0
-    async fn clean_unused_s3_objects(&self) -> Result<usize> {
+    /// Returns (deleted_count, bytes_freed)
+    async fn clean_unused_s3_objects(&self) -> Result<(usize, u64)> {
         let mut deleted_count = 0;
+        let mut bytes_freed: u64 = 0;
         let mut continuation_token: Option<String> = None;
 
         loop {
@@ -373,6 +425,15 @@ impl Cleaner {
                 if refcount == 0 {
                     debug!("Found unused S3 object: key={} (refcount=0)", key);
 
+                    // Get compressed size before deleting (for metrics)
+                    let compressed_size = self
+                        .kvstorage
+                        .lock()
+                        .await
+                        .get_compressed_size(&self.bucket_name, &key)
+                        .await
+                        .unwrap_or(0);
+
                     // Delete the S3 object
                     if let Err(e) = self.s3storage.lock().await.delete_object(&key).await {
                         error!(
@@ -386,12 +447,13 @@ impl Cleaner {
                     }
 
                     deleted_count += 1;
+                    bytes_freed += compressed_size as u64;
 
                     if deleted_count >= self.config.max_deletes_per_run {
                         if let Err(e) = hash_guard.release().await {
                             warn!("Failed to release hash lock: {}", e);
                         }
-                        return Ok(deleted_count);
+                        return Ok((deleted_count, bytes_freed));
                     }
 
                 }
@@ -406,7 +468,7 @@ impl Cleaner {
             }
         }
 
-        Ok(deleted_count)
+        Ok((deleted_count, bytes_freed))
     }
 
     /// Clean logical_size entries that have no corresponding refcount
diff --git a/src/locks/memory.rs b/src/locks/memory.rs
index 9546748..ffc977a 100644
--- a/src/locks/memory.rs
+++ b/src/locks/memory.rs
@@ -1,5 +1,7 @@
 use crate::locks::{ExclusiveLockGuard, Lock, LockStorage, SharedLockGuard};
+use crate::metrics::LOCK_QUEUE_SIZE;
 use async_trait::async_trait;
+use std::sync::atomic::{AtomicI64, Ordering};
 use std::{collections::HashMap, sync::Arc};
 use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::task::spawn_blocking;
@@ -10,6 +12,8 @@ type LockMap = Arc<parking_lot::RwLock<HashMap<String, Arc<TokioRwLock<()>>>>>;
 #[derive(Clone)]
 pub(crate) struct MemoryLocks {
     locks: LockMap,
+    /// Counter tracking active LockedKey instances (locks in use)
+    active_locks: Arc<AtomicI64>,
 }
 
 struct LockedKey<'a> {
@@ -57,6 +61,9 @@ impl<'a> Lock for LockedKey<'a> {
 
 impl<'a> Drop for LockedKey<'a> {
     fn drop(&mut self) {
+        self.parent.active_locks.fetch_sub(1, Ordering::Relaxed);
+        LOCK_QUEUE_SIZE.set(self.parent.active_locks.load(Ordering::Relaxed));
+
         // Lock the map to prevent concurrent modifications while we check the refcount
         // parking_lot allows sync access, held very briefly (just a refcount check + hash remove)
         let mut locks = self.parent.locks.write();
@@ -90,10 +97,14 @@ impl LockStorage for MemoryLocks {
     fn new() -> Box<Self> {
         Box::new(Self {
             locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
+            active_locks: Arc::new(AtomicI64::new(0)),
         })
     }
 
     async fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a> {
+        self.active_locks.fetch_add(1, Ordering::Relaxed);
+        LOCK_QUEUE_SIZE.set(self.active_locks.load(Ordering::Relaxed));
+
         let lock = self.get_or_create_lock(key.clone()).await;
         Box::new(LockedKey {
             lock,
diff --git a/src/main.rs b/src/main.rs
index b5c71dc..1b44977 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -150,7 +150,7 @@ fn start_background_tasks(app_state: Arc<AppState>, bucket_config: &config::Buck
     // Start metrics updater task
     let metrics_state = app_state.clone();
     tokio::spawn(async move {
-        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
+        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
         loop {
             interval.tick().await;
             if let Err(e) = metrics_state.update_storage_metrics().await {
diff --git a/src/metrics.rs b/src/metrics.rs
index 44068c7..f7686c3 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -86,16 +86,11 @@ lazy_static! {
     )
     .unwrap();
 
-    pub static ref DEDUP_HIT_RATE: Gauge = register_gauge!(
-        "s3dedup_dedup_hit_rate",
-        "Deduplication hit rate (hits / (hits + misses))"
-    )
-    .unwrap();
-
     // Cleaner metrics
-    pub static ref CLEANER_LAST_RUN_TIMESTAMP: IntGauge = register_int_gauge!(
+    pub static ref CLEANER_LAST_RUN_TIMESTAMP: IntGaugeVec = register_int_gauge_vec!(
         "s3dedup_cleaner_last_run_timestamp_seconds",
-        "Timestamp of last successful cleaner run"
+        "Timestamp of last successful cleaner run",
+        &["bucket"]
     )
     .unwrap();
 
@@ -174,20 +169,6 @@ lazy_static! {
     )
     .unwrap();
 
-    pub static ref DB_CONNECTION_ACQUIRE_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
-        "s3dedup_db_connection_acquire_duration_seconds",
-        "Time to acquire a database connection",
-        &["storage_type"]
-    )
-    .unwrap();
-
-    pub static ref DB_CONNECTION_ACQUIRE_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!(
-        "s3dedup_db_connection_acquire_errors_total",
-        "Total failed database connection acquisitions",
-        &["storage_type", "reason"]
-    )
-    .unwrap();
-
     // Version tracking
     pub static ref INSTANCE_VERSION: IntGaugeVec = register_int_gauge_vec!(
         "s3dedup_instance_version_info",
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index b7b05e6..e25714c 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -1,5 +1,6 @@
 use crate::AppState;
 use crate::filetracker_client::{FileMetadata, FiletrackerClient};
+use crate::metrics::MIGRATION_FILES_MIGRATED;
 use crate::routes::ft::storage_helpers;
 use anyhow::{Context, Result};
 use futures_util::future::join_all;
@@ -801,6 +802,12 @@ async fn migrate_single_file(
     if let Err(e) = guard.release().await {
         warn!("Failed to release file lock: {}", e);
     }
+
+    // Increment migration metric
+    MIGRATION_FILES_MIGRATED
+        .with_label_values(&[&app_state.bucket_name])
+        .inc();
+
     Ok(true)
 }
 
@@ -1313,5 +1320,11 @@ async fn migrate_single_file_from_v1_fs(
     if let Err(e) = guard.release().await {
         warn!("Failed to release file lock: {}", e);
     }
+
+    // Increment migration metric
+    MIGRATION_FILES_MIGRATED
+        .with_label_values(&[&app_state.bucket_name])
+        .inc();
+
     Ok(true)
 }
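
Note: the hunks above consume CLEANER_TOTAL_RUNS, CLEANER_ERRORS_TOTAL, CLEANER_DELETED_BLOBS_TOTAL, CLEANER_FREED_BYTES_TOTAL, MIGRATION_FILES_MIGRATED, and LOCK_QUEUE_SIZE, but their declarations in src/metrics.rs fall outside the diff context. A minimal sketch of what those lazy_static entries would plausibly look like, assuming the same register_* macro style and "bucket" label seen in the CLEANER_LAST_RUN_TIMESTAMP hunk; the metric name strings and help texts below are illustrative assumptions, not taken from this diff:

// Hypothetical declarations for src/metrics.rs; only the registration style and
// the "bucket" label are confirmed by the diff. Counters match the call sites'
// with_label_values(&[&self.bucket_name]).inc() / .inc_by(u64) usage.
lazy_static! {
    pub static ref CLEANER_TOTAL_RUNS: IntCounterVec = register_int_counter_vec!(
        "s3dedup_cleaner_runs_total",
        "Total cleaner cycles started",
        &["bucket"]
    )
    .unwrap();

    pub static ref CLEANER_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "s3dedup_cleaner_errors_total",
        "Total failed cleaner cycles",
        &["bucket"]
    )
    .unwrap();

    pub static ref CLEANER_DELETED_BLOBS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "s3dedup_cleaner_deleted_blobs_total",
        "Total items deleted by the cleaner",
        &["bucket"]
    )
    .unwrap();

    pub static ref CLEANER_FREED_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
        "s3dedup_cleaner_freed_bytes_total",
        "Total compressed bytes freed by the cleaner",
        &["bucket"]
    )
    .unwrap();

    pub static ref MIGRATION_FILES_MIGRATED: IntCounterVec = register_int_counter_vec!(
        "s3dedup_migration_files_migrated_total",
        "Total files migrated",
        &["bucket"]
    )
    .unwrap();

    // LOCK_QUEUE_SIZE.set(...) is called with an i64 in src/locks/memory.rs,
    // so a plain (unlabelled) IntGauge fits the call sites.
    pub static ref LOCK_QUEUE_SIZE: IntGauge = register_int_gauge!(
        "s3dedup_lock_queue_size",
        "Number of active in-memory lock guards"
    )
    .unwrap();
}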