diff --git a/Cargo.lock b/Cargo.lock index 723b6791..78d171ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,7 +4410,7 @@ dependencies = [ [[package]] name = "mutant" -version = "0.6.1" +version = "0.6.2" dependencies = [ "anyhow", "chrono", @@ -4432,7 +4432,7 @@ dependencies = [ [[package]] name = "mutant-client" -version = "0.6.1" +version = "0.6.2" dependencies = [ "console_error_panic_hook", "ewebsock", @@ -4455,7 +4455,7 @@ dependencies = [ [[package]] name = "mutant-daemon" -version = "0.6.1" +version = "0.6.2" dependencies = [ "anyhow", "clap", @@ -4478,7 +4478,7 @@ dependencies = [ [[package]] name = "mutant-lib" -version = "0.6.1" +version = "0.6.2" dependencies = [ "ant-networking", "anyhow", @@ -4525,7 +4525,7 @@ dependencies = [ [[package]] name = "mutant-protocol" -version = "0.6.1" +version = "0.6.2" dependencies = [ "base64 0.21.7", "serde", diff --git a/README.md b/README.md index 9d2e8538..946dabdf 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,7 @@ Options: #### Store/fetch private data ```bash -# Store a file directly +# Store the file `data.txt` under the name `mykey` $> mutant put mykey data.txt # Get a value and save to a file @@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`: ```toml [dependencies] -mutant-lib = "0.6.1" # Or the version you need +mutant-lib = "0.6.2" # Or the version you need tokio = { version = "1", features = ["full"] } ``` @@ -277,7 +277,6 @@ async fn main() -> anyhow::Result<()> { mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?; let fetched_value = mutant.get("greeting").await?; - println!("Fetched value: {}", String::from_utf8_lossy(&fetched_value)); mutant.rm("greeting").await?; @@ -353,45 +352,8 @@ async fn main() -> Result<()> { } ``` -## Development and Testing - -### Local Testnet Management (`scripts/manage_local_testnet.sh`) - -### Running Integration Tests (`scripts/run_tests_with_env.sh`) - -## Migration - -## Architecture Overview - -MutAnt 
consists of five main components that work together to provide a complete storage solution: - -1. **mutant-lib**: Core library handling chunking, encryption, and storage operations -2. **mutant-protocol**: Shared communication format definitions -3. **mutant-daemon**: Background service maintaining Autonomi connection -4. **mutant-client**: WebSocket client library for communicating with the daemon -5. **mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic recycling of failed pads +If you find this project useful, you can donate to support its development. 
<3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c` diff --git a/mutant-cli/Cargo.toml b/mutant-cli/Cargo.toml index 1f62c4c7..5352b845 100644 --- a/mutant-cli/Cargo.toml +++ b/mutant-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mutant" -version = "0.6.1" +version = "0.6.2" edition = "2021" license = "LGPL-3.0-only" description = "Command-line interface for MutAnt distributed mutable key value storage over Autonomi network" @@ -13,8 +13,8 @@ colored = "2.0" env_logger = "0.11" indicatif = "0.17" log = "0.4" -mutant-client = { path = "../mutant-client", version = "0.6.1" } -mutant-protocol = { path = "../mutant-protocol", version = "0.6.1" } +mutant-client = { path = "../mutant-client", version = "0.6.2" } +mutant-protocol = { path = "../mutant-protocol", version = "0.6.2" } tokio = { version = "1.0", features = ["full"] } uuid = { version = "1.0", features = ["v4", "serde"] } humansize = "2.1" diff --git a/mutant-cli/README.md b/mutant-cli/README.md index 587b7b89..946dabdf 100644 --- a/mutant-cli/README.md +++ b/mutant-cli/README.md @@ -127,7 +127,7 @@ $> cargo install mutant You can fetch the daily meme of the day with the following command: ```bash -$> mutant get -p 9620303d41cd65177e6763809f4780b1fa7d864a14d4ed0ed0322c5d4524fe406db65fdc485f1737814c81ab6d61dab0 daily_meme.jpg +$> mutant get -p a420224971527d61ce6ee21d850a07c243498c95808697e8fac23f461545656933016697d10b805c0fa26b50eb3532b2 daily_meme.jpg ``` ## Command-Line Interface (CLI) @@ -169,7 +169,7 @@ Options: #### Store/fetch private data ```bash -# Store a file directly +# Store the file `data.txt` under the name `mykey` $> mutant put mykey data.txt # Get a value and save to a file @@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`: ```toml [dependencies] -mutant-lib = "0.6.0" # Or the version you need +mutant-lib = "0.6.2" # Or the version you need tokio = { version = "1", features = ["full"] } ``` @@ -277,7 +277,6 @@ 
async fn main() -> anyhow::Result<()> { mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?; let fetched_value = mutant.get("greeting").await?; - println!("Fetched value: {}", String::from_utf8_lossy(&fetched_value)); mutant.rm("greeting").await?; @@ -353,45 +352,8 @@ async fn main() -> Result<()> { } ``` -## Development and Testing - -### Local Testnet Management (`scripts/manage_local_testnet.sh`) - -### Running Integration Tests (`scripts/run_tests_with_env.sh`) - -## Migration - -## Architecture Overview - -MutAnt consists of five main components that work together to provide a complete storage solution: - -1. **mutant-lib**: Core library handling chunking, encryption, and storage operations -2. **mutant-protocol**: Shared communication format definitions -3. **mutant-daemon**: Background service maintaining Autonomi connection -4. **mutant-client**: WebSocket client library for communicating with the daemon -5. **mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic 
recycling of failed pads +If you find this project useful, you can donate to support its development. <3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c` diff --git a/mutant-cli/src/callbacks/put.rs b/mutant-cli/src/callbacks/put.rs index ed02f04a..f9b12b85 100644 --- a/mutant-cli/src/callbacks/put.rs +++ b/mutant-cli/src/callbacks/put.rs @@ -1,4 +1,5 @@ use super::progress::StyledProgressBar; +use colored::Colorize; use indicatif::MultiProgress; use log::{error, info, warn}; use mutant_client::ProgressReceiver; @@ -13,6 +14,32 @@ struct PutCallbackContext { confirm_pb_opt: Arc>>, multi_progress: MultiProgress, total_chunks: Arc>, + first_complete_seen: Arc>, + start_time: Arc>, +} + +impl PutCallbackContext { + /// Finishes and clears a progress bar, setting it to 100% if not already finished. + /// Returns after the progress bar is cleared. + async fn finish_progress_bar( + &self, + pb_mutex: &Arc>>, + completion_message: &str, + bar_name: &str, + ) { + let mut pb_guard = pb_mutex.lock().await; + if let Some(pb) = pb_guard.take() { + info!("Clearing {} bar", bar_name); + if !pb.is_finished() { + // If the bar isn't at 100%, set it to 100% before clearing + if let Some(len) = pb.length() { + pb.set_position(len); + } + pb.set_message(completion_message.to_string()); + } + pb.finish_and_clear(); + } + } } #[allow(clippy::type_complexity)] @@ -21,6 +48,8 @@ pub fn create_put_progress(mut progress_rx: ProgressReceiver, multi_progress: Mu let upload_pb_opt = Arc::new(Mutex::new(None::)); let confirm_pb_opt = Arc::new(Mutex::new(None::)); let total_chunks_arc = Arc::new(Mutex::new(0usize)); + let first_complete_seen = Arc::new(Mutex::new(false)); + let start_time = Arc::new(Mutex::new(std::time::Instant::now())); let context = PutCallbackContext { res_pb_opt: res_pb_opt.clone(), @@ -28,6 +57,8 @@ pub fn create_put_progress(mut progress_rx: ProgressReceiver, multi_progress: Mu confirm_pb_opt: confirm_pb_opt.clone(), 
multi_progress: multi_progress.clone(), total_chunks: total_chunks_arc.clone(), + first_complete_seen: first_complete_seen.clone(), + start_time: start_time.clone(), }; let ctx_clone = context.clone(); @@ -138,53 +169,59 @@ pub fn create_put_progress(mut progress_rx: ProgressReceiver, multi_progress: Mu Ok::>(true) } PutEvent::Complete => { - info!("Complete event received, clearing progress bars"); + // Check if this is the first or second Complete event + let mut first_complete_seen_guard = ctx.first_complete_seen.lock().await; + let is_first_complete = !*first_complete_seen_guard; + + if is_first_complete { + info!("First Complete event received for data pads"); + // Mark that we've seen the first Complete event + *first_complete_seen_guard = true; + drop(first_complete_seen_guard); + + // Clear all progress bars before showing the message + // First, finish the reservation bar if it exists + ctx.finish_progress_bar(&ctx.res_pb_opt, "Pads acquired.", "reservation").await; + + // Next, finish the upload bar + ctx.finish_progress_bar(&ctx.upload_pb_opt, "Upload complete.", "upload").await; + + // Finally, finish the confirmation bar + ctx.finish_progress_bar(&ctx.confirm_pb_opt, "Confirmation complete.", "confirmation").await; + + // Ensure all progress bars are cleared + crate::utils::ensure_progress_cleared(&ctx.multi_progress); + + // Calculate elapsed time + let start_time = *ctx.start_time.lock().await; + let elapsed = start_time.elapsed(); + + // Format the elapsed time + let time_str = crate::utils::format_elapsed_time(elapsed); + + // Display message about data pads being uploaded + let total_chunks = *ctx.total_chunks.lock().await; + println!("{} {} data pads have been uploaded (took {})", + "•".bright_green(), + total_chunks, + time_str); + + return Ok::>(true); + } + + // This is the second Complete event (or the only one for private keys) + info!("Final Complete event received, clearing progress bars"); + drop(first_complete_seen_guard); // Make sure 
all progress bars are finished and cleared // First, finish the reservation bar if it exists - let mut res_pb_guard = ctx.res_pb_opt.lock().await; - if let Some(pb) = res_pb_guard.take() { - info!("Clearing reservation bar"); - if !pb.is_finished() { - // If the bar isn't at 100%, set it to 100% before clearing - if let Some(len) = pb.length() { - pb.set_position(len); - } - pb.set_message("Pads acquired.".to_string()); - } - pb.finish_and_clear(); - } - drop(res_pb_guard); + ctx.finish_progress_bar(&ctx.res_pb_opt, "Pads acquired.", "reservation").await; // Next, finish the upload bar - let mut upload_pb_guard = ctx.upload_pb_opt.lock().await; - if let Some(pb) = upload_pb_guard.take() { - info!("Clearing upload bar"); - if !pb.is_finished() { - // If the bar isn't at 100%, set it to 100% before clearing - if let Some(len) = pb.length() { - pb.set_position(len); - } - pb.set_message("Upload complete.".to_string()); - } - pb.finish_and_clear(); - } - drop(upload_pb_guard); + ctx.finish_progress_bar(&ctx.upload_pb_opt, "Upload complete.", "upload").await; // Finally, finish the confirmation bar - let mut confirm_pb_guard = ctx.confirm_pb_opt.lock().await; - if let Some(pb) = confirm_pb_guard.take() { - info!("Clearing confirmation bar"); - if !pb.is_finished() { - // If the bar isn't at 100%, set it to 100% before clearing - if let Some(len) = pb.length() { - pb.set_position(len); - } - pb.set_message("Confirmation complete.".to_string()); - } - pb.finish_and_clear(); - } - drop(confirm_pb_guard); + ctx.finish_progress_bar(&ctx.confirm_pb_opt, "Confirmation complete.", "confirmation").await; Ok::>(true) } diff --git a/mutant-cli/src/cli.rs b/mutant-cli/src/cli.rs index ba924841..82a99371 100644 --- a/mutant-cli/src/cli.rs +++ b/mutant-cli/src/cli.rs @@ -39,7 +39,7 @@ pub enum Commands { Rm { key: String }, #[command(about = "List stored keys")] Ls { - #[arg(short = 'h', long, help = "Show fetch history")] + #[arg(long, help = "Show fetch history")] history: bool, }, 
#[command(about = "Show storage statistics")] diff --git a/mutant-client/Cargo.toml b/mutant-client/Cargo.toml index 5b1376fb..7af1b442 100644 --- a/mutant-client/Cargo.toml +++ b/mutant-client/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "mutant-client" -version = "0.6.1" +version = "0.6.2" edition = "2024" license = "LGPL-3.0-only" description = "WebSocket client library for MutAnt distributed mutable key value storage over Autonomi network" repository = "https://github.com/Champii/MutAnt" [dependencies] -mutant-protocol = { path = "../mutant-protocol", version = "0.6.1"} +mutant-protocol = { path = "../mutant-protocol", version = "0.6.2"} # Common dependencies serde = { version = "1.0", features = ["derive"] } diff --git a/mutant-client/README.md b/mutant-client/README.md index 587b7b89..946dabdf 100644 --- a/mutant-client/README.md +++ b/mutant-client/README.md @@ -127,7 +127,7 @@ $> cargo install mutant You can fetch the daily meme of the day with the following command: ```bash -$> mutant get -p 9620303d41cd65177e6763809f4780b1fa7d864a14d4ed0ed0322c5d4524fe406db65fdc485f1737814c81ab6d61dab0 daily_meme.jpg +$> mutant get -p a420224971527d61ce6ee21d850a07c243498c95808697e8fac23f461545656933016697d10b805c0fa26b50eb3532b2 daily_meme.jpg ``` ## Command-Line Interface (CLI) @@ -169,7 +169,7 @@ Options: #### Store/fetch private data ```bash -# Store a file directly +# Store the file `data.txt` under the name `mykey` $> mutant put mykey data.txt # Get a value and save to a file @@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`: ```toml [dependencies] -mutant-lib = "0.6.0" # Or the version you need +mutant-lib = "0.6.2" # Or the version you need tokio = { version = "1", features = ["full"] } ``` @@ -277,7 +277,6 @@ async fn main() -> anyhow::Result<()> { mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?; let fetched_value = mutant.get("greeting").await?; - println!("Fetched value: {}", 
String::from_utf8_lossy(&fetched_value)); mutant.rm("greeting").await?; @@ -353,45 +352,8 @@ async fn main() -> Result<()> { } ``` -## Development and Testing - -### Local Testnet Management (`scripts/manage_local_testnet.sh`) - -### Running Integration Tests (`scripts/run_tests_with_env.sh`) - -## Migration - -## Architecture Overview - -MutAnt consists of five main components that work together to provide a complete storage solution: - -1. **mutant-lib**: Core library handling chunking, encryption, and storage operations -2. **mutant-protocol**: Shared communication format definitions -3. **mutant-daemon**: Background service maintaining Autonomi connection -4. **mutant-client**: WebSocket client library for communicating with the daemon -5. **mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic recycling of failed pads +If you find this project useful, you can donate to support its development. 
<3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c` diff --git a/mutant-daemon/Cargo.toml b/mutant-daemon/Cargo.toml index 24498a0b..0578379e 100644 --- a/mutant-daemon/Cargo.toml +++ b/mutant-daemon/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mutant-daemon" -version = "0.6.1" +version = "0.6.2" edition = "2021" license = "LGPL-3.0-only" description = "Background daemon service for MutAnt distributed mutable key value storage over Autonomi network" @@ -13,8 +13,8 @@ dialoguer = "0.11" directories = "5.0" env_logger = "0.11" log = "0.4" -mutant-lib = { path = "../mutant-lib", version = "0.6.1" } -mutant-protocol = { path = "../mutant-protocol", version = "0.6.1" } +mutant-lib = { path = "../mutant-lib", version = "0.6.2" } +mutant-protocol = { path = "../mutant-protocol", version = "0.6.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/mutant-daemon/README.md b/mutant-daemon/README.md index 587b7b89..946dabdf 100644 --- a/mutant-daemon/README.md +++ b/mutant-daemon/README.md @@ -127,7 +127,7 @@ $> cargo install mutant You can fetch the daily meme of the day with the following command: ```bash -$> mutant get -p 9620303d41cd65177e6763809f4780b1fa7d864a14d4ed0ed0322c5d4524fe406db65fdc485f1737814c81ab6d61dab0 daily_meme.jpg +$> mutant get -p a420224971527d61ce6ee21d850a07c243498c95808697e8fac23f461545656933016697d10b805c0fa26b50eb3532b2 daily_meme.jpg ``` ## Command-Line Interface (CLI) @@ -169,7 +169,7 @@ Options: #### Store/fetch private data ```bash -# Store a file directly +# Store the file `data.txt` under the name `mykey` $> mutant put mykey data.txt # Get a value and save to a file @@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`: ```toml [dependencies] -mutant-lib = "0.6.0" # Or the version you need +mutant-lib = "0.6.2" # Or the version you need tokio = { version = "1", features = ["full"] } ``` @@ -277,7 +277,6 @@ async fn 
main() -> anyhow::Result<()> { mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?; let fetched_value = mutant.get("greeting").await?; - println!("Fetched value: {}", String::from_utf8_lossy(&fetched_value)); mutant.rm("greeting").await?; @@ -353,45 +352,8 @@ async fn main() -> Result<()> { } ``` -## Development and Testing - -### Local Testnet Management (`scripts/manage_local_testnet.sh`) - -### Running Integration Tests (`scripts/run_tests_with_env.sh`) - -## Migration - -## Architecture Overview - -MutAnt consists of five main components that work together to provide a complete storage solution: - -1. **mutant-lib**: Core library handling chunking, encryption, and storage operations -2. **mutant-protocol**: Shared communication format definitions -3. **mutant-daemon**: Background service maintaining Autonomi connection -4. **mutant-client**: WebSocket client library for communicating with the daemon -5. **mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic recycling of 
failed pads +If you find this project useful, you can donate to support its development. <3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c` diff --git a/mutant-daemon/src/app.rs b/mutant-daemon/src/app.rs index 52b8ff25..eded41dc 100644 --- a/mutant-daemon/src/app.rs +++ b/mutant-daemon/src/app.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use xdg::BaseDirectories; use mutant_lib::{config::NetworkChoice, MutAnt}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, OnceCell}; use warp::Filter; use crate::error::Error; @@ -13,6 +13,43 @@ use crate::wallet; use std::path::PathBuf; use tokio::signal; +// Thread-safe singleton to track public-only mode +pub static PUBLIC_ONLY_MODE: OnceCell = OnceCell::const_new(); + +/// Helper function to initialize MutAnt based on network choice and private key +async fn init_mutant(network_choice: NetworkChoice, private_key: Option) -> Result<(MutAnt, bool), Error> { + let mut is_public_only = private_key.is_none(); + + let mutant = match (network_choice, private_key) { + // Full access with private key + (NetworkChoice::Devnet, _) => { + log::info!("Running in local mode"); + is_public_only = false; + MutAnt::init_local().await + } + (NetworkChoice::Alphanet, Some(key)) => { + log::info!("Running in alphanet mode"); + MutAnt::init_alphanet(&key).await + } + (NetworkChoice::Mainnet, Some(key)) => { + log::info!("Running in mainnet mode"); + MutAnt::init(&key).await + } + + // Public-only mode (no private key) + (NetworkChoice::Alphanet, None) => { + log::info!("Running in alphanet public-only mode"); + MutAnt::init_public_alphanet().await + } + (NetworkChoice::Mainnet, None) => { + log::info!("Running in mainnet public-only mode"); + MutAnt::init_public().await + } + }.map_err(Error::MutAnt)?; + + Ok((mutant, is_public_only)) +} + #[derive(serde::Deserialize, serde::Serialize)] struct NetworkConfig { public_key: String, @@ -129,50 +166,49 @@ pub 
async fn run(options: AppOptions) -> Result<(), Error> { let mut config = Config::load()?; - let private_key_for_mutant: String; - - match config.get_private_key(network_choice)? { + // Try to get a private key from config + let private_key = match config.get_private_key(network_choice)? { Some((key_from_file, pk_hex)) => { log::info!( "Loaded private key from file for network {:?}: {}", network_choice, pk_hex ); - private_key_for_mutant = key_from_file; + Some(key_from_file) } None => { - log::info!("No private key found in config or file, attempting to scan wallets for network {:?}", network_choice); - let (selected_private_key, pk_hex) = wallet::scan_and_select_wallet().await?; - - config.set_public_key(network_choice, pk_hex.clone()); - config.save()?; - log::info!( - "Saved newly selected public key {} to config for network {:?}", - pk_hex, - network_choice - ); - private_key_for_mutant = selected_private_key; - } - } - - let mutant = Arc::new( - match network_choice { - NetworkChoice::Devnet => { - log::info!("Running in local mode"); - MutAnt::init_local().await - } - NetworkChoice::Alphanet => { - log::info!("Running in alphanet mode"); - MutAnt::init_alphanet(&private_key_for_mutant).await - } - NetworkChoice::Mainnet => { - log::info!("Running in mainnet mode"); - MutAnt::init(&private_key_for_mutant).await + // Try to scan for wallets + match wallet::scan_and_select_wallet().await { + Ok((selected_private_key, pk_hex)) => { + // Save the selected wallet for future use + config.set_public_key(network_choice, pk_hex.clone()); + config.save()?; + log::info!( + "Saved newly selected public key {} to config for network {:?}", + pk_hex, + network_choice + ); + Some(selected_private_key) + } + Err(e) => { + // No wallet found, initialize in public-only mode + log::warn!("No wallet found: {}. 
Initializing in public-only mode.", e); + log::info!("Only public downloads (mutant get -p) will be available."); + None + } } } - .map_err(Error::MutAnt)?, - ); - log::info!("MutAnt initialized successfully."); + }; + + // Initialize MutAnt with the appropriate mode + let (mutant, is_public_only) = init_mutant(network_choice, private_key).await?; + let mutant = Arc::new(mutant); + + // Set the public-only mode flag + PUBLIC_ONLY_MODE.set(is_public_only).expect("PUBLIC_ONLY_MODE should only be set once"); + + log::info!("MutAnt initialized successfully{}", + if is_public_only { " in public-only mode" } else { "" }); // Initialize Task Management let tasks: TaskMap = Arc::new(RwLock::new(HashMap::new())); diff --git a/mutant-daemon/src/handlers/data_operations.rs b/mutant-daemon/src/handlers/data_operations.rs index c1c9dd68..98948c3f 100644 --- a/mutant-daemon/src/handlers/data_operations.rs +++ b/mutant-daemon/src/handlers/data_operations.rs @@ -3,7 +3,7 @@ use tokio::fs; use uuid::Uuid; use crate::error::Error as DaemonError; -use super::{TaskEntry, TaskMap, ActiveKeysMap, try_register_key, release_key}; +use super::{TaskEntry, TaskMap, ActiveKeysMap, try_register_key, release_key, is_public_only_mode, PUBLIC_ONLY_ERROR_MSG}; use mutant_lib::storage::ScratchpadAddress; use mutant_lib::MutAnt; use mutant_protocol::{ @@ -22,6 +22,16 @@ pub(crate) async fn handle_put( active_keys: ActiveKeysMap, original_request_str: &str, ) -> Result<(), DaemonError> { + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(original_request_str.to_string()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let task_id = Uuid::new_v4(); let user_key = req.user_key.clone(); let source_path = req.source_path.clone(); // Keep path for logging @@ -428,6 +438,16 @@ pub(crate) async fn handle_rm( 
active_keys: ActiveKeysMap, original_request_str: &str, ) -> Result<(), DaemonError> { + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(original_request_str.to_string()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let task_id = Uuid::new_v4(); let user_key = req.user_key.clone(); log::info!("Starting RM task: user_key={}", user_key); diff --git a/mutant-daemon/src/handlers/import_export.rs b/mutant-daemon/src/handlers/import_export.rs index 9fb5ca94..b7dab474 100644 --- a/mutant-daemon/src/handlers/import_export.rs +++ b/mutant-daemon/src/handlers/import_export.rs @@ -2,10 +2,11 @@ use std::sync::Arc; use tokio::fs; use crate::error::Error as DaemonError; +use super::{is_public_only_mode, PUBLIC_ONLY_ERROR_MSG}; use mutant_lib::storage::PadInfo; use mutant_lib::MutAnt; use mutant_protocol::{ - ExportRequest, ExportResponse, ExportResult, ImportRequest, ImportResponse, ImportResult, + ErrorResponse, ExportRequest, ExportResponse, ExportResult, ImportRequest, ImportResponse, ImportResult, Response, }; @@ -18,6 +19,16 @@ pub(crate) async fn handle_import( ) -> Result<(), DaemonError> { log::debug!("Handling Import request"); + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(serde_json::to_string(&req).unwrap_or_default()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let file_path = req.file_path.clone(); let pads_hex = fs::read(&file_path) .await @@ -50,6 +61,16 @@ pub(crate) async fn handle_export( ) -> Result<(), DaemonError> { log::debug!("Handling Export request"); + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + 
.send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(serde_json::to_string(&req).unwrap_or_default()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let destination_path = req.destination_path.clone(); let pads_hex = mutant.export_raw_pads_private_key().await?; diff --git a/mutant-daemon/src/handlers/mod.rs b/mutant-daemon/src/handlers/mod.rs index 97bc0f58..6dea74dc 100644 --- a/mutant-daemon/src/handlers/mod.rs +++ b/mutant-daemon/src/handlers/mod.rs @@ -13,3 +13,12 @@ mod key_management; pub use websocket::handle_ws; pub use task_management::{TaskEntry, TaskMap}; pub use key_management::{ActiveKeysMap, try_register_key, release_key}; + +/// Check if the daemon is running in public-only mode +pub fn is_public_only_mode() -> bool { + // If the OnceCell hasn't been initialized yet, we're not in public-only mode + crate::app::PUBLIC_ONLY_MODE.get().copied().unwrap_or(false) +} + +/// Error message for operations that require a wallet when in public-only mode +pub const PUBLIC_ONLY_ERROR_MSG: &str = "This operation requires a wallet, but the daemon is running in public-only mode. 
To enable full functionality, please set up an Autonomi wallet using 'ant wallet import' or 'ant wallet create' and restart the daemon."; diff --git a/mutant-daemon/src/handlers/system_operations.rs b/mutant-daemon/src/handlers/system_operations.rs index 59523db4..275dd864 100644 --- a/mutant-daemon/src/handlers/system_operations.rs +++ b/mutant-daemon/src/handlers/system_operations.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use uuid::Uuid; use crate::error::Error as DaemonError; -use super::{TaskMap, TaskEntry}; +use super::{TaskMap, TaskEntry, is_public_only_mode, PUBLIC_ONLY_ERROR_MSG}; use mutant_lib::MutAnt; use mutant_protocol::{ - HealthCheckCallback, HealthCheckEvent, HealthCheckRequest, PurgeCallback, PurgeEvent, + ErrorResponse, HealthCheckCallback, HealthCheckEvent, HealthCheckRequest, PurgeCallback, PurgeEvent, PurgeRequest, Response, SyncCallback, SyncEvent, SyncRequest, Task, TaskCreatedResponse, TaskProgress, TaskResult, TaskResultResponse, TaskResultType, TaskStatus, TaskType, TaskUpdateResponse, @@ -19,6 +19,16 @@ pub(crate) async fn handle_sync( mutant: Arc, tasks: TaskMap, ) -> Result<(), DaemonError> { + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(serde_json::to_string(&SyncRequest { push_force: req.push_force }).unwrap_or_default()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let task_id = Uuid::new_v4(); let task = Task { @@ -159,6 +169,16 @@ pub(crate) async fn handle_purge( mutant: Arc, tasks: TaskMap, ) -> Result<(), DaemonError> { + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(serde_json::to_string(&PurgeRequest { aggressive: req.aggressive }).unwrap_or_default()), + })) + .map_err(|e| 
DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let task_id = Uuid::new_v4(); let task = Task { @@ -300,6 +320,19 @@ pub(crate) async fn handle_health_check( mutant: Arc, tasks: TaskMap, ) -> Result<(), DaemonError> { + // Check if we're in public-only mode + if is_public_only_mode() { + return update_tx + .send(Response::Error(ErrorResponse { + error: PUBLIC_ONLY_ERROR_MSG.to_string(), + original_request: Some(serde_json::to_string(&HealthCheckRequest { + key_name: req.key_name.clone(), + recycle: req.recycle + }).unwrap_or_default()), + })) + .map_err(|e| DaemonError::Internal(format!("Update channel send error: {}", e))); + } + let task_id = Uuid::new_v4(); let task = Task { diff --git a/mutant-lib/Cargo.toml b/mutant-lib/Cargo.toml index 31eb320c..02b605c1 100644 --- a/mutant-lib/Cargo.toml +++ b/mutant-lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mutant-lib" -version = "0.6.1" +version = "0.6.2" edition = "2021" license = " LGPL-3.0-only" description = "Core library for MutAnt distributed mutable key value storage over Autonomi network" @@ -43,7 +43,7 @@ serde_json = "1.0" anyhow = "1.0" bincode = "1.3" xdg = "2.5.2" -mutant-protocol = { path = "../mutant-protocol", version = "0.6.1" } +mutant-protocol = { path = "../mutant-protocol", version = "0.6.2" } async-channel = "2.3.1" never = "0.1.0" crc = "3.2.1" diff --git a/mutant-lib/README.md b/mutant-lib/README.md index 5482c2f9..946dabdf 100644 --- a/mutant-lib/README.md +++ b/mutant-lib/README.md @@ -127,7 +127,7 @@ $> cargo install mutant You can fetch the daily meme of the day with the following command: ```bash -$> mutant get -p 9620303d41cd65177e6763809f4780b1fa7d864a14d4ed0ed0322c5d4524fe406db65fdc485f1737814c81ab6d61dab0 daily_meme.jpg +$> mutant get -p a420224971527d61ce6ee21d850a07c243498c95808697e8fac23f461545656933016697d10b805c0fa26b50eb3532b2 daily_meme.jpg ``` ## Command-Line Interface (CLI) @@ -169,7 +169,7 @@ Options: #### Store/fetch private data ```bash -# Store 
a file directly +# Store a the file `data.txt` under the name `mykey` $> mutant put mykey data.txt # Get a value and save to a file @@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`: ```toml [dependencies] -mutant-lib = "0.6.1" # Or the version you need +mutant-lib = "0.6.2" # Or the version you need tokio = { version = "1", features = ["full"] } ``` @@ -277,7 +277,6 @@ async fn main() -> anyhow::Result<()> { mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?; let fetched_value = mutant.get("greeting").await?; - println!("Fetched value: {}", String::from_utf8_lossy(&fetched_value)); mutant.rm("greeting").await?; @@ -353,45 +352,8 @@ async fn main() -> Result<()> { } ``` -## Development and Testing - -### Local Testnet Management (`scripts/manage_local_testnet.sh`) - -### Running Integration Tests (`scripts/run_tests_with_env.sh`) - -## Migration - -## Architecture Overview - -MutAnt consists of five main components that work together to provide a complete storage solution: - -1. **mutant-lib**: Core library handling chunking, encryption, and storage operations -2. **mutant-protocol**: Shared communication format definitions -3. **mutant-daemon**: Background service maintaining Autonomi connection -4. **mutant-client**: WebSocket client library for communicating with the daemon -5. 
**mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic recycling of failed pads +If you find this project useful, you can donate to support its development. 
<3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c` diff --git a/mutant-lib/src/index/integration_tests.rs b/mutant-lib/src/index/integration_tests.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/mutant-lib/src/index/master_index/key_management.rs b/mutant-lib/src/index/master_index/key_management.rs index 9be7022d..c7e4850b 100644 --- a/mutant-lib/src/index/master_index/key_management.rs +++ b/mutant-lib/src/index/master_index/key_management.rs @@ -20,7 +20,7 @@ impl MasterIndex { } let chunk_ranges = self.chunk_data(data_bytes, mode); - let pads = self.aquire_pads(data_bytes, &chunk_ranges)?; + let pads = self.acquire_pads(data_bytes, &chunk_ranges)?; if public { // Empty index pad info for now diff --git a/mutant-lib/src/index/master_index/pad_management.rs b/mutant-lib/src/index/master_index/pad_management.rs index f559e1ba..d14e9997 100644 --- a/mutant-lib/src/index/master_index/pad_management.rs +++ b/mutant-lib/src/index/master_index/pad_management.rs @@ -1,11 +1,11 @@ -use crate::error::Error; +use crate::{error::Error, index::PadStatus}; use crate::index::error::IndexError; use crate::index::pad_info::PadInfo; use crate::storage::ScratchpadAddress; use mutant_protocol::StorageMode; use std::ops::Range; -use super::MasterIndex; +use super::{IndexEntry, MasterIndex}; impl MasterIndex { pub fn chunk_data(&self, data_bytes: &[u8], mode: StorageMode) -> Vec> { @@ -22,7 +22,7 @@ impl MasterIndex { ranges } - pub fn aquire_pads( + pub fn acquire_pads( &mut self, data_bytes: &[u8], chunk_ranges: &[Range], @@ -45,14 +45,17 @@ impl MasterIndex { for (i, range) in ranges.iter().enumerate() { let chunk_data_slice = &data_bytes[range.clone()]; let mut pad_info = available_pads.remove(0); - pad_info.chunk_index = i; - pad_info.size = chunk_data_slice.len(); - pad_info.checksum = PadInfo::checksum(chunk_data_slice); + + pad_info = pad_info.update_data(chunk_data_slice, i); generated_pads.push(pad_info); 
} - // Add any remaining unused acquired pads back to the free list - self.free_pads.extend(available_pads); + // // Add any remaining unused acquired pads back to the free list + // self.free_pads.extend(available_pads); + + if !available_pads.is_empty() { + return Err(Error::Internal("Unexpected remaining pads detected. This indicates a bug.".to_string())); + } Ok(generated_pads) } @@ -73,6 +76,7 @@ impl MasterIndex { self.free_pads.len() ))); } + let taken_free_pads: Vec<_> = self.free_pads.drain(..pads_to_take_from_free).collect(); // Then, generate the remaining required new pads @@ -101,6 +105,36 @@ impl MasterIndex { Ok(available_pads) } + + /// Update a key with a new set of pads + pub fn update_key_with_pads( + &mut self, + key_name: &str, + pads: Vec, + index_pad: Option, + ) -> Result<(), Error> { + if !self.index.contains_key(key_name) { + return Err(Error::Index(IndexError::KeyNotFound(key_name.to_string()))); + } + + // Update the key with the new pads + if let Some(index_pad) = index_pad { + // For public keys, update with the index pad + self.index.insert( + key_name.to_string(), + IndexEntry::PublicUpload(index_pad, pads), + ); + } else { + // For private keys, just update with the new pads + self.index.insert(key_name.to_string(), IndexEntry::PrivateKey(pads)); + } + + // Save the updated index + self.save(self.network_choice)?; + + Ok(()) + } + pub async fn recycle_errored_pad( &mut self, key_name: &str, @@ -122,10 +156,7 @@ impl MasterIndex { let old_pad = pads[pad_index].clone(); // Configure the new pad based on the old one - new_pad.checksum = old_pad.checksum; - new_pad.size = old_pad.size; - new_pad.chunk_index = old_pad.chunk_index; - new_pad.last_known_counter = old_pad.last_known_counter + 1; + Self::update_pad_properties(&mut new_pad, &old_pad); // new_pad already has its new address and sk_bytes // Replace the old pad info in the vector @@ -146,10 +177,7 @@ impl MasterIndex { let old_pad = index_pad.clone(); // Configure the new pad 
based on the old one - new_pad.checksum = old_pad.checksum; - new_pad.size = old_pad.size; - new_pad.chunk_index = old_pad.chunk_index; // Should be 0 - new_pad.last_known_counter = old_pad.last_known_counter + 1; + Self::update_pad_properties(&mut new_pad, &old_pad); // Replace the index pad info *index_pad = new_pad.clone(); @@ -164,10 +192,7 @@ impl MasterIndex { let old_pad = pads[data_pad_index].clone(); // Configure the new pad info based on the old one - new_pad.checksum = old_pad.checksum; - new_pad.size = old_pad.size; - new_pad.chunk_index = old_pad.chunk_index; - new_pad.last_known_counter = old_pad.last_known_counter + 1; + Self::update_pad_properties(&mut new_pad, &old_pad); // new_pad already has its new address and sk_bytes // Replace the old pad info in the vector @@ -215,4 +240,28 @@ impl MasterIndex { address_exists || index_exists } + + /// Helper function to update a pad's properties based on another pad + /// This is used when recycling pads or updating pad information + pub(crate) fn update_pad_properties(target_pad: &mut PadInfo, source_pad: &PadInfo) { + target_pad.checksum = source_pad.checksum; + target_pad.size = source_pad.size; + target_pad.chunk_index = source_pad.chunk_index; + } + + pub(crate) fn free_pads(&mut self, pads: Vec) -> Result<(), Error> { + for mut pad in pads{ + if pad.status == PadStatus::Generated { + self.pending_verification_pads.push(pad); + } else { + pad.status = PadStatus::Free; + self.free_pads.push(pad); + } + + } + + self.save(self.network_choice)?; + + Ok(()) + } } diff --git a/mutant-lib/src/index/master_index/public_keys.rs b/mutant-lib/src/index/master_index/public_keys.rs index 053d7be3..36003b91 100644 --- a/mutant-lib/src/index/master_index/public_keys.rs +++ b/mutant-lib/src/index/master_index/public_keys.rs @@ -1,7 +1,6 @@ use crate::error::Error; use crate::index::error::IndexError; use crate::index::pad_info::PadInfo; -use log::info; use super::{IndexEntry, MasterIndex}; @@ -18,43 +17,21 @@ impl 
MasterIndex { None } - /// Updates a public key with a new set of data pads but preserves the original index pad - /// - /// This is critical for public keys because the index pad address must remain the same - /// to maintain accessibility of the key through its public address. - pub fn update_public_key_with_preserved_index_pad( - &mut self, - key_name: &str, - preserved_index_pad: PadInfo, - ) -> Result<(), Error> { - if !self.contains_key(key_name) { - return Err(IndexError::KeyNotFound(key_name.to_string()).into()); - } - - if let Some(entry) = self.index.get_mut(key_name) { - if let IndexEntry::PublicUpload(index_pad, _) = entry { - *index_pad = preserved_index_pad; - } else { - return Err(IndexError::KeyNotFound(key_name.to_string()).into()); - } - } - - self.save(self.network_choice)?; - - info!("Updated public key {} with preserved index pad", key_name); - - Ok(()) - } - pub fn populate_index_pad(&mut self, key_name: &str) -> Result<(PadInfo, Vec), Error> { - match self.index.get_mut(key_name) { + let res = match self.index.get_mut(key_name) { Some(IndexEntry::PublicUpload(index_pad, pads)) => { let index_data = serde_cbor::to_vec(&pads).unwrap(); + index_pad.size = index_data.len(); index_pad.checksum = PadInfo::checksum(&index_data); + Ok((index_pad.clone(), index_data)) } _ => Err(IndexError::KeyNotFound(key_name.to_string()).into()), - } + }; + + self.save(self.network_choice)?; + + res } } diff --git a/mutant-lib/src/index/master_index/tests.rs b/mutant-lib/src/index/master_index/tests.rs index c2f6e9ab..9896f570 100644 --- a/mutant-lib/src/index/master_index/tests.rs +++ b/mutant-lib/src/index/master_index/tests.rs @@ -204,7 +204,7 @@ fn test_list() { } #[test] -fn test_aquire_pads_reuse_free() { +fn test_acquire_pads_reuse_free() { let (_td, mut index) = setup_test_environment(); let data1 = vec![1u8; DEFAULT_SCRATCHPAD_SIZE]; let data2 = vec![2u8; DEFAULT_SCRATCHPAD_SIZE]; @@ -240,7 +240,7 @@ fn test_aquire_pads_reuse_free() { } #[test] -fn 
test_aquire_pads_generate_new() { +fn test_acquire_pads_generate_new() { let (_td, mut index) = setup_test_environment(); let data = vec![1u8; DEFAULT_SCRATCHPAD_SIZE * 2]; let key = "key"; @@ -257,7 +257,7 @@ fn test_aquire_pads_generate_new() { } #[test] -fn test_aquire_pads_mix_free_and_new() { +fn test_acquire_pads_mix_free_and_new() { let (_td, mut index) = setup_test_environment(); let data1 = vec![1u8; DEFAULT_SCRATCHPAD_SIZE]; let data3 = vec![3u8; DEFAULT_SCRATCHPAD_SIZE * 3]; // Requires 3 pads diff --git a/mutant-lib/src/index/mod.rs b/mutant-lib/src/index/mod.rs index 16312e8c..176072bd 100644 --- a/mutant-lib/src/index/mod.rs +++ b/mutant-lib/src/index/mod.rs @@ -3,6 +3,3 @@ pub mod master_index; pub mod pad_info; pub(crate) use pad_info::{PadInfo, PadStatus}; - -#[cfg(test)] -mod integration_tests; diff --git a/mutant-lib/src/ops/put/context.rs b/mutant-lib/src/ops/put/context.rs index 4f212b8e..2e8b5949 100644 --- a/mutant-lib/src/ops/put/context.rs +++ b/mutant-lib/src/ops/put/context.rs @@ -1,4 +1,3 @@ -use crate::index::PadInfo; use crate::network::Network; use std::{ops::Range, sync::Arc}; use tokio::sync::RwLock; @@ -13,10 +12,7 @@ pub struct Context { pub data: Arc>, pub chunk_ranges: Arc>>, pub public: bool, - /// Optional preserved index pad for public key updates - pub preserved_index_pad: Option, - /// Flag to indicate if this context is for an index pad - pub is_index_pad: bool, + pub encoding: u64, } /// Context for put tasks diff --git a/mutant-lib/src/ops/put/mod.rs b/mutant-lib/src/ops/put/mod.rs index 9e2bbb91..258e4809 100644 --- a/mutant-lib/src/ops/put/mod.rs +++ b/mutant-lib/src/ops/put/mod.rs @@ -2,6 +2,8 @@ mod context; mod operations; mod pipeline; mod task; +#[cfg(test)] +mod tests; use crate::error::Error; use crate::network::Network; diff --git a/mutant-lib/src/ops/put/operations.rs b/mutant-lib/src/ops/put/operations.rs index 27fd01c0..6a362393 100644 --- a/mutant-lib/src/ops/put/operations.rs +++ 
b/mutant-lib/src/ops/put/operations.rs @@ -1,9 +1,10 @@ use crate::error::Error; -use crate::index::PadStatus; +use crate::index::{PadInfo, PadStatus}; use crate::internal_events::invoke_put_callback; use crate::network::Network; +use crate::ops::{DATA_ENCODING_PRIVATE_DATA, DATA_ENCODING_PUBLIC_DATA, DATA_ENCODING_PUBLIC_INDEX}; use autonomi::ScratchpadAddress; -use log::{info, warn}; +use log::info; use mutant_protocol::{PutCallback, PutEvent, StorageMode}; use std::sync::Arc; use tokio::sync::RwLock; @@ -11,16 +12,19 @@ use tokio::sync::RwLock; use super::context::Context; use super::pipeline::write_pipeline; -/// Update a key with new content, preserving the public index pad if applicable. +/// Efficiently update a key with new content by reusing pads with matching checksums. /// -/// For public keys, this function ensures that the public index pad is preserved during updates. -/// This is critical because the public index pad must remain the same to maintain accessibility -/// of the key through its public address. The function: -/// 1. Extracts and preserves the index pad from the existing public key -/// 2. Removes the key (which moves all pads to free_pads or pending_verification_pads) -/// 3. Creates a new key with the updated content -/// 4. Replaces the newly created index pad with the preserved one -/// 5. Updates the index pad data to reflect the new content +/// This function: +/// 1. Compares checksums between existing pads and new data chunks +/// 2. Reuses pads with matching checksums (keeps them as is) +/// 3. Marks pads with different checksums as needing update (sets to Free status) +/// 4. Handles cases where the new data is longer or shorter than the existing data +/// 5. 
Preserves the public index pad for public keys +/// +/// This approach is more efficient than the original update method because it: +/// - Avoids unnecessary network operations for unchanged data chunks +/// - Preserves pad addresses when possible, reducing network churn +/// - Only uploads data that has actually changed pub async fn update( index: Arc>, network: Arc, @@ -31,62 +35,179 @@ pub async fn update( no_verify: bool, put_callback: Option, ) -> Result { - info!("Update for {}", key_name); + info!("Efficient update for {}", key_name); + + // Get existing pads for the key + let existing_pads = index.read().await.get_pads(key_name); + if existing_pads.is_empty() { + return Err(Error::Internal(format!("Key '{}' not found", key_name))); + } + + // Calculate chunk ranges for the new content + let chunk_ranges = index.read().await.chunk_data(&content, mode.clone()); // Special handling for public keys to preserve the index pad - let mut preserved_index_pad = if public && index.read().await.is_public(key_name) { + let preserved_index_pad = if public && index.read().await.is_public(key_name) { info!("Preserving public index pad for key {}", key_name); index.read().await.extract_public_index_pad(key_name) } else { None }; - // Remove the key (this will move all pads to free_pads or pending_verification_pads) - index.write().await.remove_key(key_name).unwrap(); + // Compare the number of pads needed for the new content vs. 
existing pads + let existing_data_pads_count = existing_pads.len(); + + let new_data_pads_count = chunk_ranges.len(); + + info!( + "Update for key '{}': existing pads: {}, new chunks: {}", + key_name, existing_data_pads_count, new_data_pads_count + ); + + // Create a vector to hold the updated pads + let mut updated_pads = Vec::with_capacity(new_data_pads_count); + + // Process existing pads up to the minimum of existing and new counts + let min_count = std::cmp::min(existing_data_pads_count, new_data_pads_count); + + for i in 0..min_count { + let pad = &existing_pads[i]; + let chunk_range = &chunk_ranges[i]; + let chunk_data = &content[chunk_range.clone()]; + let chunk_checksum = PadInfo::checksum(chunk_data); + + // If checksums match, keep the pad as is + if pad.checksum == chunk_checksum { + info!("Pad {} (chunk {}) has matching checksum, keeping as is", pad.address, i); + updated_pads.push(pad.clone()); + } else { + // If checksums don't match, mark the pad for update + info!("Pad {} (chunk {}) has different checksum, marking for update", pad.address, i); + let mut updated_pad = pad.clone(); + updated_pad.status = PadStatus::Free; + updated_pad.checksum = chunk_checksum; + updated_pad.size = chunk_data.len(); + updated_pad.last_known_counter += 1; + updated_pads.push(updated_pad); + } + } - // Create a new key with the updated content - let (pads, chunk_ranges) = index - .write() - .await - .create_key(key_name, &content, mode, public)?; + // If we need more pads than we have, acquire new ones + if new_data_pads_count > existing_data_pads_count { + info!( + "Need {} additional pads for key '{}'", + new_data_pads_count - existing_data_pads_count, + key_name + ); - info!("Created key {} with {} pads", key_name, pads.len()); + // Acquire additional pads for the remaining chunks + let additional_chunks = chunk_ranges[existing_data_pads_count..].to_vec(); - let address = pads[0].address; + // Determine the start and end of the additional content in the original 
content + let start_offset = additional_chunks[0].start; + let end_offset = additional_chunks.last().unwrap().end; - let context = Context { - index: index.clone(), - network: network.clone(), - name: Arc::new(key_name.to_string()), - chunk_ranges: Arc::new(chunk_ranges), - data: content.clone(), - public, - preserved_index_pad: None, - is_index_pad: false, - }; + // Create a slice of the original content + let additional_content = Arc::new(content[start_offset..end_offset].to_vec()); - // Write the data pads - write_pipeline(context, pads.clone(), no_verify, put_callback.clone()).await?; + // Create new chunk ranges relative to the slice + let mut new_chunk_ranges = Vec::new(); + for chunk_range in &additional_chunks { + // Adjust the range to be relative to the slice + let relative_start = chunk_range.start - start_offset; + let relative_end = chunk_range.end - start_offset; + new_chunk_ranges.push(relative_start..relative_end); + } - // For public keys, we need to write the index pad - if public { - let (index_pad, index_data) = index.write().await.populate_index_pad(key_name)?; - let index_data_bytes: Arc> = Arc::new(index_data); - let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]); + // Use the existing acquire_pads method to get the additional pads + let mut additional_pads = index + .write() + .await + .acquire_pads(&additional_content, &new_chunk_ranges)?; - preserved_index_pad = preserved_index_pad.map(|mut old_pad| { - old_pad.checksum = index_pad.checksum; - old_pad.size = index_pad.size; - old_pad.status = PadStatus::Free; - old_pad.last_known_counter += 1; + // Update the chunk indices for the additional pads + for (i, pad) in additional_pads.iter_mut().enumerate() { + // Set the correct chunk index (offset by the number of existing pads) + pad.chunk_index = existing_data_pads_count + i; + } - old_pad - }); + updated_pads.extend(additional_pads); + } else if new_data_pads_count < existing_data_pads_count { + // If we have more pads than 
we need, free the excess ones + info!( + "Freeing {} excess pads for key '{}'", + existing_data_pads_count - new_data_pads_count, + key_name + ); + + // Move excess pads to the free list + let excess_pads = existing_pads[new_data_pads_count..].to_vec(); + + index.write().await.free_pads(excess_pads)?; + } - //call update_public_key_with_preserved_index_pad - if let Some(preserved_index_pad) = &preserved_index_pad { - index.write().await.update_public_key_with_preserved_index_pad(key_name, preserved_index_pad.clone())?; + // Update the key in the master index + if public { + // For public keys, update with the preserved index pad + if let Some(mut index_pad) = preserved_index_pad { + index_pad.status = PadStatus::Free; + index_pad.last_known_counter += 1; + + index.write().await.update_key_with_pads(key_name, updated_pads.clone(), Some(index_pad))?; + } else { + // This shouldn't happen for existing public keys, but handle it just in case + index.write().await.update_key_with_pads(key_name, updated_pads.clone(), None)?; } + } else { + // For private keys, just update with the new pads + index.write().await.update_key_with_pads(key_name, updated_pads.clone(), None)?; + } + + // Get the address from the first pad + let address = updated_pads[0].address; + + // Filter pads that need to be written (status is Free or Generated) + let pads_to_write: Vec = updated_pads + .iter() + .filter(|p| p.status == PadStatus::Free || p.status == PadStatus::Generated) + .cloned() + .collect(); + + if !pads_to_write.is_empty() { + info!( + "Writing {} pads for key '{}' (out of {} total)", + pads_to_write.len(), + key_name, + updated_pads.len() + ); + + let encoding = if public { + DATA_ENCODING_PUBLIC_DATA + } else { + DATA_ENCODING_PRIVATE_DATA + }; + + let context = Context { + index: index.clone(), + network: network.clone(), + name: Arc::new(key_name.to_string()), + chunk_ranges: Arc::new(chunk_ranges), + data: content.clone(), + public, + encoding, + }; + + // Write only the pads 
that need updating + write_pipeline(context, pads_to_write, no_verify, put_callback.clone()).await?; + } else { + info!("No pads need to be written for key '{}'", key_name); + } + + // For public keys, update and write the index pad + if public { + let (index_pad, index_data) = index.write().await.populate_index_pad(key_name)?; + let index_data_bytes: Arc> = Arc::new(index_data); + let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]); let index_pad_context = Context { index: index.clone(), @@ -95,21 +216,20 @@ pub async fn update( chunk_ranges: index_chunk_ranges, data: index_data_bytes, public, - preserved_index_pad: preserved_index_pad.clone(), - is_index_pad: true, // This is an index pad + encoding: DATA_ENCODING_PUBLIC_INDEX, }; // Write the index pad write_pipeline( index_pad_context, - vec![preserved_index_pad.unwrap_or(index_pad)], + vec![index_pad], no_verify, put_callback.clone(), ) .await?; } - // Final completion callback after all pipelines are done + // Final completion callback invoke_put_callback(&put_callback, PutEvent::Complete) .await .unwrap(); @@ -146,15 +266,17 @@ pub async fn resume( let chunk_ranges = index.read().await.chunk_data(&data_bytes, mode.clone()); + // If the number of pads doesn't match the number of chunks, use the efficient update + // which can handle adding or removing pads as needed if pads.len() != chunk_ranges.len() { - warn!( - "Resuming key '{}' with data size mismatch. Index has {} pads, current data requires {}. Forcing rewrite.", + info!( + "Resuming key '{}' with data size mismatch. Index has {} pads, current data requires {}. 
Using efficient update.", name, pads.len(), chunk_ranges.len() ); - index.write().await.remove_key(name)?; - return first_store( + + return update( index, network, name, @@ -162,11 +284,30 @@ pub async fn resume( mode, public, no_verify, - put_callback.clone(), + put_callback, ) .await; } + // If all checksums match, we can just return success + if index.read().await.verify_checksum(name, &data_bytes, mode.clone()) { + info!("All checksums match for key '{}', no upload needed", name); + + // Final completion callback + invoke_put_callback(&put_callback, PutEvent::Complete) + .await + .unwrap(); + + return Ok(pads[0].address); + } + + // If we get here, we need to upload the data but the pad count matches + let encoding = if public { + DATA_ENCODING_PUBLIC_DATA + } else { + DATA_ENCODING_PRIVATE_DATA + }; + let context = Context { index: index.clone(), network: network.clone(), @@ -174,11 +315,16 @@ pub async fn resume( data: data_bytes.clone(), chunk_ranges: Arc::new(chunk_ranges), public, - preserved_index_pad: None, - is_index_pad: false, // This is for data pads, not the index pad + encoding, }; - write_pipeline(context, pads.clone(), no_verify, put_callback.clone()).await?; + // Mark pads as Free to ensure they get uploaded + let mut pads_to_write = pads.clone(); + for pad in &mut pads_to_write { + pad.status = PadStatus::Free; + } + + write_pipeline(context, pads_to_write, no_verify, put_callback.clone()).await?; if public { let (index_pad, index_data) = index.write().await.populate_index_pad(name)?; @@ -186,15 +332,13 @@ pub async fn resume( let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]); let index_pad_context = Context { - // Reuse index and network Arcs index: index.clone(), network: network.clone(), - name: Arc::new(name.to_string()), // Reuse name Arc + name: Arc::new(name.to_string()), chunk_ranges: index_chunk_ranges, data: index_data_bytes, - public, // Keep public flag - preserved_index_pad: None, // No preserved index pad for 
normal operations - is_index_pad: true, // This is an index pad + public, + encoding: DATA_ENCODING_PUBLIC_INDEX, }; // Call write_pipeline again for the single index pad @@ -202,7 +346,7 @@ pub async fn resume( index_pad_context, vec![index_pad], no_verify, - put_callback.clone(), // Clone the callback Arc again + put_callback.clone(), ) .await?; } @@ -229,6 +373,12 @@ pub async fn first_store( let address = pads[0].address; + let encoding = if public { + DATA_ENCODING_PUBLIC_DATA + } else { + DATA_ENCODING_PRIVATE_DATA + }; + let context = Context { index: index.clone(), network: network.clone(), @@ -236,8 +386,7 @@ pub async fn first_store( chunk_ranges: Arc::new(chunk_ranges), data: data_bytes.clone(), public, - preserved_index_pad: None, - is_index_pad: false, // This is for data pads, not the index pad + encoding, }; write_pipeline(context, pads.clone(), no_verify, put_callback.clone()).await?; @@ -255,8 +404,7 @@ pub async fn first_store( chunk_ranges: index_chunk_ranges, data: index_data_bytes, public, // Keep public flag - preserved_index_pad: None, // No preserved index pad for normal operations - is_index_pad: true, // This is an index pad + encoding: DATA_ENCODING_PUBLIC_INDEX, }; // Call write_pipeline again for the single index pad diff --git a/mutant-lib/src/ops/put/pipeline.rs b/mutant-lib/src/ops/put/pipeline.rs index a897f151..96cba53d 100644 --- a/mutant-lib/src/ops/put/pipeline.rs +++ b/mutant-lib/src/ops/put/pipeline.rs @@ -127,12 +127,14 @@ pub async fn write_pipeline( let task_processor = PutTaskProcessor::new(put_task_context.clone()); // 3. 
Create WorkerPoolConfig + // For the total_items_hint, we use the number of pads we're actually processing + // This ensures the worker pool will only wait for the pads we're sending to it let config = WorkerPoolConfig { network: context.network.clone(), // Clone network Arc client_config: crate::network::client::Config::Put, // Use crate path task_processor, enable_recycling: true, // Ensure recycling is enabled for PUT - total_items_hint: initial_process_count, + total_items_hint: initial_process_count, // Use the number of pads we're actually processing }; debug!( diff --git a/mutant-lib/src/ops/put/task.rs b/mutant-lib/src/ops/put/task.rs index 937c7ec6..c0bef4a1 100644 --- a/mutant-lib/src/ops/put/task.rs +++ b/mutant-lib/src/ops/put/task.rs @@ -7,14 +7,12 @@ use crate::ops::MAX_CONFIRMATION_DURATION; use async_trait::async_trait; use log::{debug, error, info, warn}; use mutant_protocol::PutEvent; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; +use std::time::Duration; use tokio::time::Instant; +use super::super::PAD_RECYCLING_RETRIES; use super::context::PutTaskContext; -use super::super::{ - DATA_ENCODING_PRIVATE_DATA, DATA_ENCODING_PUBLIC_DATA, DATA_ENCODING_PUBLIC_INDEX, - PAD_RECYCLING_RETRIES, -}; #[derive(Clone)] pub struct PutTaskProcessor { @@ -28,9 +26,7 @@ impl PutTaskProcessor { } #[async_trait] -impl AsyncTask - for PutTaskProcessor -{ +impl AsyncTask for PutTaskProcessor { type ItemId = usize; async fn process( @@ -54,43 +50,6 @@ impl AsyncTask }; if should_put { - // Use the is_index_pad flag from the context to determine if this is an index pad - let is_index_pad = is_public && self.context.base_context.is_index_pad; - - // Check if we have a preserved index pad that we should use instead - if is_index_pad && self.context.base_context.preserved_index_pad.is_some() { - // If we have a preserved index pad, we should use its address instead - if let Some(preserved_pad) = &self.context.base_context.preserved_index_pad { - info!( - 
"Worker {} using preserved index pad address {} instead of {}", - worker_id, preserved_pad.address, pad_state.address - ); - pad_state.address = preserved_pad.address; - } - } - - let data_encoding = if is_public { - if is_index_pad { - debug!( - "Using PUBLIC_INDEX encoding for index pad {} (chunk_index={})", - pad_state.address, pad_state.chunk_index - ); - DATA_ENCODING_PUBLIC_INDEX - } else { - debug!( - "Using PUBLIC_DATA encoding for data pad {} (chunk_index={})", - pad_state.address, pad_state.chunk_index - ); - DATA_ENCODING_PUBLIC_DATA - } - } else { - debug!( - "Using PRIVATE_DATA encoding for pad {} (chunk_index={})", - pad_state.address, pad_state.chunk_index - ); - DATA_ENCODING_PRIVATE_DATA - }; - let chunk_index = pad_state.chunk_index; let range = self .context @@ -128,7 +87,13 @@ impl AsyncTask .context .base_context .network - .put(client, &pad_state, chunk_data, data_encoding, is_public) + .put( + client, + &pad_state, + chunk_data, + self.context.base_context.encoding, + is_public, + ) .await; match put_result { @@ -227,6 +192,7 @@ impl AsyncTask let size_match = pad.size == get_result.data.len(); if checksum_match && counter_match && size_match { pad_state.status = PadStatus::Confirmed; + match self .context .base_context @@ -237,7 +203,7 @@ impl AsyncTask &self.context.base_context.name, ¤t_pad_address, PadStatus::Confirmed, - None, + Some(get_result.counter), // Pass the actual counter from the network ) { Ok(final_pad) => { confirmation_succeeded = true; diff --git a/mutant-lib/src/ops/put/tests.rs b/mutant-lib/src/ops/put/tests.rs new file mode 100644 index 00000000..b0122dca --- /dev/null +++ b/mutant-lib/src/ops/put/tests.rs @@ -0,0 +1,481 @@ +#[cfg(test)] +mod tests { + use std::sync::Arc; + use tokio::sync::RwLock; + use rand::RngCore; + + use crate::index::master_index::{IndexEntry, MasterIndex}; + use crate::index::{PadInfo, PadStatus}; + use crate::network::{Network, NetworkChoice}; + use crate::ops::put::operations::{first_store, 
update}; + use mutant_protocol::StorageMode; + + // Helper function to generate random data + fn generate_random_data(size: usize) -> Vec { + let mut data = vec![0u8; size]; + rand::thread_rng().fill_bytes(&mut data); + data + } + + // Helper function to setup test environment + async fn setup_test_environment() -> (Arc>, Arc) { + let network = Arc::new( + Network::new( + crate::network::DEV_TESTNET_PRIVATE_KEY_HEX, + NetworkChoice::Devnet, + ) + .expect("Failed to create network"), + ); + let index = Arc::new(RwLock::new(MasterIndex::new(NetworkChoice::Devnet))); + (index, network) + } + + // Helper function to compare data chunks + fn compare_chunks(original: &[u8], updated: &[u8], mode: StorageMode) -> (usize, usize) { + let chunk_size = mode.scratchpad_size(); + let original_chunks: Vec<&[u8]> = original.chunks(chunk_size).collect(); + let updated_chunks: Vec<&[u8]> = updated.chunks(chunk_size).collect(); + + let mut unchanged_count = 0; + let mut changed_count = 0; + + for (i, updated_chunk) in updated_chunks.iter().enumerate() { + if i < original_chunks.len() && PadInfo::checksum(original_chunks[i]) == PadInfo::checksum(updated_chunk) { + unchanged_count += 1; + } else { + changed_count += 1; + } + } + + (unchanged_count, changed_count) + } + + #[tokio::test] + async fn test_update_same_size_private() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_same_size"; + let initial_data = Arc::new(generate_random_data(1024)); + let mode = StorageMode::Medium; + + // First store the initial data + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + initial_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial store failed"); + + // Get the initial pads + let initial_pads = index.read().await.get_pads(key_name); + assert!(!initial_pads.is_empty(), "No pads found for initial store"); + + // Create 
updated data with some changes but same size + let mut updated_data = initial_data.as_ref().clone(); + // Modify part of the data to ensure some chunks change + for i in 0..100 { + updated_data[i] = 0xFF; + } + let updated_data = Arc::new(updated_data); + + // Calculate how many chunks should change + let (_unchanged_count, changed_count) = compare_chunks(&initial_data, &updated_data, mode.clone()); + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Update failed"); + + // Get the updated pads + let updated_pads = index.read().await.get_pads(key_name); + + // Verify the number of pads is the same + assert_eq!( + initial_pads.len(), + updated_pads.len(), + "Number of pads changed after update" + ); + + // Count how many pads were actually updated + let updated_count = updated_pads + .iter() + .filter(|p| p.last_known_counter > 0) + .count(); + + // Verify that only the changed chunks were updated + assert_eq!( + changed_count, updated_count, + "Expected {} pads to be updated, but {} were updated", + changed_count, updated_count + ); + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } + + #[tokio::test] + async fn test_update_larger_size_private() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_larger_size"; + + // Use a small initial size that will fit in a single pad + let chunk_size = StorageMode::Medium.scratchpad_size(); + let initial_data = Arc::new(generate_random_data(chunk_size / 2)); + let mode = StorageMode::Medium; + + // First store the initial data + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + 
initial_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial store failed"); + + // Get the initial pads + let initial_pads = index.read().await.get_pads(key_name); + let initial_pad_count = initial_pads.len(); + assert_eq!(initial_pad_count, 1, "Expected initial data to use exactly 1 pad"); + + // Create updated data with larger size that will require multiple pads + let updated_data = Arc::new(generate_random_data(chunk_size * 3)); // Use 3 pads + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Update failed"); + + // Get the updated pads + let updated_pads = index.read().await.get_pads(key_name); + + // Verify the number of pads increased + assert!( + updated_pads.len() > initial_pad_count, + "Number of pads did not increase after update with larger data. 
Initial: {}, Updated: {}", + initial_pad_count, + updated_pads.len() + ); + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } + + #[tokio::test] + async fn test_update_smaller_size_private() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_smaller_size"; + + // Use a large initial size that will require multiple pads + let chunk_size = StorageMode::Medium.scratchpad_size(); + let initial_data = Arc::new(generate_random_data(chunk_size * 3)); // Use 3 pads + let mode = StorageMode::Medium; + + // First store the initial data + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + initial_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial store failed"); + + // Get the initial pads + let initial_pads = index.read().await.get_pads(key_name); + let initial_pad_count = initial_pads.len(); + assert!(initial_pad_count > 1, "Expected initial data to use multiple pads"); + + // Create updated data with smaller size that will fit in a single pad + let updated_data = Arc::new(generate_random_data(chunk_size / 2)); // Use 1 pad + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Update failed"); + + // Get the updated pads + let updated_pads = index.read().await.get_pads(key_name); + + // Verify the number of pads decreased + assert!( + updated_pads.len() < initial_pad_count, + "Number of pads did not decrease after update with smaller data. 
Initial: {}, Updated: {}", + initial_pad_count, + updated_pads.len() + ); + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } + + #[tokio::test] + async fn test_update_public_key_preserve_index_pad() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_public_preserve_index"; + let initial_data = Arc::new(generate_random_data(1024)); + let mode = StorageMode::Medium; + + // First store the initial data as public + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + initial_data.clone(), + mode.clone(), + true, // public + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial public store failed"); + + // Get the initial entry to extract the index pad + let initial_index_pad = match index.read().await.get_entry(key_name) { + Some(IndexEntry::PublicUpload(index_pad, _)) => Some(index_pad.clone()), + _ => None, + }; + + assert!(initial_index_pad.is_some(), "No index pad found for public key"); + let initial_index_pad = initial_index_pad.unwrap(); + + // Create updated data + let updated_data = Arc::new(generate_random_data(1536)); // 1.5x the size + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + true, // public + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Public update failed"); + + // Get the updated entry to check if the index pad was preserved + let updated_index_pad = match index.read().await.get_entry(key_name) { + Some(IndexEntry::PublicUpload(index_pad, _)) => Some(index_pad.clone()), + _ => None, + }; + + assert!(updated_index_pad.is_some(), "No index pad found after update"); + let updated_index_pad = updated_index_pad.unwrap(); + + // Verify 
the index pad address was preserved + assert_eq!( + initial_index_pad.address, updated_index_pad.address, + "Public index pad address changed after update" + ); + + // Verify the counter was incremented + assert!( + updated_index_pad.last_known_counter > initial_index_pad.last_known_counter, + "Public index pad counter was not incremented" + ); + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } + + #[tokio::test] + async fn test_update_with_unchanged_chunks() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_unchanged_chunks"; + let initial_data = Arc::new(generate_random_data(2048)); + let mode = StorageMode::Medium; + + // First store the initial data + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + initial_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial store failed"); + + // Get the initial pads + let initial_pads = index.read().await.get_pads(key_name); + + // Create updated data with only some chunks changed + let mut updated_data = initial_data.as_ref().clone(); + + // Only modify the first chunk, leave the rest unchanged + let chunk_size = mode.scratchpad_size(); + for i in 0..chunk_size.min(100) { + updated_data[i] = 0xFF; + } + + let updated_data = Arc::new(updated_data); + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Update failed"); + + // Get the updated pads + let updated_pads = index.read().await.get_pads(key_name); + + // Verify only the first pad was updated (has a higher counter) + assert!( + 
updated_pads[0].last_known_counter > initial_pads[0].last_known_counter, + "First pad was not updated" + ); + + // Verify the other pads were not updated (same counter) + for i in 1..updated_pads.len() { + assert_eq!( + updated_pads[i].last_known_counter, initial_pads[i].last_known_counter, + "Pad {} was unnecessarily updated", i + ); + } + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } + + #[tokio::test] + async fn test_update_with_additional_pads_confirmation() { + let (index, network) = setup_test_environment().await; + let key_name = "test_update_additional_pads_confirmation"; + + // Use a small initial size that will fit in a single pad + let chunk_size = StorageMode::Medium.scratchpad_size(); + let initial_data = Arc::new(generate_random_data(chunk_size / 2)); + let mode = StorageMode::Medium; + + // First store the initial data + let initial_result = first_store( + index.clone(), + network.clone(), + key_name, + initial_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(initial_result.is_ok(), "Initial store failed"); + + // Get the initial pads + let initial_pads = index.read().await.get_pads(key_name); + let initial_pad_count = initial_pads.len(); + assert_eq!(initial_pad_count, 1, "Expected initial data to use exactly 1 pad"); + + // Create updated data with larger size that will require multiple pads + // Use a size that will require 3 pads to ensure we're testing with multiple additional pads + let updated_data = Arc::new(generate_random_data(chunk_size * 3)); + + // Update the key + let update_result = update( + index.clone(), + network.clone(), + key_name, + updated_data.clone(), + mode.clone(), + false, // private + false, // verify + None, // no callback + ) + .await; + + assert!(update_result.is_ok(), "Update 
failed"); + + // Get the updated pads + let updated_pads = index.read().await.get_pads(key_name); + + // Verify the number of pads increased + assert_eq!( + updated_pads.len(), 3, + "Expected exactly 3 pads after update, got {}", + updated_pads.len() + ); + + // Verify all pads are in Confirmed status + for (i, pad) in updated_pads.iter().enumerate() { + assert_eq!( + pad.status, PadStatus::Confirmed, + "Pad {} is not in Confirmed status: {:?}", + i, pad.status + ); + } + + // Verify we can retrieve the updated data + let retrieved_data = index.read().await.verify_checksum(key_name, &updated_data, mode.clone()); + assert!(retrieved_data, "Retrieved data doesn't match updated data"); + } +} diff --git a/mutant-lib/src/ops/worker/builder.rs b/mutant-lib/src/ops/worker/builder.rs index 0d33dd39..dda0b75b 100644 --- a/mutant-lib/src/ops/worker/builder.rs +++ b/mutant-lib/src/ops/worker/builder.rs @@ -41,7 +41,7 @@ where )); } - let num_workers = *NB_CLIENTS; + let num_workers = NB_CLIENTS.min(config.total_items_hint); let batch_size = *BATCH_SIZE; // --- Channel Creation --- diff --git a/mutant-protocol/Cargo.toml b/mutant-protocol/Cargo.toml index 4ad0ee7f..cb2286f0 100644 --- a/mutant-protocol/Cargo.toml +++ b/mutant-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mutant-protocol" -version = "0.6.1" +version = "0.6.2" edition = "2021" license = "LGPL-3.0-only" description = "Protocol definitions for MutAnt distributed mutable key value storage over Autonomi network" diff --git a/mutant-protocol/README.md b/mutant-protocol/README.md index 587b7b89..946dabdf 100644 --- a/mutant-protocol/README.md +++ b/mutant-protocol/README.md @@ -127,7 +127,7 @@ $> cargo install mutant You can fetch the daily meme of the day with the following command: ```bash -$> mutant get -p 9620303d41cd65177e6763809f4780b1fa7d864a14d4ed0ed0322c5d4524fe406db65fdc485f1737814c81ab6d61dab0 daily_meme.jpg +$> mutant get -p 
a420224971527d61ce6ee21d850a07c243498c95808697e8fac23f461545656933016697d10b805c0fa26b50eb3532b2 daily_meme.jpg
```

## Command-Line Interface (CLI)

@@ -169,7 +169,7 @@ Options:

#### Store/fetch private data

```bash
-# Store a file directly
+# Store the file `data.txt` under the name `mykey`
$> mutant put mykey data.txt

# Get a value and save to a file
@@ -256,7 +256,7 @@ Add `mutant-lib` and its dependencies to your `Cargo.toml`:

```toml
[dependencies]
-mutant-lib = "0.6.0" # Or the version you need
+mutant-lib = "0.6.2" # Or the version you need
tokio = { version = "1", features = ["full"] }
```

@@ -277,7 +277,6 @@ async fn main() -> anyhow::Result<()> {

    mutant.put("greeting", b"hello world", StorageMode::Medium, false).await?;
    let fetched_value = mutant.get("greeting").await?;
-    println!("Fetched value: {}", String::from_utf8_lossy(&fetched_value));

    mutant.rm("greeting").await?;

@@ -353,45 +352,8 @@ async fn main() -> Result<()> {
}
```

-## Development and Testing
-
-### Local Testnet Management (`scripts/manage_local_testnet.sh`)
-
-### Running Integration Tests (`scripts/run_tests_with_env.sh`)
-
-## Migration
-
-## Architecture Overview
-
-MutAnt consists of five main components that work together to provide a complete storage solution:
-
-1. **mutant-lib**: Core library handling chunking, encryption, and storage operations
-2. **mutant-protocol**: Shared communication format definitions
-3. **mutant-daemon**: Background service maintaining Autonomi connection
-4. **mutant-client**: WebSocket client library for communicating with the daemon
-5. 
**mutant-cli**: Command-line interface for end users - -These components work together in a client-server architecture: - -- The **daemon** uses **mutant-lib** to interact with the Autonomi network -- **Clients** (CLI or custom applications) connect to the daemon via WebSocket -- Communication between clients and the daemon uses the protocol definitions -- The daemon manages concurrent operations, background tasks, and network connections - -This architecture provides several benefits: -- Persistent connection to the Autonomi network -- Background processing of long-running operations -- Task management and monitoring -- Concurrent operations with efficient resource usage -- Worker pools for optimized performance - -### Internal Architecture +# Donate -Under the hood, MutAnt uses a worker architecture for handling operations: -- 10 workers, each with a dedicated client -- Each worker manages 10 concurrent operations (tasks) -- Total of 100 concurrent operations -- Round-robin distribution of work with work stealing -- Automatic recycling of failed pads +If you find this project useful, you can donate to support its development. <3 -## Configuration \ No newline at end of file +ETH/ANT `0x3376C33FdC0c885cef483690ffDd1e0Ff0Eb026c`