From 1050232085b51a1d7d973394566588ba375c012c Mon Sep 17 00:00:00 2001 From: Bert Date: Tue, 18 Mar 2025 01:11:16 +0100 Subject: [PATCH 1/4] GH-591: The card's main objectives are done --- node/src/hopper/consuming_service.rs | 3 +- node/src/hopper/live_cores_package.rs | 5 +- node/src/hopper/mod.rs | 7 +- node/src/hopper/routing_service.rs | 6 +- node/src/proxy_server/mod.rs | 148 +++++++++++++++----------- node/src/sub_lib/hopper.rs | 3 +- node/src/sub_lib/stream_key.rs | 30 +++--- node/src/test_utils/mod.rs | 65 +++++------ node/src/test_utils/recorder.rs | 4 +- 9 files changed, 155 insertions(+), 116 deletions(-) diff --git a/node/src/hopper/consuming_service.rs b/node/src/hopper/consuming_service.rs index 7593e36ed..8cd362e98 100644 --- a/node/src/hopper/consuming_service.rs +++ b/node/src/hopper/consuming_service.rs @@ -140,7 +140,8 @@ mod tests { use crate::sub_lib::route::RouteSegment; use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; - use crate::test_utils::{main_cryptde, make_meaningless_message_type, make_paying_wallet}; + use crate::test_utils::unshared_test_utils::make_meaningless_message_type; + use crate::test_utils::{main_cryptde, make_paying_wallet}; use actix::System; use masq_lib::test_utils::logging::init_test_logging; use masq_lib::test_utils::logging::TestLogHandler; diff --git a/node/src/hopper/live_cores_package.rs b/node/src/hopper/live_cores_package.rs index 9ec210678..e920a3699 100644 --- a/node/src/hopper/live_cores_package.rs +++ b/node/src/hopper/live_cores_package.rs @@ -99,9 +99,8 @@ mod tests { use crate::sub_lib::node_addr::NodeAddr; use crate::sub_lib::route::RouteSegment; use crate::sub_lib::route::{Route, RouteError}; - use crate::test_utils::{ - main_cryptde, make_meaningless_message_type, make_meaningless_route, make_paying_wallet, - }; + use crate::test_utils::unshared_test_utils::make_meaningless_message_type; + use crate::test_utils::{main_cryptde, make_meaningless_route, make_paying_wallet}; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; diff --git a/node/src/hopper/mod.rs b/node/src/hopper/mod.rs index 49e93bec4..b3a75b9a0 100644 --- a/node/src/hopper/mod.rs +++ b/node/src/hopper/mod.rs @@ -149,10 +149,11 @@ mod tests { use crate::sub_lib::hopper::IncipientCoresPackage; use crate::sub_lib::route::Route; use crate::sub_lib::route::RouteSegment; - use crate::test_utils::unshared_test_utils::prove_that_crash_request_handler_is_hooked_up; + use crate::test_utils::unshared_test_utils::{ + make_meaningless_message_type, prove_that_crash_request_handler_is_hooked_up, + }; use crate::test_utils::{ - alias_cryptde, main_cryptde, make_cryptde_pair, make_meaningless_message_type, - make_paying_wallet, route_to_proxy_client, + alias_cryptde, main_cryptde, make_cryptde_pair, make_paying_wallet, route_to_proxy_client, }; use actix::Actor; use actix::System; diff --git a/node/src/hopper/routing_service.rs b/node/src/hopper/routing_service.rs index 3ebaaab65..4e384484f 100644 --- a/node/src/hopper/routing_service.rs +++ b/node/src/hopper/routing_service.rs @@ -519,9 +519,11 @@ mod tests { use crate::sub_lib::versioned_data::VersionedData; use crate::sub_lib::wallet::Wallet; use crate::test_utils::recorder::{make_recorder, peer_actors_builder}; + use crate::test_utils::unshared_test_utils::{ + make_meaningless_message_type, make_request_payload, make_response_payload, + }; use crate::test_utils::{ - alias_cryptde, main_cryptde, make_cryptde_pair, make_meaningless_message_type, - make_paying_wallet, make_request_payload, make_response_payload, rate_pack_routing, + alias_cryptde, main_cryptde, make_cryptde_pair, make_paying_wallet, rate_pack_routing, rate_pack_routing_byte, route_from_proxy_client, route_to_proxy_client, route_to_proxy_server, }; diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 372b52c37..328bc46ff 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -470,7 +470,7 @@ impl ProxyServer { let duration_since = SystemTime::now() .duration_since(*old_timestamp) .expect("time calculation error"); - warning!( + debug!( self.logger, "Straggling packet of length {} received for a stream key {:?} after a delay of {:?}", packet_len, @@ -521,42 +521,43 @@ impl ProxyServer { } if let Some(old_timestamp) = self.stream_key_ttl.get(&stream_key) { self.log_straggling_packet(&stream_key, payload_data_len, old_timestamp) - } - match self.keys_and_addrs.a_to_b(&stream_key) { - Some(socket_addr) => { - let last_data = response.sequenced_packet.last_data; - let sequence_number = Some( - response.sequenced_packet.sequence_number - + self.browser_proxy_sequence_offset as u64, - ); - self.subs - .as_ref() - .expect("Dispatcher unbound in ProxyServer") - .dispatcher - .try_send(TransmitDataMsg { - endpoint: Endpoint::Socket(socket_addr), - last_data, - sequence_number, - data: response.sequenced_packet.data, - }) - .expect("Dispatcher is dead"); - if last_data { - self.purge_stream_key(&stream_key, "last data received from the exit node"); + } else { + match self.keys_and_addrs.a_to_b(&stream_key) { + Some(socket_addr) => { + let last_data = response.sequenced_packet.last_data; + let sequence_number = Some( + response.sequenced_packet.sequence_number + + self.browser_proxy_sequence_offset as u64, + ); + self.subs + .as_ref() + .expect("Dispatcher unbound in ProxyServer") + .dispatcher + .try_send(TransmitDataMsg { + endpoint: Endpoint::Socket(socket_addr), + last_data, + sequence_number, + data: response.sequenced_packet.data, + }) + .expect("Dispatcher is dead"); + if last_data { + self.purge_stream_key(&stream_key, "last data received from the exit node"); + } + } + None => { + // TODO GH-608: It would be really nice to be able to send an InboundClientData with last_data: true + // back to the ProxyClient (and the distant server) so that the server could shut down + // its stream, since the browser has shut down _its_ stream and no more data will + // ever be accepted from the server on that stream; but we don't have enough information + // to do so, since our stream key has been purged and all the information it keyed + // is gone. Sorry, server! + warning!(self.logger, + "Discarding {}-byte packet {} from an unrecognized stream key: {:?}; can't send response back to client", + response.sequenced_packet.data.len(), + response.sequenced_packet.sequence_number, + response.stream_key, + ) } - } - None => { - // TODO GH-608: It would be really nice to be able to send an InboundClientData with last_data: true - // back to the ProxyClient (and the distant server) so that the server could shut down - // its stream, since the browser has shut down _its_ stream and no more data will - // ever be accepted from the server on that stream; but we don't have enough information - // to do so, since our stream key has been purged and all the information it keyed - // is gone. Sorry, server! - warning!(self.logger, - "Discarding {}-byte packet {} from an unrecognized stream key: {:?}; can't send response back to client", - response.sequenced_packet.data.len(), - response.sequenced_packet.sequence_number, - response.stream_key, - ) } } } @@ -1370,14 +1371,13 @@ mod tests { use crate::sub_lib::ttl_hashmap::TtlHashMap; use crate::sub_lib::versioned_data::VersionedData; use crate::test_utils::make_paying_wallet; - use crate::test_utils::make_request_payload; use crate::test_utils::make_wallet; use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; use crate::test_utils::recorder::Recorder; use crate::test_utils::recorder_stop_conditions::{StopCondition, StopConditions}; use crate::test_utils::unshared_test_utils::{ - prove_that_crash_request_handler_is_hooked_up, AssertionsMessage, + make_request_payload, prove_that_crash_request_handler_is_hooked_up, AssertionsMessage, }; use crate::test_utils::zero_hop_route_response; use crate::test_utils::{alias_cryptde, rate_pack}; @@ -4011,9 +4011,9 @@ mod tests { } #[test] - fn straggling_packets_are_logged() { + fn straggling_packets_are_charged_and_dropped_as_the_browser_stopped_awaiting_them_anyway() { init_test_logging(); - let test_name = "straggling_packets_are_logged"; + let test_name = "straggling_packets_are_charged_and_dropped_as_the_browser_stopped_awaiting_them_anyway"; let cryptde = main_cryptde(); let mut subject = ProxyServer::new( cryptde, @@ -4040,34 +4040,42 @@ mod tests { subject .tunneled_hosts .insert(stream_key.clone(), "hostname".to_string()); + let exit_key = PublicKey::new(&b"blah"[..]); + let exit_wallet = make_wallet("abc"); + let exit_rates = RatePack { + routing_byte_rate: 0, + routing_service_rate: 0, + exit_byte_rate: 100, + exit_service_rate: 60000, + }; subject.route_ids_to_return_routes.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, - expected_services: vec![], + expected_services: vec![ExpectedService::Exit( + exit_key, + exit_wallet.clone(), + exit_rates.clone(), + )], protocol: ProxyProtocol::HTTP, hostname_opt: None, }, ); + subject + .stream_key_ttl + .insert(stream_key.clone(), SystemTime::now()); + let (accountant, _, accountant_recording_arc) = make_recorder(); + let (dispatcher, _, dispatcher_recording_arc) = make_recorder(); let proxy_server_addr = subject.start(); - let schedule_stream_key_purge_sub = proxy_server_addr.clone().recipient(); - let mut peer_actors = peer_actors_builder().build(); - peer_actors.proxy_server.schedule_stream_key_purge = schedule_stream_key_purge_sub; - + let peer_actors = peer_actors_builder() + .accountant(accountant) + .dispatcher(dispatcher) + .build(); let system = System::new(test_name); - let bind_msg = BindMessage { peer_actors }; - proxy_server_addr.try_send(bind_msg).unwrap(); - let stream_shutdown_msg = StreamShutdownMsg { - peer_addr: socket_addr, - stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes { - reception_port: 0, - sequence_number: 0, - }), - report_to_counterpart: true, - }; + let response_data = vec![0; 30]; let client_response_payload = ClientResponsePayload_0v1 { stream_key: stream_key.clone(), - sequenced_packet: SequencedPacket::new(vec![], 1, true), + sequenced_packet: SequencedPacket::new(response_data.clone(), 1, true), }; let expired_cores_package: ExpiredCoresPackage = ExpiredCoresPackage::new( @@ -4075,16 +4083,32 @@ mod tests { Some(make_wallet("irrelevant")), return_route_with_id(cryptde, 1234), client_response_payload.into(), - 0, + 5432, ); - proxy_server_addr.try_send(stream_shutdown_msg).unwrap(); + let bind_msg = BindMessage { peer_actors }; + proxy_server_addr.try_send(bind_msg).unwrap(); proxy_server_addr.try_send(expired_cores_package).unwrap(); System::current().stop(); system.run(); + let accountant_recording = accountant_recording_arc.lock().unwrap(); + let msg = accountant_recording.get_record::(0); + assert_eq!( + &msg.exit, + &ExitServiceConsumed { + earning_wallet: exit_wallet, + payload_size: response_data.len(), + service_rate: exit_rates.exit_service_rate, + byte_rate: exit_rates.exit_byte_rate, + } + ); + assert_eq!(msg.routing_payload_size, 5432); + let dispatcher_recording = dispatcher_recording_arc.lock().unwrap(); + let len = dispatcher_recording.len(); + assert_eq!(len, 0); TestLogHandler::new().exists_log_containing(&format!( - "WARN: {test_name}: Straggling packet of length 0 received for a \ + "DEBUG: {test_name}: Straggling packet of length 5432 received for a \ stream key {:?} after a delay of", stream_key )); @@ -5318,7 +5342,11 @@ mod tests { System::current().stop(); system.run(); - TestLogHandler::new().exists_log_containing(&format!("ERROR: {test_name}: While handling ExpiredCoresPackage: No entry found inside dns_failure_retries hashmap for the stream_key: AAAAAAAAAAAAAAAAAAAAAAAAAAA")); + TestLogHandler::new().exists_log_containing(&format!( + "ERROR: {test_name}: While \ + handling ExpiredCoresPackage: No entry found inside dns_failure_retries hashmap for \ + the stream_key: AAAAAAAAAAAAAAAAAAAAAAAAAAA" + )); } #[test] diff --git a/node/src/sub_lib/hopper.rs b/node/src/sub_lib/hopper.rs index 5c1c000a7..f163204c1 100644 --- a/node/src/sub_lib/hopper.rs +++ b/node/src/sub_lib/hopper.rs @@ -174,7 +174,8 @@ mod tests { use crate::sub_lib::dispatcher::Component; use crate::sub_lib::route::RouteSegment; use crate::test_utils::recorder::Recorder; - use crate::test_utils::{main_cryptde, make_meaningless_message_type, make_paying_wallet}; + use crate::test_utils::unshared_test_utils::make_meaningless_message_type; + use crate::test_utils::{main_cryptde, make_paying_wallet}; use actix::Actor; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use std::net::IpAddr; diff --git a/node/src/sub_lib/stream_key.rs b/node/src/sub_lib/stream_key.rs index f50f67b5d..ea78650d1 100644 --- a/node/src/sub_lib/stream_key.rs +++ b/node/src/sub_lib/stream_key.rs @@ -88,20 +88,6 @@ impl StreamKey { hash: hash.digest().bytes(), } } - - pub fn make_meaningless_stream_key() -> StreamKey { - StreamKey { - hash: [0; sha1::DIGEST_LENGTH], - } - } - - pub fn make_meaningful_stream_key(phrase: &str) -> StreamKey { - let mut hash = sha1::Sha1::new(); - hash.update(phrase.as_bytes()); - StreamKey { - hash: hash.digest().bytes(), - } - } } type HashType = [u8; sha1::DIGEST_LENGTH]; @@ -111,6 +97,22 @@ mod tests { use super::*; use std::collections::HashSet; + impl StreamKey { + pub fn make_meaningless_stream_key() -> StreamKey { + StreamKey { + hash: [0; sha1::DIGEST_LENGTH], + } + } + + pub fn make_meaningful_stream_key(phrase: &str) -> StreamKey { + let mut hash = sha1::Sha1::new(); + hash.update(phrase.as_bytes()); + StreamKey { + hash: hash.digest().bytes(), + } + } + } + #[test] fn stream_keys_are_unique() { let mut stream_keys_set = HashSet::new(); diff --git a/node/src/test_utils/mod.rs b/node/src/test_utils/mod.rs index ab99d59d9..4153d1eb2 100644 --- a/node/src/test_utils/mod.rs +++ b/node/src/test_utils/mod.rs @@ -26,22 +26,17 @@ use crate::sub_lib::cryptde::PlainData; use crate::sub_lib::cryptde::PublicKey; use crate::sub_lib::cryptde_null::CryptDENull; use crate::sub_lib::dispatcher::Component; -use crate::sub_lib::hopper::MessageType; use crate::sub_lib::neighborhood::ExpectedServices; use crate::sub_lib::neighborhood::RouteQueryResponse; use crate::sub_lib::neighborhood::{ExpectedService, RatePack}; -use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; -use crate::sub_lib::proxy_server::{ClientRequestPayload_0v1, ProxyProtocol}; use crate::sub_lib::route::Route; use crate::sub_lib::route::RouteSegment; use crate::sub_lib::sequence_buffer::SequencedPacket; -use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::wallet::Wallet; use crossbeam_channel::{unbounded, Receiver, Sender}; use ethsign_crypto::Keccak256; use futures::sync::mpsc::SendError; use lazy_static::lazy_static; -use masq_lib::constants::HTTP_PORT; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use rand::RngCore; use regex::Regex; @@ -176,10 +171,6 @@ impl Waiter { } } -pub fn make_meaningless_message_type() -> MessageType { - DnsResolveFailure_0v1::new(StreamKey::make_meaningless_stream_key()).into() -} - pub fn make_one_way_route_to_proxy_client(public_keys: Vec<&PublicKey>) -> Route { Route::one_way( RouteSegment::new(public_keys, Component::ProxyClient), @@ -276,28 +267,6 @@ pub fn make_garbage_data(bytes: usize) -> Vec { data } -pub fn make_request_payload(bytes: usize, cryptde: &dyn CryptDE) -> ClientRequestPayload_0v1 { - ClientRequestPayload_0v1 { - stream_key: StreamKey::make_meaningful_stream_key("request"), - sequenced_packet: SequencedPacket::new(make_garbage_data(bytes), 0, true), - target_hostname: Some("example.com".to_string()), - target_port: HTTP_PORT, - protocol: ProxyProtocol::HTTP, - originator_public_key: cryptde.public_key().clone(), - } -} - -pub fn make_response_payload(bytes: usize) -> ClientResponsePayload_0v1 { - ClientResponsePayload_0v1 { - stream_key: StreamKey::make_meaningful_stream_key("response"), - sequenced_packet: SequencedPacket { - data: make_garbage_data(bytes), - sequence_number: 0, - last_data: false, - }, - } -} - pub fn make_send_error() -> SendError { let (tx, rx) = futures::sync::mpsc::unbounded(); drop(rx); @@ -549,11 +518,18 @@ pub mod unshared_test_utils { use crate::db_config::persistent_configuration::PersistentConfigurationReal; use crate::node_test_utils::DirsWrapperMock; use crate::sub_lib::accountant::{PaymentThresholds, ScanIntervals}; + use crate::sub_lib::cryptde::CryptDE; + use crate::sub_lib::hopper::MessageType; use crate::sub_lib::neighborhood::{ConnectionProgressMessage, DEFAULT_RATE_PACK}; + use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; + use crate::sub_lib::proxy_server::{ClientRequestPayload_0v1, ProxyProtocol}; + use crate::sub_lib::sequence_buffer::SequencedPacket; + use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::utils::{ NLSpawnHandleHolder, NLSpawnHandleHolderReal, NotifyHandle, NotifyLaterHandle, }; use crate::test_utils::database_utils::bring_db_0_back_to_life_and_return_connection; + use crate::test_utils::make_garbage_data; use crate::test_utils::neighborhood_test_utils::MIN_HOPS_FOR_TEST; use crate::test_utils::persistent_configuration_mock::PersistentConfigurationMock; use crate::test_utils::recorder::{make_recorder, Recorder, Recording}; @@ -564,6 +540,7 @@ pub mod unshared_test_utils { use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Either; use lazy_static::lazy_static; + use masq_lib::constants::HTTP_PORT; use masq_lib::messages::{ToMessageBody, UiCrashRequest}; use masq_lib::multi_config::MultiConfig; #[cfg(not(feature = "no_test_share"))] @@ -726,6 +703,32 @@ pub mod unshared_test_utils { (recipient, recording_arc) } + pub fn make_meaningless_message_type() -> MessageType { + DnsResolveFailure_0v1::new(StreamKey::make_meaningless_stream_key()).into() + } + + pub fn make_request_payload(bytes: usize, cryptde: &dyn CryptDE) -> ClientRequestPayload_0v1 { + ClientRequestPayload_0v1 { + stream_key: StreamKey::make_meaningful_stream_key("request"), + sequenced_packet: SequencedPacket::new(make_garbage_data(bytes), 0, true), + target_hostname: Some("example.com".to_string()), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: cryptde.public_key().clone(), + } + } + + pub fn make_response_payload(bytes: usize) -> ClientResponsePayload_0v1 { + ClientResponsePayload_0v1 { + stream_key: StreamKey::make_meaningful_stream_key("response"), + sequenced_packet: SequencedPacket { + data: make_garbage_data(bytes), + sequence_number: 0, + last_data: false, + }, + } + } + pub fn make_cpm_recipient() -> (Recipient, Arc>) { make_recipient_and_recording_arc(None) } diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index da1aed944..f66125182 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -200,7 +200,7 @@ impl Handler for Recorder { msg: RouteQueryMessage, _ctx: &mut Self::Context, ) -> >::Result { - self.record(msg); + self.handle_msg_t_m_p(msg); MessageResult(extract_response( &mut self.route_query_responses, "No RouteQueryResponses prepared for RouteQueryMessage", @@ -208,6 +208,8 @@ impl Handler for Recorder { } } +matchable!(RouteQueryMessage); + fn extract_response(responses: &mut Vec, err_msg: &str) -> T where T: Clone, From 5ab336c2c7bc5c4a076b92ade55407fba1eeddbf Mon Sep 17 00:00:00 2001 From: Bert Date: Tue, 18 Mar 2025 11:12:17 +0100 Subject: [PATCH 2/4] GH-591: a little refactoring --- node/src/proxy_server/mod.rs | 182 ++++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 78 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 328bc46ff..9358f63bf 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -311,13 +311,14 @@ impl ProxyServer { retry: DNSFailureRetry, client_addr: SocketAddr, ) -> DNSFailureRetry { - let args = TryTransmitToHopperArgs::new( + let args = TransmitToHopperArgs::new( self, retry.unsuccessful_request.clone(), client_addr, SystemTime::now(), false, ); + let add_return_route_sub = self.out_subs("ProxyServer").add_return_route.clone(); let route_source = self.out_subs("Neighborhood").route_source.clone(); let proxy_server_sub = self.out_subs("ProxyServer").route_result_sub.clone(); let inbound_client_data_helper = self @@ -325,7 +326,12 @@ impl ProxyServer { .as_ref() .expect("IBCDHelper uninitialized"); - inbound_client_data_helper.request_route_and_transmit(args, route_source, proxy_server_sub); + inbound_client_data_helper.request_route_and_transmit( + args, + add_return_route_sub, + route_source, + proxy_server_sub, + ); retry } @@ -708,7 +714,8 @@ impl ProxyServer { } fn try_transmit_to_hopper( - args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + add_return_route_sub: Recipient, route_query_response: RouteQueryResponse, ) -> Result<(), String> { match route_query_response.expected_services { @@ -723,23 +730,10 @@ impl ProxyServer { args.logger, "Adding expectant return route info: {:?}", return_route_info ); - args.add_return_route_sub + add_return_route_sub .try_send(return_route_info) .expect("ProxyServer is dead"); - ProxyServer::transmit_to_hopper( - args.main_cryptde, - &args.hopper_sub, - args.timestamp, - args.payload, - route_query_response.route, - over, - &args.logger, - args.client_addr, - &args.dispatcher_sub, - &args.accountant_sub, - args.retire_stream_key_sub_opt.as_ref(), - args.is_decentralized, - ) + ProxyServer::transmit_to_hopper(args, route_query_response.route, over) } _ => panic!("Expected RoundTrip ExpectedServices but got OneWay"), } @@ -807,37 +801,38 @@ impl ProxyServer { } } - #[allow(clippy::too_many_arguments)] fn transmit_to_hopper( - main_cryptde: &'static dyn CryptDE, - hopper: &Recipient, - timestamp: SystemTime, - payload: ClientRequestPayload_0v1, + args: TransmitToHopperArgs, + // main_cryptde: &'static dyn CryptDE, + // hopper: &Recipient, + // timestamp: SystemTime, + // payload: ClientRequestPayload_0v1, route: Route, expected_services: Vec, - logger: &Logger, - source_addr: SocketAddr, - dispatcher: &Recipient, - accountant_sub: &Recipient, - retire_stream_key_via: Option<&Recipient>, - is_decentralized: bool, + // logger: &Logger, + // source_addr: SocketAddr, + // dispatcher: &Recipient, + // accountant_sub: &Recipient, + // retire_stream_key_via: Option<&Recipient>, + // is_decentralized: bool, ) -> Result<(), String> { - let destination_key_opt = if is_decentralized { + let logger = args.logger; + let destination_key_opt = if args.is_decentralized { expected_services.iter().find_map(|service| match service { ExpectedService::Exit(public_key, _, _) => Some(public_key.clone()), _ => None, }) } else { // In Zero Hop Mode the exit node public key is the same as this public key - Some(main_cryptde.public_key().clone()) + Some(args.main_cryptde.public_key().clone()) }; match destination_key_opt { None => { // Route not found Err(ProxyServer::handle_route_failure( - payload, - source_addr, - dispatcher, + args.payload, + args.client_addr, + &args.dispatcher_sub, )) } Some(payload_destination_key) => { @@ -846,38 +841,39 @@ impl ProxyServer { logger, "transmit to hopper with destination key {:?}", payload_destination_key ); + let payload = args.payload; let payload_size = payload.sequenced_packet.data.len(); let stream_key = payload.stream_key; let pkg = IncipientCoresPackage::new( - main_cryptde, + args.main_cryptde, route, payload.into(), &payload_destination_key, ) .expect("Key magically disappeared"); - if is_decentralized { + if args.is_decentralized { let exit = ProxyServer::report_on_exit_service(&expected_services, payload_size); let routing = - ProxyServer::report_on_routing_services(expected_services, logger); - accountant_sub + ProxyServer::report_on_routing_services(expected_services, &logger); + args.accountant_sub .try_send(ReportServicesConsumedMessage { - timestamp, + timestamp: args.timestamp, exit, routing_payload_size: pkg.payload.len(), routing, }) .expect("Accountant is dead"); } - hopper.try_send(pkg).expect("Hopper is dead"); - if let Some(shutdown_sub) = retire_stream_key_via { + args.hopper_sub.try_send(pkg).expect("Hopper is dead"); + if let Some(shutdown_sub) = args.retire_stream_key_sub_opt { debug!( logger, "Last data is on the way; directing shutdown of stream {}", stream_key ); shutdown_sub .try_send(StreamShutdownMsg { - peer_addr: source_addr, + peer_addr: args.client_addr, stream_type: RemovedStreamType::NonClandestine( NonClandestineAttributes { // No report to counterpart; these are irrelevant @@ -1035,7 +1031,8 @@ pub trait IBCDHelper { fn request_route_and_transmit( &self, - tth_args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + add_return_route_sub: Recipient, route_source: Recipient, proxy_server_sub: Recipient, ); @@ -1044,7 +1041,8 @@ pub trait IBCDHelper { trait RouteQueryResponseResolver: Send { fn resolve_message( &self, - args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + add_return_route_sub: Recipient, proxy_server_sub: Recipient, route_result_opt: Result, MailboxError>, ); @@ -1054,14 +1052,19 @@ struct RouteQueryResponseResolverReal {} impl RouteQueryResponseResolver for RouteQueryResponseResolverReal { fn resolve_message( &self, - args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + add_return_route_sub: Recipient, proxy_server_sub: Recipient, route_result_opt: Result, MailboxError>, ) { let stream_key = args.payload.stream_key; let result = match route_result_opt { Ok(Some(route_query_response)) => { - match ProxyServer::try_transmit_to_hopper(args, route_query_response.clone()) { + match ProxyServer::try_transmit_to_hopper( + args, + add_return_route_sub, + route_query_response.clone(), + ) { Ok(()) => Ok(route_query_response), Err(e) => Err(e), } @@ -1157,9 +1160,10 @@ impl IBCDHelper for IBCDHelperReal { .dns_failure_retries .insert(stream_key, dns_failure_retry); } - let tth_args = - TryTransmitToHopperArgs::new(proxy, payload, source_addr, timestamp, retire_stream_key); - let pld = &tth_args.payload; + let args = + TransmitToHopperArgs::new(proxy, payload, source_addr, timestamp, retire_stream_key); + let add_return_route_sub = proxy.out_subs("ProxysServer").add_return_route.clone(); + let pld = &args.payload; if let Some(route_query_response) = proxy.stream_key_routes.get(&pld.stream_key) { debug!( proxy.logger, @@ -1169,24 +1173,30 @@ impl IBCDHelper for IBCDHelperReal { pld.sequenced_packet.data.len() ); let route_query_response = route_query_response.clone(); - ProxyServer::try_transmit_to_hopper(tth_args, route_query_response) + ProxyServer::try_transmit_to_hopper(args, add_return_route_sub, route_query_response) } else { let route_source = proxy.out_subs("Neighborhood").route_source.clone(); let proxy_server_sub = proxy.out_subs("ProxyServer").route_result_sub.clone(); - self.request_route_and_transmit(tth_args, route_source, proxy_server_sub); + self.request_route_and_transmit( + args, + add_return_route_sub, + route_source, + proxy_server_sub, + ); Ok(()) } } fn request_route_and_transmit( &self, - tth_args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + add_return_route_sub: Recipient, neighborhood_sub: Recipient, proxy_server_sub: Recipient, ) { - let pld = &tth_args.payload; + let pld = &args.payload; let hostname_opt = pld.target_hostname.clone(); - let logger = tth_args.logger.clone(); + let logger = args.logger.clone(); debug!( logger, "Getting route and opening new stream with key {} to transmit: sequence {}, length {}", @@ -1204,14 +1214,19 @@ impl IBCDHelper for IBCDHelperReal { payload_size, )) .then(move |route_result| { - message_resolver.resolve_message(tth_args, proxy_server_sub, route_result); + message_resolver.resolve_message( + args, + add_return_route_sub, + proxy_server_sub, + route_result, + ); Ok(()) }), ); } } -pub struct TryTransmitToHopperArgs { +pub struct TransmitToHopperArgs { pub main_cryptde: &'static dyn CryptDE, pub payload: ClientRequestPayload_0v1, pub client_addr: SocketAddr, @@ -1222,10 +1237,9 @@ pub struct TryTransmitToHopperArgs { pub hopper_sub: Recipient, pub dispatcher_sub: Recipient, pub accountant_sub: Recipient, - pub add_return_route_sub: Recipient, } -impl TryTransmitToHopperArgs { +impl TransmitToHopperArgs { pub fn new( proxy_server: &ProxyServer, payload: ClientRequestPayload_0v1, @@ -1253,10 +1267,6 @@ impl TryTransmitToHopperArgs { hopper_sub: proxy_server.out_subs("Hopper").hopper.clone(), dispatcher_sub: proxy_server.out_subs("Dispatcher").dispatcher.clone(), accountant_sub: proxy_server.out_subs("Accountant").accountant.clone(), - add_return_route_sub: proxy_server - .out_subs("ProxyServer") - .add_return_route - .clone(), is_decentralized: proxy_server.is_decentralized, } } @@ -1436,7 +1446,7 @@ mod tests { resolve_message_params: Arc< Mutex< Vec<( - TryTransmitToHopperArgs, + TransmitToHopperArgs, Result, MailboxError>, )>, >, @@ -1446,7 +1456,8 @@ mod tests { impl RouteQueryResponseResolver for RouteQueryResponseResolverMock { fn resolve_message( &self, - args: TryTransmitToHopperArgs, + args: TransmitToHopperArgs, + _add_return_route_sub: Recipient, _proxy_server_sub: Recipient, route_result: Result, MailboxError>, ) { @@ -1463,7 +1474,7 @@ mod tests { param: &Arc< Mutex< Vec<( - TryTransmitToHopperArgs, + TransmitToHopperArgs, Result, MailboxError>, )>, >, @@ -1579,7 +1590,8 @@ mod tests { fn request_route_and_transmit( &self, - _tth_args: TryTransmitToHopperArgs, + _args: TransmitToHopperArgs, + _add_return_route_sub: Recipient, _route_source: Recipient, _proxy_server_sub: Recipient, ) { @@ -2857,7 +2869,7 @@ mod tests { originator_public_key: PublicKey::new(b"originator_public_key"), }; let logger = Logger::new("test"); - let tth_args = TryTransmitToHopperArgs { + let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, client_addr: source_addr, @@ -2867,11 +2879,14 @@ mod tests { hopper_sub: peer_actors.hopper.from_hopper_client, dispatcher_sub: peer_actors.dispatcher.from_dispatcher_client, accountant_sub: peer_actors.accountant.report_services_consumed, - add_return_route_sub: peer_actors.proxy_server.add_return_route, retire_stream_key_sub_opt: None, }; - let result = ProxyServer::try_transmit_to_hopper(tth_args, route_query_response); + let result = ProxyServer::try_transmit_to_hopper( + args, + peer_actors.proxy_server.add_return_route, + route_query_response, + ); System::current().stop(); system.run(); @@ -2942,7 +2957,7 @@ mod tests { originator_public_key: PublicKey::new(b"originator_public_key"), }; let logger = Logger::new("test"); - let tth_args = TryTransmitToHopperArgs { + let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, client_addr: source_addr, @@ -2952,11 +2967,14 @@ mod tests { hopper_sub: peer_actors.hopper.from_hopper_client, dispatcher_sub: peer_actors.dispatcher.from_dispatcher_client, accountant_sub: peer_actors.accountant.report_services_consumed, - add_return_route_sub: peer_actors.proxy_server.add_return_route, retire_stream_key_sub_opt: Some(peer_actors.proxy_server.stream_shutdown_sub), }; - let result = ProxyServer::try_transmit_to_hopper(tth_args, route_query_response); + let result = ProxyServer::try_transmit_to_hopper( + args, + peer_actors.proxy_server.add_return_route, + route_query_response, + ); System::current().stop(); system.run(); @@ -3228,7 +3246,7 @@ mod tests { }; let logger = Logger::new("ProxyServer"); let source_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); - let tth_args = TryTransmitToHopperArgs { + let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, client_addr: source_addr, @@ -3238,11 +3256,14 @@ mod tests { hopper_sub: peer_actors.hopper.from_hopper_client, dispatcher_sub: peer_actors.dispatcher.from_dispatcher_client, accountant_sub: peer_actors.accountant.report_services_consumed, - add_return_route_sub: peer_actors.proxy_server.add_return_route, retire_stream_key_sub_opt: None, }; - let _result = ProxyServer::try_transmit_to_hopper(tth_args, route_result); + let _result = ProxyServer::try_transmit_to_hopper( + args, + peer_actors.proxy_server.add_return_route, + route_result, + ); } #[test] @@ -6114,7 +6135,7 @@ mod tests { let (proxy_server, _, proxy_server_recording_arc) = make_recorder(); let addr = proxy_server.start(); let proxy_server_sub = recipient!(&addr, AddRouteResultMessage); - let tth_args = TryTransmitToHopperArgs { + let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, client_addr: SocketAddr::from_str("1.2.3.4:1234").unwrap(), @@ -6124,13 +6145,18 @@ mod tests { hopper_sub: recipient!(&addr, IncipientCoresPackage), dispatcher_sub: recipient!(&addr, TransmitDataMsg), accountant_sub: recipient!(&addr, ReportServicesConsumedMessage), - add_return_route_sub: recipient!(&addr, AddReturnRouteMessage), retire_stream_key_sub_opt: None, }; + let add_return_route_sub = recipient!(&addr, AddReturnRouteMessage); let subject = RouteQueryResponseResolverReal {}; let system = System::new("resolve_message_handles_mailbox_error_from_neighborhood"); - subject.resolve_message(tth_args, proxy_server_sub, Err(MailboxError::Timeout)); + subject.resolve_message( + args, + add_return_route_sub, + proxy_server_sub, + Err(MailboxError::Timeout), + ); System::current().stop(); system.run(); From d94b53e60fe2b7e5010da2277c81c1291c6afa7f Mon Sep 17 00:00:00 2001 From: Bert Date: Tue, 18 Mar 2025 22:48:02 +0100 Subject: [PATCH 3/4] GH-591: undoing some changes as they were interferring with multinode tests --- node/src/hopper/consuming_service.rs | 3 +-- node/src/hopper/live_cores_package.rs | 5 +++-- node/src/hopper/mod.rs | 7 +++--- node/src/hopper/routing_service.rs | 10 ++++----- node/src/sub_lib/hopper.rs | 3 +-- node/src/sub_lib/stream_key.rs | 32 +++++++++++++-------------- node/src/test_utils/mod.rs | 14 +++++++----- 7 files changed, 36 insertions(+), 38 deletions(-) diff --git a/node/src/hopper/consuming_service.rs b/node/src/hopper/consuming_service.rs index 8cd362e98..7593e36ed 100644 --- a/node/src/hopper/consuming_service.rs +++ b/node/src/hopper/consuming_service.rs @@ -140,8 +140,7 @@ mod tests { use crate::sub_lib::route::RouteSegment; use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; - use crate::test_utils::unshared_test_utils::make_meaningless_message_type; - use crate::test_utils::{main_cryptde, make_paying_wallet}; + use crate::test_utils::{main_cryptde, make_meaningless_message_type, make_paying_wallet}; use actix::System; use masq_lib::test_utils::logging::init_test_logging; use masq_lib::test_utils::logging::TestLogHandler; diff --git a/node/src/hopper/live_cores_package.rs b/node/src/hopper/live_cores_package.rs index e920a3699..9ec210678 100644 --- a/node/src/hopper/live_cores_package.rs +++ b/node/src/hopper/live_cores_package.rs @@ -99,8 +99,9 @@ mod tests { use crate::sub_lib::node_addr::NodeAddr; use crate::sub_lib::route::RouteSegment; use crate::sub_lib::route::{Route, RouteError}; - use crate::test_utils::unshared_test_utils::make_meaningless_message_type; - use crate::test_utils::{main_cryptde, make_meaningless_route, make_paying_wallet}; + use crate::test_utils::{ + main_cryptde, make_meaningless_message_type, make_meaningless_route, make_paying_wallet, + }; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; diff --git a/node/src/hopper/mod.rs b/node/src/hopper/mod.rs index b3a75b9a0..49e93bec4 100644 --- a/node/src/hopper/mod.rs +++ b/node/src/hopper/mod.rs @@ -149,11 +149,10 @@ mod tests { use crate::sub_lib::hopper::IncipientCoresPackage; use crate::sub_lib::route::Route; use crate::sub_lib::route::RouteSegment; - use crate::test_utils::unshared_test_utils::{ - make_meaningless_message_type, prove_that_crash_request_handler_is_hooked_up, - }; + use crate::test_utils::unshared_test_utils::prove_that_crash_request_handler_is_hooked_up; use crate::test_utils::{ - alias_cryptde, main_cryptde, make_cryptde_pair, make_paying_wallet, route_to_proxy_client, + alias_cryptde, main_cryptde, make_cryptde_pair, make_meaningless_message_type, + make_paying_wallet, route_to_proxy_client, }; use actix::Actor; use actix::System; diff --git a/node/src/hopper/routing_service.rs b/node/src/hopper/routing_service.rs index 4e384484f..c9613dcd9 100644 --- a/node/src/hopper/routing_service.rs +++ b/node/src/hopper/routing_service.rs @@ -519,13 +519,11 @@ mod tests { use crate::sub_lib::versioned_data::VersionedData; use crate::sub_lib::wallet::Wallet; use crate::test_utils::recorder::{make_recorder, peer_actors_builder}; - use crate::test_utils::unshared_test_utils::{ - make_meaningless_message_type, make_request_payload, make_response_payload, - }; + use crate::test_utils::unshared_test_utils::{make_request_payload, make_response_payload}; use crate::test_utils::{ - alias_cryptde, main_cryptde, make_cryptde_pair, make_paying_wallet, rate_pack_routing, - rate_pack_routing_byte, route_from_proxy_client, route_to_proxy_client, - route_to_proxy_server, + alias_cryptde, main_cryptde, make_cryptde_pair, make_meaningless_message_type, + make_paying_wallet, rate_pack_routing, rate_pack_routing_byte, route_from_proxy_client, + route_to_proxy_client, route_to_proxy_server, }; use actix::System; use masq_lib::test_utils::environment_guard::EnvironmentGuard; diff --git a/node/src/sub_lib/hopper.rs b/node/src/sub_lib/hopper.rs index f163204c1..5c1c000a7 100644 --- a/node/src/sub_lib/hopper.rs +++ b/node/src/sub_lib/hopper.rs @@ -174,8 +174,7 @@ mod tests { use crate::sub_lib::dispatcher::Component; use crate::sub_lib::route::RouteSegment; use crate::test_utils::recorder::Recorder; - use crate::test_utils::unshared_test_utils::make_meaningless_message_type; - use crate::test_utils::{main_cryptde, make_paying_wallet}; + use crate::test_utils::{main_cryptde, make_meaningless_message_type, make_paying_wallet}; use actix::Actor; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use std::net::IpAddr; diff --git a/node/src/sub_lib/stream_key.rs b/node/src/sub_lib/stream_key.rs index ea78650d1..fc02e7a86 100644 --- a/node/src/sub_lib/stream_key.rs +++ b/node/src/sub_lib/stream_key.rs @@ -92,26 +92,26 @@ impl StreamKey { type HashType = [u8; sha1::DIGEST_LENGTH]; -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashSet; - - impl StreamKey { - pub fn make_meaningless_stream_key() -> StreamKey { - StreamKey { - hash: [0; sha1::DIGEST_LENGTH], - } +impl StreamKey { + pub fn make_meaningless_stream_key() -> StreamKey { + StreamKey { + hash: [0; sha1::DIGEST_LENGTH], } + } - pub fn make_meaningful_stream_key(phrase: &str) -> StreamKey { - let mut hash = sha1::Sha1::new(); - hash.update(phrase.as_bytes()); - StreamKey { - hash: hash.digest().bytes(), - } + pub fn make_meaningful_stream_key(phrase: &str) -> StreamKey { + let mut hash = sha1::Sha1::new(); + hash.update(phrase.as_bytes()); + StreamKey { + hash: hash.digest().bytes(), } } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; #[test] fn stream_keys_are_unique() { diff --git a/node/src/test_utils/mod.rs b/node/src/test_utils/mod.rs index 4153d1eb2..1bf32b4b5 100644 --- a/node/src/test_utils/mod.rs +++ b/node/src/test_utils/mod.rs @@ -53,6 +53,9 @@ use std::io::Read; use std::iter::repeat; use std::net::{Shutdown, TcpStream}; +use crate::sub_lib::hopper::MessageType; +use crate::sub_lib::proxy_client::DnsResolveFailure_0v1; +use crate::sub_lib::stream_key::StreamKey; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::thread; @@ -445,6 +448,10 @@ pub fn read_until_timeout(stream: &mut dyn Read) -> Vec { response } +pub fn make_meaningless_message_type() -> MessageType { + DnsResolveFailure_0v1::new(StreamKey::make_meaningless_stream_key()).into() +} + pub fn handle_connection_error(stream: TcpStream) { let _ = stream.shutdown(Shutdown::Both).is_ok(); thread::sleep(Duration::from_millis(5000)); @@ -519,9 +526,8 @@ pub mod unshared_test_utils { use crate::node_test_utils::DirsWrapperMock; use crate::sub_lib::accountant::{PaymentThresholds, ScanIntervals}; use crate::sub_lib::cryptde::CryptDE; - use crate::sub_lib::hopper::MessageType; use crate::sub_lib::neighborhood::{ConnectionProgressMessage, DEFAULT_RATE_PACK}; - use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; + use crate::sub_lib::proxy_client::ClientResponsePayload_0v1; use crate::sub_lib::proxy_server::{ClientRequestPayload_0v1, ProxyProtocol}; use crate::sub_lib::sequence_buffer::SequencedPacket; use crate::sub_lib::stream_key::StreamKey; @@ -703,10 +709,6 @@ pub mod unshared_test_utils { (recipient, recording_arc) } - pub fn make_meaningless_message_type() -> MessageType { - DnsResolveFailure_0v1::new(StreamKey::make_meaningless_stream_key()).into() - } - pub fn make_request_payload(bytes: usize, cryptde: &dyn CryptDE) -> ClientRequestPayload_0v1 { ClientRequestPayload_0v1 { stream_key: StreamKey::make_meaningful_stream_key("request"), From b29b5a86de670cbcb2e9b5c5ebcc0626671c63dc Mon Sep 17 00:00:00 2001 From: Bert Date: Wed, 19 Mar 2025 13:34:27 +0100 Subject: [PATCH 4/4] GH-591: removed commented out lines --- node/src/proxy_server/mod.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 9358f63bf..5ea296a4b 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -803,18 +803,8 @@ impl ProxyServer { fn transmit_to_hopper( args: TransmitToHopperArgs, - // main_cryptde: &'static dyn CryptDE, - // hopper: &Recipient, - // timestamp: SystemTime, - // payload: ClientRequestPayload_0v1, route: Route, expected_services: Vec, - // logger: &Logger, - // source_addr: SocketAddr, - // dispatcher: &Recipient, - // accountant_sub: &Recipient, - // retire_stream_key_via: Option<&Recipient>, - // is_decentralized: bool, ) -> Result<(), String> { let logger = args.logger; let destination_key_opt = if args.is_decentralized {