From 3e0b1e5e1fc09974aa7df80ef6a3effc0c75cf7f Mon Sep 17 00:00:00 2001 From: EchoBT Date: Sat, 27 Dec 2025 17:51:05 +0000 Subject: [PATCH] fix: validator uses Subtensor.set_mechanism_weights() for CRv4 - Subtensor handles CRv4/commit-reveal automatically based on chain config - Fetch weights from platform-server each epoch - Submit burn weights (UID 0) if no challenges available - Reveal pending commits in RevealWindowOpen (for non-CRv4) - Clean centralized architecture (no P2P) --- Cargo.lock | 16 +-- bins/validator-node/Cargo.toml | 26 ++-- bins/validator-node/src/main.rs | 230 +++++++++++++++++++++++--------- 3 files changed, 174 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9e8dc77..bec5944a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9535,38 +9535,24 @@ name = "validator-node" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", - "bincode", + "bittensor-rs", "challenge-orchestrator", - "chrono", "clap", - "distributed-db", - "futures", "hex", "parking_lot 0.12.5", "platform-bittensor", - "platform-challenge-runtime", - "platform-challenge-sdk", - "platform-consensus", "platform-core", - "platform-epoch", - "platform-network", "platform-rpc", "platform-storage", "platform-subnet-manager", - "rand 0.8.5", "reqwest 0.12.25", "secure-container-runtime", - "sentry", - "sentry-tracing", "serde", "serde_json", - "sha2 0.10.9", "sp-core 38.1.0", "tokio", "tracing", "tracing-subscriber 0.3.22", - "uuid", ] [[package]] diff --git a/bins/validator-node/Cargo.toml b/bins/validator-node/Cargo.toml index 6d70870c..540ea8fd 100644 --- a/bins/validator-node/Cargo.toml +++ b/bins/validator-node/Cargo.toml @@ -12,45 +12,35 @@ name = "validator-node" path = "src/main.rs" [dependencies] +# Platform crates platform-core = { path = "../../crates/core" } -platform-network = { path = "../../crates/network" } -platform-consensus = { path = "../../crates/consensus" } platform-storage = { path = "../../crates/storage" } -platform-challenge-runtime = { path = "../../crates/challenge-runtime" } -platform-challenge-sdk = { path = "../../crates/challenge-sdk" } -platform-epoch = { path = "../../crates/epoch" } platform-bittensor = { path = "../../crates/bittensor-integration" } platform-rpc = { path = "../../crates/rpc-server" } platform-subnet-manager = { path = "../../crates/subnet-manager" } challenge-orchestrator = { path = "../../crates/challenge-orchestrator" } -distributed-db = { path = "../../crates/distributed-db" } secure-container-runtime = { path = "../../crates/secure-container-runtime" } -# Challenges are loaded dynamically from ChainState (Docker containers) -# No hardcoded challenge dependencies +# Bittensor (for weight submission) +bittensor-rs = { workspace = true } +# Async runtime tokio = { workspace = true } -futures = { workspace = true } -async-trait = { workspace = true } +# Serialization serde = { workspace = true } serde_json = { workspace = true } -bincode = { workspace = true } -# HTTP client for proxying to challenges +# HTTP client for platform-server reqwest = { version = "0.12", features = ["json"] } +# Logging tracing = { workspace = true } tracing-subscriber = { workspace = true } -sentry = { workspace = true } -sentry-tracing = { workspace = true } +# CLI clap = { workspace = true, features = ["env"] } anyhow = { workspace = true } hex = { workspace = true } parking_lot = { workspace = true } -uuid = { workspace = true } -sha2 = { workspace = true } -chrono = { workspace = true } sp-core = { workspace = true } -rand = { workspace = true } diff --git a/bins/validator-node/src/main.rs b/bins/validator-node/src/main.rs index 767b408f..33ea4485 100644 --- a/bins/validator-node/src/main.rs +++ b/bins/validator-node/src/main.rs @@ -1,7 +1,7 @@ //! Validator Node - Centralized Architecture //! //! All communication via platform-server (chain.platform.network). -//! No P2P networking. +//! No P2P networking. Weights submitted via Subtensor (handles CRv4 automatically). use anyhow::Result; use challenge_orchestrator::{ChallengeOrchestrator, OrchestratorConfig}; @@ -9,7 +9,7 @@ use clap::Parser; use parking_lot::RwLock; use platform_bittensor::{ signer_from_seed, sync_metagraph, BittensorClient, BittensorSigner, BlockSync, BlockSyncConfig, - BlockSyncEvent, + BlockSyncEvent, ExtrinsicWait, Subtensor, }; use platform_core::{production_sudo_key, ChainState, Keypair, NetworkConfig}; use platform_rpc::{RpcConfig, RpcServer}; @@ -24,7 +24,6 @@ use tracing::{debug, error, info, warn}; // ==================== Platform Server Client ==================== -/// HTTP client for platform-server #[derive(Clone)] pub struct PlatformServerClient { base_url: String, @@ -119,7 +118,6 @@ struct Args { #[arg(long, default_value = "1000")] stake: f64, - // Bittensor #[arg( long, env = "SUBTENSOR_ENDPOINT", @@ -133,31 +131,30 @@ struct Args { #[arg(long)] no_bittensor: bool, - // RPC #[arg(long, default_value = "8080")] rpc_port: u16, #[arg(long, default_value = "0.0.0.0")] rpc_addr: String, - // Docker #[arg(long, default_value = "true")] docker_challenges: bool, - // Container broker #[arg(long, env = "BROKER_WS_PORT", default_value = "8090")] broker_port: u16, #[arg(long, env = "BROKER_JWT_SECRET")] broker_jwt_secret: Option, - // Platform server (central) #[arg( long, env = "PLATFORM_SERVER_URL", default_value = "https://chain.platform.network" )] platform_server: String, + + #[arg(long, env = "VERSION_KEY", default_value = "1")] + version_key: u64, } // ==================== Main ==================== @@ -172,7 +169,7 @@ async fn main() -> Result<()> { .init(); let args = Args::parse(); - info!("Starting validator (centralized mode - no P2P)"); + info!("Starting validator (centralized mode)"); // Keypair let keypair = load_keypair(&args)?; @@ -194,7 +191,7 @@ async fn main() -> Result<()> { let bans = Arc::new(RwLock::new(BanList::default())); // Platform server - let platform_client = PlatformServerClient::new(&args.platform_server); + let platform_client = Arc::new(PlatformServerClient::new(&args.platform_server)); info!("Platform server: {}", args.platform_server); if platform_client.health().await { @@ -206,7 +203,7 @@ async fn main() -> Result<()> { // List challenges match platform_client.list_challenges().await { Ok(c) if !c.is_empty() => { - info!("Challenges from platform-server:"); + info!("Challenges:"); for ch in &c { info!( " - {} (mechanism={}, healthy={})", @@ -214,15 +211,12 @@ async fn main() -> Result<()> { ); } } - Ok(_) => info!("No challenges on platform-server yet"), + Ok(_) => info!("No challenges yet"), Err(e) => warn!("Failed to list challenges: {}", e), } // Container broker - info!("Container broker starting on port {}...", args.broker_port); - if args.broker_jwt_secret.is_none() { - warn!("Container broker: no JWT secret (dev mode)"); - } + info!("Container broker on port {}...", args.broker_port); let broker = Arc::new(ContainerBroker::with_policy(SecurityPolicy::default()).await?); let ws_config = WsConfig { bind_addr: format!("0.0.0.0:{}", args.broker_port), @@ -236,11 +230,9 @@ async fn main() -> Result<()> { error!("Broker error: {}", e); } }); - info!("Container broker: ws://0.0.0.0:{}", args.broker_port); // Challenge orchestrator let _orchestrator = if args.docker_challenges { - info!("Docker orchestrator starting..."); match ChallengeOrchestrator::new(OrchestratorConfig { network_name: "platform-challenges".to_string(), health_check_interval: Duration::from_secs(30), @@ -249,10 +241,7 @@ async fn main() -> Result<()> { }) .await { - Ok(o) => { - info!("Docker orchestrator: ready"); - Some(Arc::new(o)) - } + Ok(o) => Some(Arc::new(o)), Err(e) => { warn!("Docker orchestrator failed: {}", e); None @@ -278,31 +267,56 @@ async fn main() -> Result<()> { let _rpc = rpc_server.spawn(); info!("RPC: http://{}:{}", args.rpc_addr, args.rpc_port); - // Bittensor block sync + // Bittensor setup + let subtensor: Option>; + let subtensor_signer: Option>; let mut block_rx: Option> = None; + if !args.no_bittensor { info!( "Bittensor: {} (netuid={})", args.subtensor_endpoint, args.netuid ); - let mut sync = BlockSync::new(BlockSyncConfig { - netuid: args.netuid, - ..Default::default() - }); - let rx = sync.take_event_receiver(); + // Create Subtensor with persistence for automatic commit-reveal handling + let state_path = data_dir.join("subtensor_state.json"); + match Subtensor::with_persistence(&args.subtensor_endpoint, state_path.clone()).await { + Ok(st) => { + info!("Subtensor connected with persistence at {:?}", state_path); + + // Create signer + let secret = args.secret_key.as_ref().ok_or_else(|| { + anyhow::anyhow!("VALIDATOR_SECRET_KEY required for Bittensor") + })?; + + match signer_from_seed(secret) { + Ok(signer) => { + info!("Bittensor hotkey: {}", signer.account_id()); + subtensor_signer = Some(Arc::new(signer)); + } + Err(e) => { + error!("Failed to create signer: {}", e); + subtensor_signer = None; + } + } - match BittensorClient::new(&args.subtensor_endpoint).await { - Ok(client) => { - let client = Arc::new(client); + subtensor = Some(Arc::new(st)); // Sync metagraph + let client = BittensorClient::new(&args.subtensor_endpoint).await?; match sync_metagraph(&client, args.netuid).await { Ok(mg) => info!("Metagraph: {} neurons", mg.n), Err(e) => warn!("Metagraph sync failed: {}", e), } - // Start block sync + // Block sync + let mut sync = BlockSync::new(BlockSyncConfig { + netuid: args.netuid, + ..Default::default() + }); + let rx = sync.take_event_receiver(); + + let client = Arc::new(client); if let Err(e) = sync.connect(client).await { warn!("Block sync connect failed: {}", e); } else { @@ -315,22 +329,22 @@ async fn main() -> Result<()> { info!("Block sync: started"); } } - Err(e) => warn!("Bittensor connection failed: {}", e), + Err(e) => { + error!("Subtensor connection failed: {}", e); + subtensor = None; + subtensor_signer = None; + } } } else { - info!("Bittensor: disabled (--no-bittensor)"); + info!("Bittensor: disabled"); + subtensor = None; + subtensor_signer = None; } - let _subtensor: Option<()> = None; // Placeholder - - // Signer for weight submission - let signer = args - .secret_key - .as_ref() - .and_then(|s| signer_from_seed(s).ok()); info!("Validator running. Ctrl+C to stop."); - // Main loop + let netuid = args.netuid; + let version_key = args.version_key; let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { @@ -341,7 +355,14 @@ async fn main() -> Result<()> { None => std::future::pending().await, } } => { - handle_block_event(event, &platform_client, &signer, args.netuid).await; + handle_block_event( + event, + &platform_client, + &subtensor, + &subtensor_signer, + netuid, + version_key, + ).await; } _ = interval.tick() => { @@ -381,9 +402,11 @@ fn load_keypair(args: &Args) -> Result { async fn handle_block_event( event: BlockSyncEvent, - client: &PlatformServerClient, - _signer: &Option, - _netuid: u16, + platform_client: &Arc, + subtensor: &Option>, + signer: &Option>, + netuid: u16, + version_key: u64, ) { match event { BlockSyncEvent::NewBlock { block_number, .. } => { @@ -399,33 +422,110 @@ async fn handle_block_event( } BlockSyncEvent::CommitWindowOpen { epoch, block } => { - info!("Commit window: epoch {} block {}", epoch, block); - - // Fetch weights from platform-server - match client.list_challenges().await { - Ok(challenges) => { - for c in challenges.iter().filter(|c| c.is_healthy) { - match client.get_weights(&c.id, epoch).await { - Ok(w) if !w.is_empty() => { - info!( - "Challenge {} weights: {} entries (mechanism {})", - c.id, - w.len(), - c.mechanism_id - ); - // TODO: Submit weights via WeightSubmitter + info!("=== COMMIT WINDOW: epoch {} block {} ===", epoch, block); + + // Submit weights via Subtensor (handles CRv4/commit-reveal automatically) + if let (Some(st), Some(sig)) = (subtensor.as_ref(), signer.as_ref()) { + // Fetch weights from platform-server + let mechanism_weights = match platform_client.list_challenges().await { + Ok(challenges) if !challenges.is_empty() => { + let mut weights = Vec::new(); + + for challenge in challenges.iter().filter(|c| c.is_healthy) { + match platform_client.get_weights(&challenge.id, epoch).await { + Ok(w) if !w.is_empty() => { + let uids: Vec = w.iter().map(|(u, _)| *u).collect(); + let vals: Vec = w.iter().map(|(_, v)| *v).collect(); + + info!( + "Challenge {} (mech {}): {} weights", + challenge.id, + challenge.mechanism_id, + uids.len() + ); + + weights.push((challenge.mechanism_id as u8, uids, vals)); + } + Ok(_) => debug!("Challenge {} has no weights", challenge.id), + Err(e) => { + warn!("Failed to get weights for {}: {}", challenge.id, e) + } } - Ok(_) => debug!("Challenge {} has no weights", c.id), - Err(e) => warn!("Failed to get weights for {}: {}", c.id, e), + } + + weights + } + Ok(_) => { + info!("No challenges on platform-server"); + vec![] + } + Err(e) => { + warn!("Failed to list challenges: {}", e); + vec![] + } + }; + + // Submit weights (or burn weights if none) + let weights_to_submit = if mechanism_weights.is_empty() { + info!("No weights - submitting burn weights to UID 0"); + vec![(0u8, vec![0u16], vec![65535u16])] + } else { + mechanism_weights + }; + + // Submit each mechanism via Subtensor (handles CRv4 automatically) + for (mechanism_id, uids, weights) in weights_to_submit { + match st + .set_mechanism_weights( + sig, + netuid, + mechanism_id, + &uids, + &weights, + version_key, + ExtrinsicWait::Finalized, + ) + .await + { + Ok(resp) if resp.success => { + info!( + "Mechanism {} weights submitted: {:?}", + mechanism_id, resp.tx_hash + ); + } + Ok(resp) => { + warn!("Mechanism {} issue: {}", mechanism_id, resp.message); + } + Err(e) => { + error!("Mechanism {} failed: {}", mechanism_id, e); } } } - Err(e) => warn!("Platform-server error: {}", e), + } else { + warn!("No Subtensor/signer - cannot submit weights"); } } BlockSyncEvent::RevealWindowOpen { epoch, block } => { - info!("Reveal window: epoch {} block {}", epoch, block); + info!("=== REVEAL WINDOW: epoch {} block {} ===", epoch, block); + + // With CRv4, reveals are automatic via DRAND + // For older versions, Subtensor handles reveals internally + if let (Some(st), Some(sig)) = (subtensor.as_ref(), signer.as_ref()) { + if st.has_pending_commits().await { + info!("Revealing pending commits..."); + match st.reveal_all_pending(sig, ExtrinsicWait::Finalized).await { + Ok(results) => { + for resp in results { + if resp.success { + info!("Revealed: {:?}", resp.tx_hash); + } + } + } + Err(e) => error!("Reveal failed: {}", e), + } + } + } } BlockSyncEvent::PhaseChange {