2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "s3dedup"
version = "1.1.0"
version = "1.1.1"
edition = "2024"

[lib]
4 changes: 2 additions & 2 deletions scripts/metrics.sh
@@ -1,7 +1,7 @@
#!/bin/bash

# Get metrics URL from first argument or use default
METRICS_URL="${1:-localhost:9999/metrics}"
METRICS_URL="${1:-localhost:8080/metrics}"

while true; do
clear
@@ -103,4 +103,4 @@ while true; do
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"

sleep 5
done
done
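
Usage note: the dashboard script polls every 5 seconds and now defaults to `localhost:8080/metrics`, presumably matching the server's actual listen port; any other endpoint can still be passed as the first argument, e.g. `./scripts/metrics.sh localhost:9999/metrics`.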
74 changes: 68 additions & 6 deletions src/cleaner/mod.rs
@@ -1,9 +1,14 @@
use crate::kvstorage::KVStorage;
use crate::locks::{self, LocksStorage};
use crate::metrics::{
CLEANER_DELETED_BLOBS_TOTAL, CLEANER_ERRORS_TOTAL, CLEANER_FREED_BYTES_TOTAL,
CLEANER_LAST_RUN_TIMESTAMP, CLEANER_TOTAL_RUNS,
};
use crate::s3storage::S3Storage;
use anyhow::Result;
use serde::Deserialize;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

@@ -89,6 +94,9 @@ impl Cleaner {
info!("Running cleanup cycle for bucket: {}", self.bucket_name);

if let Err(e) = self.run_cleanup().await {
CLEANER_ERRORS_TOTAL
.with_label_values(&[&self.bucket_name])
.inc();
error!(
"Cleanup cycle failed for bucket {}: {}",
self.bucket_name, e
@@ -100,7 +108,21 @@

/// Run a full cleanup cycle
pub async fn run_cleanup(&self) -> Result<()> {
// Increment run counter and update timestamp at start
// This ensures both metrics are consistent even if the run fails
CLEANER_TOTAL_RUNS
.with_label_values(&[&self.bucket_name])
.inc();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
CLEANER_LAST_RUN_TIMESTAMP
.with_label_values(&[&self.bucket_name])
.set(timestamp);

let mut total_deletes = 0;
let mut total_bytes_freed: u64 = 0;

// Case 1: Clean ref_files pointing to non-existent hashes
info!("Phase 1: Cleaning ref_files with missing hashes");
@@ -111,6 +133,7 @@
"Reached max deletes limit ({}) in phase 1, stopping cleanup cycle",
self.config.max_deletes_per_run
);
self.update_cleanup_metrics(total_deletes, total_bytes_freed);
return Ok(());
}

@@ -123,18 +146,22 @@
"Reached max deletes limit ({}) in phase 2, stopping cleanup cycle",
self.config.max_deletes_per_run
);
self.update_cleanup_metrics(total_deletes, total_bytes_freed);
return Ok(());
}

// Case 3: Clean S3 objects with no refcount or refcount = 0
info!("Phase 3: Cleaning S3 objects with no refcount or refcount = 0");
total_deletes += self.clean_unused_s3_objects().await?;
let (s3_deletes, s3_bytes_freed) = self.clean_unused_s3_objects().await?;
total_deletes += s3_deletes;
total_bytes_freed += s3_bytes_freed;

if total_deletes >= self.config.max_deletes_per_run {
warn!(
"Reached max deletes limit ({}) in phase 3, stopping cleanup cycle",
self.config.max_deletes_per_run
);
self.update_cleanup_metrics(total_deletes, total_bytes_freed);
return Ok(());
}

@@ -143,13 +170,36 @@
total_deletes += self.clean_orphaned_logical_sizes().await?;

info!(
"Cleanup cycle complete for bucket: {} (total items deleted: {})",
self.bucket_name, total_deletes
"Cleanup cycle complete for bucket: {} (total items deleted: {}, bytes freed: {})",
self.bucket_name, total_deletes, total_bytes_freed
);

self.update_cleanup_metrics(total_deletes, total_bytes_freed);
Ok(())
}

/// Update cleanup metrics after a run
fn update_cleanup_metrics(&self, total_deletes: usize, total_bytes_freed: u64) {
// Update timestamp (use unwrap_or to handle edge case of system time before UNIX_EPOCH)
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
CLEANER_LAST_RUN_TIMESTAMP
.with_label_values(&[&self.bucket_name])
.set(timestamp);

// Update deleted blobs counter
CLEANER_DELETED_BLOBS_TOTAL
.with_label_values(&[&self.bucket_name])
.inc_by(total_deletes as u64);

// Update freed bytes counter
CLEANER_FREED_BYTES_TOTAL
.with_label_values(&[&self.bucket_name])
.inc_by(total_bytes_freed);
}

/// Clean ref_files that point to non-existent hashes in refcount table
async fn clean_orphaned_ref_files(&self) -> Result<usize> {
let mut deleted_count = 0;
@@ -327,8 +377,10 @@ impl Cleaner {
}

/// Clean S3 objects that have no refcount or refcount = 0
async fn clean_unused_s3_objects(&self) -> Result<usize> {
/// Returns (deleted_count, bytes_freed)
async fn clean_unused_s3_objects(&self) -> Result<(usize, u64)> {
let mut deleted_count = 0;
let mut bytes_freed: u64 = 0;
let mut continuation_token: Option<String> = None;

loop {
@@ -373,6 +425,15 @@
if refcount == 0 {
debug!("Found unused S3 object: key={} (refcount=0)", key);

// Get compressed size before deleting (for metrics)
let compressed_size = self
.kvstorage
.lock()
.await
.get_compressed_size(&self.bucket_name, &key)
.await
.unwrap_or(0);

// Delete the S3 object
if let Err(e) = self.s3storage.lock().await.delete_object(&key).await {
error!(
@@ -386,12 +447,13 @@
}

deleted_count += 1;
bytes_freed += compressed_size as u64;

if deleted_count >= self.config.max_deletes_per_run {
if let Err(e) = hash_guard.release().await {
warn!("Failed to release hash lock: {}", e);
}
return Ok(deleted_count);
return Ok((deleted_count, bytes_freed));
}
}

@@ -406,7 +468,7 @@
}
}

Ok(deleted_count)
Ok((deleted_count, bytes_freed))
}

/// Clean logical_size entries that have no corresponding refcount
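
Two observations on the freed-bytes accounting above. First, get_compressed_size(...).unwrap_or(0) means a missing size record silently undercounts bytes_freed rather than aborting the cleanup pass, which keeps the cleaner resilient at the cost of slightly conservative metrics. Second, the counters incremented in update_cleanup_metrics are registered elsewhere; a minimal sketch of what the freed-bytes registration presumably looks like in src/metrics.rs, inferred from the call shape above (name and help text are assumptions):

use lazy_static::lazy_static;
use prometheus::{IntCounterVec, register_int_counter_vec};

lazy_static! {
    // Hypothetical registration; this diff only shows the call sites.
    pub static ref CLEANER_FREED_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
        "s3dedup_cleaner_freed_bytes_total",
        "Total bytes freed by cleanup runs",
        &["bucket"]
    )
    .unwrap();
}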
11 changes: 11 additions & 0 deletions src/locks/memory.rs
@@ -1,5 +1,7 @@
use crate::locks::{ExclusiveLockGuard, Lock, LockStorage, SharedLockGuard};
use crate::metrics::LOCK_QUEUE_SIZE;
use async_trait::async_trait;
use std::sync::atomic::{AtomicI64, Ordering};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::spawn_blocking;
@@ -10,6 +12,8 @@ type LockMap = Arc<parking_lot::RwLock<HashMap<String, Arc<TokioRwLock<()>>>>>;
#[derive(Clone)]
pub(crate) struct MemoryLocks {
locks: LockMap,
/// Counter tracking active LockedKey instances (locks in use)
active_locks: Arc<AtomicI64>,
}

struct LockedKey<'a> {
@@ -57,6 +61,9 @@ impl<'a> Lock for LockedKey<'a> {

impl<'a> Drop for LockedKey<'a> {
fn drop(&mut self) {
self.parent.active_locks.fetch_sub(1, Ordering::Relaxed);
LOCK_QUEUE_SIZE.set(self.parent.active_locks.load(Ordering::Relaxed));

// Lock the map to prevent concurrent modifications while we check the refcount
// parking_lot allows sync access, held very briefly (just a refcount check + hash remove)
let mut locks = self.parent.locks.write();
@@ -90,10 +97,14 @@ impl LockStorage for MemoryLocks {
fn new() -> Box<Self> {
Box::new(Self {
locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
active_locks: Arc::new(AtomicI64::new(0)),
})
}

async fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send> {
self.active_locks.fetch_add(1, Ordering::Relaxed);
LOCK_QUEUE_SIZE.set(self.active_locks.load(Ordering::Relaxed));

let lock = self.get_or_create_lock(key.clone()).await;
Box::new(LockedKey {
lock,
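
One design note on the gauge updates in this file: fetch_add/fetch_sub followed by a separate load can interleave across threads, so LOCK_QUEUE_SIZE may briefly publish a stale value — harmless for a monitoring gauge, and Relaxed ordering is the right choice here. A tighter variant derives the published value from the atomic operation's own return value; a minimal sketch of that alternative (not what this PR does):

use std::sync::atomic::{AtomicI64, Ordering};

// fetch_add/fetch_sub return the previous value, so the post-update count
// can be computed without a second, racy load.
fn gauge_value_on_acquire(active: &AtomicI64) -> i64 {
    active.fetch_add(1, Ordering::Relaxed) + 1
}

fn gauge_value_on_release(active: &AtomicI64) -> i64 {
    active.fetch_sub(1, Ordering::Relaxed) - 1
}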
2 changes: 1 addition & 1 deletion src/main.rs
@@ -150,7 +150,7 @@ fn start_background_tasks(app_state: Arc<AppState>, bucket_config: &config::Buck
// Start metrics updater task
let metrics_state = app_state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = metrics_state.update_storage_metrics().await {
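
On the 60s → 30s change: tokio's interval fires catch-up ticks in a burst by default if a tick runs long. If update_storage_metrics can ever approach the 30-second period, setting a missed-tick behavior avoids back-to-back runs; a hedged sketch of that option (not necessarily needed here):

use tokio::time::{Duration, MissedTickBehavior, interval};

async fn run_metrics_updater() {
    let mut ticker = interval(Duration::from_secs(30));
    // Reschedule after a long tick instead of firing a catch-up burst.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        ticker.tick().await;
        // metrics_state.update_storage_metrics().await would go here.
    }
}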
25 changes: 3 additions & 22 deletions src/metrics.rs
@@ -86,16 +86,11 @@ lazy_static! {
)
.unwrap();

pub static ref DEDUP_HIT_RATE: Gauge = register_gauge!(
"s3dedup_dedup_hit_rate",
"Deduplication hit rate (hits / (hits + misses))"
)
.unwrap();

// Cleaner metrics
pub static ref CLEANER_LAST_RUN_TIMESTAMP: IntGauge = register_int_gauge!(
pub static ref CLEANER_LAST_RUN_TIMESTAMP: IntGaugeVec = register_int_gauge_vec!(
"s3dedup_cleaner_last_run_timestamp_seconds",
"Timestamp of last successful cleaner run"
"Timestamp of last successful cleaner run",
&["bucket"]
)
.unwrap();

@@ -174,20 +169,6 @@
)
.unwrap();

pub static ref DB_CONNECTION_ACQUIRE_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
"s3dedup_db_connection_acquire_duration_seconds",
"Time to acquire a database connection",
&["storage_type"]
)
.unwrap();

pub static ref DB_CONNECTION_ACQUIRE_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!(
"s3dedup_db_connection_acquire_errors_total",
"Total failed database connection acquisitions",
&["storage_type", "reason"]
)
.unwrap();

// Version tracking
pub static ref INSTANCE_VERSION: IntGaugeVec = register_int_gauge_vec!(
"s3dedup_instance_version_info",
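
Since CLEANER_LAST_RUN_TIMESTAMP changes from IntGauge to IntGaugeVec, every call site must now pass the bucket label, as the cleaner changes above do. For checking the relabeled output locally, the prometheus crate's default registry can be rendered to text; a minimal sketch, assuming the register_* macros above use the default registry (they do unless a custom registry is wired in):

use prometheus::{Encoder, TextEncoder};

// Render all registered metrics in the Prometheus exposition format.
fn render_metrics() -> String {
    let families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&families, &mut buf)
        .expect("text encoding failed");
    String::from_utf8(buf).expect("exposition format is UTF-8")
}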
13 changes: 13 additions & 0 deletions src/migration/mod.rs
@@ -1,5 +1,6 @@
use crate::AppState;
use crate::filetracker_client::{FileMetadata, FiletrackerClient};
use crate::metrics::MIGRATION_FILES_MIGRATED;
use crate::routes::ft::storage_helpers;
use anyhow::{Context, Result};
use futures_util::future::join_all;
@@ -801,6 +802,12 @@ async fn migrate_single_file(
if let Err(e) = guard.release().await {
warn!("Failed to release file lock: {}", e);
}

// Increment migration metric
MIGRATION_FILES_MIGRATED
.with_label_values(&[&app_state.bucket_name])
.inc();

Ok(true)
}

@@ -1313,5 +1320,11 @@ async fn migrate_single_file_from_v1_fs(
if let Err(e) = guard.release().await {
warn!("Failed to release file lock: {}", e);
}

// Increment migration metric
MIGRATION_FILES_MIGRATED
.with_label_values(&[&app_state.bucket_name])
.inc();

Ok(true)
}
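
Note that MIGRATION_FILES_MIGRATED is incremented only on the Ok(true) paths, so failed or skipped files are not counted. Its registration is outside this diff; given the with_label_values(&[&app_state.bucket_name]) call shape, it is presumably a bucket-labeled IntCounterVec registered like the hypothetical sketch shown after the cleaner section, with a name along the lines of s3dedup_migration_files_migrated_total.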