From 725c237c9737ba5dea15957ac222d597aea28cff Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Sat, 19 Oct 2024 22:49:33 +0700 Subject: [PATCH 1/7] feat(sdk): retry --- packages/rs-dapi-client/src/dapi_client.rs | 106 +++++++++++------- packages/rs-dapi-client/src/lib.rs | 39 +++++-- packages/rs-dapi-client/src/mock.rs | 46 +++++++- packages/rs-dapi-client/src/transport.rs | 2 +- .../rs-dapi-client/tests/mock_dapi_client.rs | 19 +++- packages/rs-sdk/src/sdk.rs | 2 +- 6 files changed, 156 insertions(+), 58 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 468ca399749..92f00fa5a50 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -3,7 +3,9 @@ use backon::{ExponentialBuilder, Retryable}; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; +use std::error::Error; use std::fmt::Debug; +use std::future::Future; use std::sync::{Arc, RwLock}; use std::time::Duration; use tracing::Instrument; @@ -18,32 +20,44 @@ use crate::{ /// General DAPI request error type. #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] -pub enum DapiClientError { - /// The error happened on transport layer +pub enum DapiClientError +where + TE: Mockable, + PE: Mockable, +{ + /// The error happened on transport layer. #[error("transport error with {1}: {0}")] Transport( #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] TE, Address, ), + /// Processing error from the closure. + #[error("processing error: {0}")] + Processing(#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] PE), /// There are no valid DAPI addresses to use. #[error("no available addresses to use")] NoAvailableAddresses, - /// [AddressListError] errors + /// [AddressListError] errors. #[error("address list error: {0}")] AddressList(AddressListError), #[cfg(feature = "mocks")] #[error("mock error: {0}")] - /// Error happened in mock client + /// Error happened in mock client. Mock(#[from] crate::mock::MockError), } -impl CanRetry for DapiClientError { +impl CanRetry for DapiClientError +where + TE: CanRetry + Mockable, + PE: CanRetry + Mockable, +{ fn can_retry(&self) -> bool { use DapiClientError::*; match self { NoAvailableAddresses => false, Transport(transport_error, _) => transport_error.can_retry(), + Processing(processing_error) => processing_error.can_retry(), AddressList(_) => false, #[cfg(feature = "mocks")] Mock(_) => false, @@ -61,7 +75,7 @@ struct TransportErrorData { /// Serialization of [DapiClientError]. /// /// We need to do manual serialization because of the generic type parameter which doesn't support serde derive. -impl Mockable for DapiClientError { +impl Mockable for DapiClientError { #[cfg(feature = "mocks")] fn mock_serialize(&self) -> Option> { Some(serde_json::to_vec(self).expect("serialize DAPI client error")) @@ -77,15 +91,20 @@ impl Mockable for DapiClientError { /// DAPI client executor trait. pub trait DapiRequestExecutor { /// Execute request using this DAPI client. - async fn execute( + async fn execute( &self, request: R, + process_response: Arc, settings: RequestSettings, - ) -> Result::Error>> + ) -> Result::Error, PE>> where R: TransportRequest + Mockable, R::Response: Mockable, - ::Error: Mockable; + ::Error: Mockable, + PE: Error + Mockable + CanRetry + Send, + O: Debug + Send + Mockable, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static; } /// Access point to DAPI. @@ -101,7 +120,7 @@ pub struct DapiClient { impl DapiClient { /// Initialize new [DapiClient] and optionally override default settings. pub fn new(address_list: AddressList, settings: RequestSettings) -> Self { - // multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case + // Multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case. let address_count = 3 * address_list.len(); Self { @@ -121,16 +140,20 @@ impl DapiClient { #[async_trait] impl DapiRequestExecutor for DapiClient { - /// Execute the [DapiRequest](crate::DapiRequest). - async fn execute( + async fn execute( &self, request: R, + process_response: Arc, settings: RequestSettings, - ) -> Result::Error>> + ) -> Result::Error, PE>> where R: TransportRequest + Mockable, - R::Response: Mockable, + R::Response: Mockable + Send, ::Error: Mockable, + PE: Error + Mockable + CanRetry + Send, + O: Debug + Send + Mockable, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, { // Join settings of different sources to get final version of the settings for this execution: let applied_settings = self @@ -156,6 +179,9 @@ impl DapiRequestExecutor for DapiClient { // Setup DAPI request execution routine future. It's a closure that will be called // more once to build new future on each retry. let routine = move || { + // Clone `process_response` inside `routine` + let process_response = Arc::clone(&process_response); + // Try to get an address to initialize transport on: let address_list = self @@ -163,9 +189,10 @@ impl DapiRequestExecutor for DapiClient { .read() .expect("can't get address list for read"); - let address_result = address_list.get_live_address().cloned().ok_or( - DapiClientError::<::Error>::NoAvailableAddresses, - ); + let address_result = address_list + .get_live_address() + .cloned() + .ok_or(DapiClientError::NoAvailableAddresses); drop(address_list); @@ -200,24 +227,19 @@ impl DapiRequestExecutor for DapiClient { &applied_settings, &pool, ) - .map_err(|e| { - DapiClientError::<::Error>::Transport( - e, - address.clone(), - ) - })?; + .map_err(|e| DapiClientError::Transport(e, address.clone()))?; let response = transport_request .execute_transport(&mut transport_client, &applied_settings) .await - .map_err(|e| { - DapiClientError::<::Error>::Transport( - e, - address.clone(), - ) - }); - - match &response { + .map_err(|e| DapiClientError::Transport(e, address.clone()))?; + + // Processing response + let result = process_response(response) + .await + .map_err(DapiClientError::Processing); + + match &result { Ok(_) => { // Unban the address if it was banned and node responded successfully this time if address.is_banned() { @@ -226,11 +248,13 @@ impl DapiRequestExecutor for DapiClient { .write() .expect("can't get address list for write"); - address_list.unban_address(&address) - .map_err(DapiClientError::<::Error>::AddressList)?; + address_list + .unban_address(&address) + .map_err(DapiClientError::AddressList)?; } - tracing::trace!(?response, "received {} response", response_name); + // TODO: implement tracing + //tracing::trace!(?response, "received {} response", response_name); } Err(error) => { if !error.can_retry() { @@ -240,16 +264,17 @@ impl DapiRequestExecutor for DapiClient { .write() .expect("can't get address list for write"); - address_list.ban_address(&address) - .map_err(DapiClientError::<::Error>::AddressList)?; + address_list + .ban_address(&address) + .map_err(DapiClientError::AddressList)?; } } else { - tracing::trace!(?error, "received error"); + tracing::trace!(?error, "received retryable error"); } } }; - response + result } }; @@ -275,8 +300,9 @@ impl DapiRequestExecutor for DapiClient { } // Dump request and response to disk if dump_dir is set: - #[cfg(feature = "dump")] - Self::dump_request_response(&dump_request, &result, dump_dir); + // TODO: implement + //#[cfg(feature = "dump")] + //Self::dump_request_response(&dump_request, &response, dump_dir); result } diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 760d9ce2e78..6a18ab07299 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -22,34 +22,47 @@ use dapi_grpc::mock::Mockable; pub use dump::DumpData; use futures::{future::BoxFuture, FutureExt}; pub use request_settings::RequestSettings; +use std::error::Error; +use std::fmt::Debug; +use std::future::Future; +use std::sync::Arc; /// A DAPI request could be executed with an initialized [DapiClient]. /// /// # Examples /// ``` +/// use std::sync::Arc; /// use rs_dapi_client::{RequestSettings, AddressList, mock::MockDapiClient, DapiClientError, DapiRequest}; -/// use dapi_grpc::platform::v0::{self as proto}; +/// use dapi_grpc::platform::v0::{self as proto, GetIdentityResponse}; +/// use rs_dapi_client::mock::DummyProcessingError; /// /// # let _ = async { /// let mut client = MockDapiClient::new(); /// let request: proto::GetIdentityRequest = proto::get_identity_request::GetIdentityRequestV0 { id: b"0".to_vec(), prove: true }.into(); -/// let response = request.execute(&mut client, RequestSettings::default()).await?; -/// # Ok::<(), DapiClientError<_>>(()) +/// let process_response = Arc::new(|response| async move { Ok::(response) }); +/// let response = request.execute(&mut client, process_response, RequestSettings::default()).await?; +/// # Ok::<(), DapiClientError<_, _>>(()) /// # }; /// ``` pub trait DapiRequest { /// Response from DAPI for this specific request. - type Response; + type Response: Send; /// An error type for the transport this request uses. type TransportError: Mockable; /// Executes the request. - fn execute<'c, D: DapiRequestExecutor>( + fn execute<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, + process_response: Arc, settings: RequestSettings, - ) -> BoxFuture<'c, Result>> + ) -> BoxFuture<'c, Result>> where + D: DapiRequestExecutor, + PE: Error + Mockable + CanRetry + Send + Sync + 'static, + O: Debug + Mockable + Send + Sync + 'static, + F: Fn(Self::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, Self: 'c; } @@ -59,15 +72,23 @@ impl DapiRequest for T { type TransportError = ::Error; - fn execute<'c, D: DapiRequestExecutor>( + fn execute<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, + process_response: Arc, settings: RequestSettings, - ) -> BoxFuture<'c, Result>> + ) -> BoxFuture<'c, Result>> where + D: DapiRequestExecutor, + PE: Error + Mockable + CanRetry + Send + 'static, + O: Debug + Mockable + Send + 'static, + F: Fn(Self::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, Self: 'c, { - dapi_client.execute(self, settings).boxed() + dapi_client + .execute(self, process_response, settings) + .boxed() } } diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index 813546686d4..d30b0bb156c 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -13,12 +13,15 @@ use crate::{ transport::{TransportClient, TransportRequest}, - DapiClientError, DapiRequestExecutor, RequestSettings, + CanRetry, DapiClientError, DapiRequestExecutor, RequestSettings, }; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; use hex::ToHex; use sha2::Digest; +use std::error::Error; +use std::future::Future; +use std::sync::Arc; use std::{ any::type_name, collections::HashMap, @@ -37,7 +40,10 @@ pub struct MockDapiClient { /// Result of executing a mock request pub type MockResult = Result< ::Response, - DapiClientError<<::Client as TransportClient>::Error>, + DapiClientError< + <::Client as TransportClient>::Error, + <::Client as TransportClient>::Error, + >, >; impl MockDapiClient { @@ -105,14 +111,20 @@ impl MockDapiClient { #[async_trait] impl DapiRequestExecutor for MockDapiClient { - async fn execute( + async fn execute( &self, request: R, + _process_response: Arc, _settings: RequestSettings, - ) -> MockResult + ) -> Result::Error, PE>> where - R: Mockable, + R: TransportRequest + Mockable, R::Response: Mockable, + ::Error: Mockable, + PE: Error + Mockable + CanRetry, + O: Debug + Mockable, + F: Fn(R::Response) -> Fut + Send + Sync, + Fut: Future> + Send, { let (key, response) = self.expectations.get(&request); @@ -256,3 +268,27 @@ impl Expectations { (key, response) } } + +/// Processing Error for tests +#[derive(Debug, thiserror::Error)] +#[error("DummyProcessingError occurred")] +// #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] +pub struct DummyProcessingError; + +impl CanRetry for DummyProcessingError { + fn can_retry(&self) -> bool { + false + } +} + +impl Mockable for DummyProcessingError { + #[cfg(feature = "mocks")] + fn mock_serialize(&self) -> Option> { + Some(vec![]) // Implement as needed + } + + #[cfg(feature = "mocks")] + fn mock_deserialize(_data: &[u8]) -> Option { + Some(DummyProcessingError) + } +} diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index 600189fc2fe..cd4698d32da 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -48,7 +48,7 @@ pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable { /// Generic way to create a transport client from provided [Uri]. pub trait TransportClient: Send + Sized { /// Error type for the specific client. - type Error: CanRetry + Send + Debug + Mockable; + type Error: CanRetry + Send + Sync + Debug + Mockable; /// Build client using node's url. fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result; diff --git a/packages/rs-dapi-client/tests/mock_dapi_client.rs b/packages/rs-dapi-client/tests/mock_dapi_client.rs index f069c4e47a3..bddeb580822 100644 --- a/packages/rs-dapi-client/tests/mock_dapi_client.rs +++ b/packages/rs-dapi-client/tests/mock_dapi_client.rs @@ -1,3 +1,7 @@ +use rs_dapi_client::mock::DummyProcessingError; +use rs_dapi_client::DapiClientError; +use std::convert::Infallible; +use std::sync::Arc; #[cfg(feature = "mocks")] use { dapi_grpc::platform::v0::{GetIdentityRequest, GetIdentityResponse, Proof}, @@ -27,9 +31,20 @@ async fn test_mock_get_identity_dapi_client() { let settings = RequestSettings::default(); - let result = dapi.execute(request.clone(), settings).await.unwrap(); + let process_response = + Arc::new( + |response| async move { Ok::(response) }, + ); - let result2 = request.execute(&dapi, settings).await.unwrap(); + let result = dapi + .execute(request.clone(), Arc::clone(&process_response), settings) + .await + .unwrap(); + + let result2 = request + .execute(&dapi, process_response, settings) + .await + .unwrap(); assert_eq!(result, response); assert_eq!(result2, response); diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 2b9268f2751..2fdda564e2a 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -83,7 +83,7 @@ pub type LastQueryTimestamp = u64; /// /// See tests/ for examples of using the SDK. pub struct Sdk { - /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) + /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) pub network: Network, inner: SdkInstance, /// Use proofs when retrieving data from Platform. From bff194737bb985de8d20edb729bb17222a796456 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Sat, 19 Oct 2024 23:15:07 +0700 Subject: [PATCH 2/7] refactor: remove arc --- packages/rs-dapi-client/src/dapi_client.rs | 6 ++++-- packages/rs-dapi-client/src/lib.rs | 7 +++---- packages/rs-dapi-client/src/mock.rs | 2 +- packages/rs-dapi-client/tests/mock_dapi_client.rs | 6 ++---- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 92f00fa5a50..3b3409a7fe7 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -94,7 +94,7 @@ pub trait DapiRequestExecutor { async fn execute( &self, request: R, - process_response: Arc, + process_response: F, settings: RequestSettings, ) -> Result::Error, PE>> where @@ -143,7 +143,7 @@ impl DapiRequestExecutor for DapiClient { async fn execute( &self, request: R, - process_response: Arc, + process_response: F, settings: RequestSettings, ) -> Result::Error, PE>> where @@ -176,6 +176,8 @@ impl DapiRequestExecutor for DapiClient { #[cfg(feature = "dump")] let dump_request = request.clone(); + let process_response = Arc::new(process_response); + // Setup DAPI request execution routine future. It's a closure that will be called // more once to build new future on each retry. let routine = move || { diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 6a18ab07299..db27f65f799 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -25,7 +25,6 @@ pub use request_settings::RequestSettings; use std::error::Error; use std::fmt::Debug; use std::future::Future; -use std::sync::Arc; /// A DAPI request could be executed with an initialized [DapiClient]. /// @@ -39,7 +38,7 @@ use std::sync::Arc; /// # let _ = async { /// let mut client = MockDapiClient::new(); /// let request: proto::GetIdentityRequest = proto::get_identity_request::GetIdentityRequestV0 { id: b"0".to_vec(), prove: true }.into(); -/// let process_response = Arc::new(|response| async move { Ok::(response) }); +/// let process_response = |response| async move { Ok::(response) }; /// let response = request.execute(&mut client, process_response, RequestSettings::default()).await?; /// # Ok::<(), DapiClientError<_, _>>(()) /// # }; @@ -54,7 +53,7 @@ pub trait DapiRequest { fn execute<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, - process_response: Arc, + process_response: F, settings: RequestSettings, ) -> BoxFuture<'c, Result>> where @@ -75,7 +74,7 @@ impl DapiRequest for T { fn execute<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, - process_response: Arc, + process_response: F, settings: RequestSettings, ) -> BoxFuture<'c, Result>> where diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index d30b0bb156c..bebfab2f62d 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -114,7 +114,7 @@ impl DapiRequestExecutor for MockDapiClient { async fn execute( &self, request: R, - _process_response: Arc, + _process_response: F, _settings: RequestSettings, ) -> Result::Error, PE>> where diff --git a/packages/rs-dapi-client/tests/mock_dapi_client.rs b/packages/rs-dapi-client/tests/mock_dapi_client.rs index bddeb580822..57eaf77863e 100644 --- a/packages/rs-dapi-client/tests/mock_dapi_client.rs +++ b/packages/rs-dapi-client/tests/mock_dapi_client.rs @@ -32,12 +32,10 @@ async fn test_mock_get_identity_dapi_client() { let settings = RequestSettings::default(); let process_response = - Arc::new( - |response| async move { Ok::(response) }, - ); + |response| async move { Ok::(response) }; let result = dapi - .execute(request.clone(), Arc::clone(&process_response), settings) + .execute(request.clone(), process_response, settings) .await .unwrap(); From ce9c8df6955c1d14b7ffd995bc2410b03509c1d9 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Sun, 20 Oct 2024 00:00:53 +0700 Subject: [PATCH 3/7] chore: implement for fetch --- packages/rs-dapi-client/src/dapi_client.rs | 5 +- packages/rs-dapi-client/src/lib.rs | 4 +- packages/rs-dapi-client/src/mock.rs | 14 +++--- packages/rs-sdk/src/core/transaction.rs | 18 +++++-- packages/rs-sdk/src/error.rs | 6 ++- packages/rs-sdk/src/platform/fetch.rs | 56 ++++++++++++++-------- packages/rs-sdk/src/sdk.rs | 25 ++++++++-- packages/rs-sdk/tests/fetch/epoch.rs | 10 +++- 8 files changed, 95 insertions(+), 43 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 3b3409a7fe7..b043180cb5f 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -94,6 +94,7 @@ pub trait DapiRequestExecutor { async fn execute( &self, request: R, + // TODO: Figure out how to make it optional. For example we can do two methods: execute and execute_and_process process_response: F, settings: RequestSettings, ) -> Result::Error, PE>> @@ -102,7 +103,7 @@ pub trait DapiRequestExecutor { R::Response: Mockable, ::Error: Mockable, PE: Error + Mockable + CanRetry + Send, - O: Debug + Send + Mockable, + O: Debug + Send, F: Fn(R::Response) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static; } @@ -151,7 +152,7 @@ impl DapiRequestExecutor for DapiClient { R::Response: Mockable + Send, ::Error: Mockable, PE: Error + Mockable + CanRetry + Send, - O: Debug + Send + Mockable, + O: Debug + Send, F: Fn(R::Response) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index db27f65f799..926eac4507e 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -59,7 +59,7 @@ pub trait DapiRequest { where D: DapiRequestExecutor, PE: Error + Mockable + CanRetry + Send + Sync + 'static, - O: Debug + Mockable + Send + Sync + 'static, + O: Debug + Send + Sync + 'static, F: Fn(Self::Response) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, Self: 'c; @@ -80,7 +80,7 @@ impl DapiRequest for T { where D: DapiRequestExecutor, PE: Error + Mockable + CanRetry + Send + 'static, - O: Debug + Mockable + Send + 'static, + O: Debug + Send + 'static, F: Fn(Self::Response) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, Self: 'c, diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index bebfab2f62d..16d1f558204 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -114,7 +114,7 @@ impl DapiRequestExecutor for MockDapiClient { async fn execute( &self, request: R, - _process_response: F, + process_response: F, _settings: RequestSettings, ) -> Result::Error, PE>> where @@ -122,7 +122,7 @@ impl DapiRequestExecutor for MockDapiClient { R::Response: Mockable, ::Error: Mockable, PE: Error + Mockable + CanRetry, - O: Debug + Mockable, + O: Debug, F: Fn(R::Response) -> Fut + Send + Sync, Fut: Future> + Send, { @@ -136,15 +136,17 @@ impl DapiRequestExecutor for MockDapiClient { "mock execute" ); - return if let Some(response) = response { - response + if let Some(response) = response { + process_response(response) + .await + .map_err(DapiClientError::Processing) } else { Err(MockError::MockExpectationNotFound(format!( "unexpected mock request with key {}, use MockDapiClient::expect(): {:?}", key, request )) .into()) - }; + } } } @@ -260,7 +262,7 @@ impl Expectations { /// Get the response for a given request. /// /// Returns `None` if the request has not been expected. - pub fn get(&self, request: &I) -> (Key, Option) { + pub fn get(&self, request: &I) -> (Key, Option) { let key = Key::new(request); let response = self.expectations.get(&key).and_then(|v| v.deserialize()); diff --git a/packages/rs-sdk/src/core/transaction.rs b/packages/rs-sdk/src/core/transaction.rs index 39dd85e562c..38dc4c89825 100644 --- a/packages/rs-sdk/src/core/transaction.rs +++ b/packages/rs-sdk/src/core/transaction.rs @@ -11,6 +11,7 @@ use dpp::dashcore::{Address, InstantLock, MerkleBlock, OutPoint, Transaction, Tx use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProof; use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof; use dpp::prelude::AssetLockProof; +use std::sync::Arc; use rs_dapi_client::{DapiRequestExecutor, RequestSettings}; use std::time::Duration; @@ -54,9 +55,16 @@ impl Sdk { from_block_hash, )), }; - self.execute(core_transactions_stream, RequestSettings::default()) - .await - .map_err(|e| Error::DapiClientError(e.to_string())) + + let process_response = |response| async move { Ok::<_, Error>(response) }; + + self.execute( + core_transactions_stream, + process_response, + RequestSettings::default(), + ) + .await + .map_err(|e| Error::DapiClientError(e.to_string())) } /// Waits for a response for the asset lock proof @@ -76,6 +84,8 @@ impl Sdk { tracing::debug!("waiting for messages from stream"); + let process_response = |response| async move { Ok::<_, Error>(response) }; + // Define an inner async block to handle the stream processing. let stream_processing = async { loop { @@ -168,6 +178,7 @@ impl Sdk { // Wait until the block is chainlocked let mut core_chain_locked_height; + loop { let GetTransactionResponse { height, @@ -178,6 +189,7 @@ impl Sdk { GetTransactionRequest { id: transaction_id.to_string(), }, + process_response, RequestSettings::default(), ) .await?; diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index e55bda4742e..3b77f30fb06 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -73,8 +73,8 @@ pub enum Error { StaleNode(#[from] StaleNodeError), } -impl From> for Error { - fn from(value: DapiClientError) -> Self { +impl From> for Error { + fn from(value: DapiClientError) -> Self { Self::DapiClientError(format!("{:?}", value)) } } @@ -91,6 +91,8 @@ impl CanRetry for Error { } } +impl Mockable for Error {} + /// Server returned stale metadata #[derive(Debug, thiserror::Error)] pub enum StaleNodeError { diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 109140bdb7c..4f575b96dbc 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -8,6 +8,8 @@ //! It requires the implementing type to also implement [Debug] and [FromProof] //! traits. The associated [Fetch::Request]` type needs to implement [TransportRequest]. +use super::types::identity::IdentityRequest; +use super::DocumentQuery; use crate::mock::MockResponse; use crate::{error::Error, platform::query::Query, Sdk}; use dapi_grpc::platform::v0::{self as platform_proto, Proof, ResponseMetadata}; @@ -19,9 +21,7 @@ use dpp::{ use drive_proof_verifier::FromProof; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; use std::fmt::Debug; - -use super::types::identity::IdentityRequest; -use super::DocumentQuery; +use std::sync::Arc; /// Trait implemented by objects that can be fetched from Platform. /// @@ -53,18 +53,22 @@ where Self: Sized + Debug + MockResponse + + Send + + Sync + FromProof< ::Request, Request = ::Request, Response = <::Request as DapiRequest>::Response, - >, + > + 'static, { /// Type of request used to fetch data from Platform. /// /// Most likely, one of the types defined in [`dapi_grpc::platform::v0`]. /// /// This type must implement [`TransportRequest`] and [`MockRequest`]. - type Request: TransportRequest + Into<::Request>>::Request>; + type Request: TransportRequest + + Into<::Request>>::Request> + + 'static; /// Fetch single object from Platform. /// @@ -120,17 +124,22 @@ where settings: Option, ) -> Result<(Option, ResponseMetadata), Error> { let request = query.query(sdk.prove())?; + let request_arc = Arc::new(request.clone()); + let sdk_arc = Arc::new(sdk.clone()); - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await?; + let process_response = move |response| { + let request = Arc::clone(&request_arc); + let sdk = Arc::clone(&sdk_arc); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + async move { + sdk.parse_proof_with_metadata((*request).clone(), response) + .await + } + }; - let (object, response_metadata): (Option, ResponseMetadata) = - sdk.parse_proof_with_metadata(request, response).await?; + let (object, response_metadata): (Option, ResponseMetadata) = request + .execute(sdk, process_response, settings.unwrap_or_default()) + .await?; match object { Some(item) => Ok((item.into(), response_metadata)), @@ -169,16 +178,21 @@ where ) -> Result<(Option, ResponseMetadata, Proof), Error> { let request = query.query(sdk.prove())?; - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await?; + let request_arc = Arc::new(request.clone()); + let sdk_arc = Arc::new(sdk.clone()); + + let process_response = move |response| { + let request = Arc::clone(&request_arc); + let sdk = Arc::clone(&sdk_arc); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + async move { + sdk.parse_proof_with_metadata_and_proof((*request).clone(), response) + .await + } + }; - let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = sdk - .parse_proof_with_metadata_and_proof(request, response) + let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = request + .execute(sdk, process_response, settings.unwrap_or_default()) .await?; match object { diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 2fdda564e2a..b15b400dc09 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -28,10 +28,11 @@ pub use rs_dapi_client::AddressList; pub use rs_dapi_client::RequestSettings; use rs_dapi_client::{ transport::{TransportClient, TransportRequest}, - DapiClient, DapiClientError, DapiRequestExecutor, + CanRetry, DapiClient, DapiClientError, DapiRequestExecutor, }; use std::collections::btree_map::Entry; use std::fmt::Debug; +use std::future::Future; #[cfg(feature = "mocks")] use std::num::NonZeroUsize; #[cfg(feature = "mocks")] @@ -668,17 +669,31 @@ fn verify_metadata_height( #[async_trait::async_trait] impl DapiRequestExecutor for Sdk { - async fn execute( + async fn execute( &self, request: R, + process_response: F, settings: RequestSettings, - ) -> Result::Error>> { + ) -> Result::Error, PE>> + where + R: TransportRequest + Mockable, + R::Response: Mockable, + ::Error: Mockable, + PE: std::error::Error + Mockable + CanRetry + Send, + O: Debug + Send, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { match self.inner { - SdkInstance::Dapi { ref dapi, .. } => dapi.execute(request, settings).await, + SdkInstance::Dapi { ref dapi, .. } => { + dapi.execute(request, process_response, settings).await + } #[cfg(feature = "mocks")] SdkInstance::Mock { ref dapi, .. } => { let dapi_guard = dapi.lock().await; - dapi_guard.execute(request, settings).await + dapi_guard + .execute(request, process_response, settings) + .await } } } diff --git a/packages/rs-sdk/tests/fetch/epoch.rs b/packages/rs-sdk/tests/fetch/epoch.rs index 0c8f429d7fb..30794f934de 100644 --- a/packages/rs-sdk/tests/fetch/epoch.rs +++ b/packages/rs-sdk/tests/fetch/epoch.rs @@ -8,7 +8,7 @@ use dash_sdk::{ fetch_current_no_parameters::FetchCurrent, Fetch, FetchMany, LimitQuery, DEFAULT_EPOCH_QUERY_LIMIT, }, - Sdk, + Error, Sdk, }; use dpp::block::epoch::EpochIndex; use dpp::block::extended_epoch_info::v0::ExtendedEpochInfoV0Getters; @@ -25,8 +25,14 @@ async fn get_current_epoch(sdk: &Sdk, cfg: &Config) -> EpochIndex { } .into(); + let process_response = |response| async move { Ok::<_, Error>(response) }; + let response = sdk - .execute(identity_request, RequestSettings::default()) + .execute( + identity_request, + process_response, + RequestSettings::default(), + ) .await .expect("get identity"); From 1441aacef497f411c20cd87013987973e69a6093 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Mon, 21 Oct 2024 11:43:57 +0700 Subject: [PATCH 4/7] chore: update status ban for transport response as well --- packages/rs-dapi-client/src/dapi_client.rs | 116 +++++++++++++-------- packages/rs-dapi-client/src/transport.rs | 3 +- 2 files changed, 76 insertions(+), 43 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index b043180cb5f..229d515d52d 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -12,6 +12,7 @@ use tracing::Instrument; use crate::address_list::AddressListError; use crate::connection_pool::ConnectionPool; +use crate::request_settings::AppliedRequestSettings; use crate::{ transport::{TransportClient, TransportRequest}, Address, AddressList, CanRetry, RequestSettings, @@ -232,52 +233,38 @@ impl DapiRequestExecutor for DapiClient { ) .map_err(|e| DapiClientError::Transport(e, address.clone()))?; - let response = transport_request + let response_result = transport_request .execute_transport(&mut transport_client, &applied_settings) .await - .map_err(|e| DapiClientError::Transport(e, address.clone()))?; - - // Processing response - let result = process_response(response) - .await - .map_err(DapiClientError::Processing); - - match &result { - Ok(_) => { - // Unban the address if it was banned and node responded successfully this time - if address.is_banned() { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list - .unban_address(&address) - .map_err(DapiClientError::AddressList)?; - } - - // TODO: implement tracing - //tracing::trace!(?response, "received {} response", response_name); + .map_err(|e| DapiClientError::Transport(e, address.clone())); + + match response_result { + Ok(response) => { + // Processing response + let result = process_response(response) + .await + .map_err(DapiClientError::Processing); + + update_ban_status( + &result, + Arc::clone(&self.address_list), + address, + applied_settings, + )?; + + result } - Err(error) => { - if !error.can_retry() { - if applied_settings.ban_failed_address { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list - .ban_address(&address) - .map_err(DapiClientError::AddressList)?; - } - } else { - tracing::trace!(?error, "received retryable error"); - } + error_result => { + update_ban_status( + &error_result, + Arc::clone(&self.address_list), + address, + applied_settings, + )?; + + Err(error_result.unwrap_err()) } - }; - - result + } } }; @@ -310,3 +297,48 @@ impl DapiRequestExecutor for DapiClient { result } } + +fn update_ban_status( + result: &Result, + address_list: Arc>, + address: Address, + applied_settings: AppliedRequestSettings, +) -> Result<(), DapiClientError> +where + E: Error + CanRetry, + PE: Mockable, + TE: Mockable, +{ + match result { + Ok(_) => { + // Unban the address if it was banned and node responded successfully this time + if address.is_banned() { + let mut list = address_list + .write() + .expect("can't get address list for write"); + + list.unban_address(&address) + .map_err(DapiClientError::AddressList)?; + } + + // TODO: implement tracing + //tracing::trace!(?response, "received {} response", response_name); + } + Err(error) => { + if !error.can_retry() { + if applied_settings.ban_failed_address { + let mut list = address_list + .write() + .expect("can't get address list for write"); + + list.ban_address(&address) + .map_err(DapiClientError::AddressList)?; + } + } else { + tracing::trace!(?error, "received retryable error"); + } + } + }; + + Ok(()) +} diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index cd4698d32da..a502ca097ff 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -10,6 +10,7 @@ use dapi_grpc::tonic::transport::Uri; pub use futures::future::BoxFuture; pub use grpc::{CoreGrpcClient, PlatformGrpcClient}; use std::any; +use std::error::Error; use std::fmt::Debug; /// Generic transport layer request. @@ -48,7 +49,7 @@ pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable { /// Generic way to create a transport client from provided [Uri]. pub trait TransportClient: Send + Sized { /// Error type for the specific client. - type Error: CanRetry + Send + Sync + Debug + Mockable; + type Error: Error + CanRetry + Send + Sync + Mockable; /// Build client using node's url. fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result; From 9744417247ae939cff059e2bd879fce4d7242193 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Mon, 21 Oct 2024 12:13:59 +0700 Subject: [PATCH 5/7] chore: implement fetch many --- packages/rs-sdk/src/mock/sdk.rs | 5 +- packages/rs-sdk/src/platform/fetch.rs | 15 ++--- packages/rs-sdk/src/platform/fetch_many.rs | 73 +++++++++++++--------- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index bc9c3927716..a8d9c285e46 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -24,6 +24,7 @@ use rs_dapi_client::{ transport::TransportRequest, DapiClient, DumpData, }; +use std::fmt::Debug; use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, OwnedMutexGuard}; @@ -325,7 +326,9 @@ impl MockDashPlatformSdk { Response = <>::Request as TransportRequest>::Response, > + Sync + Send - + Default, + + Default + + Debug + + 'static, <>::Request as TransportRequest>::Response: Default, { let grpc_request = query.query(self.prove()).expect("query must be correct"); diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 4f575b96dbc..3d68e7a55e2 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -124,17 +124,14 @@ where settings: Option, ) -> Result<(Option, ResponseMetadata), Error> { let request = query.query(sdk.prove())?; - let request_arc = Arc::new(request.clone()); + let request_clone = request.clone(); let sdk_arc = Arc::new(sdk.clone()); let process_response = move |response| { - let request = Arc::clone(&request_arc); + let request = request_clone.clone(); let sdk = Arc::clone(&sdk_arc); - async move { - sdk.parse_proof_with_metadata((*request).clone(), response) - .await - } + async move { sdk.parse_proof_with_metadata(request, response).await } }; let (object, response_metadata): (Option, ResponseMetadata) = request @@ -178,15 +175,15 @@ where ) -> Result<(Option, ResponseMetadata, Proof), Error> { let request = query.query(sdk.prove())?; - let request_arc = Arc::new(request.clone()); + let request_clone = request.clone(); let sdk_arc = Arc::new(sdk.clone()); let process_response = move |response| { - let request = Arc::clone(&request_arc); + let request = request_clone.clone(); let sdk = Arc::clone(&sdk_arc); async move { - sdk.parse_proof_with_metadata_and_proof((*request).clone(), response) + sdk.parse_proof_with_metadata_and_proof(request, response) .await } }; diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index 4ecf6e42cc0..aa355dd5edd 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -4,6 +4,7 @@ //! //! ## Traits //! - `[FetchMany]`: An async trait that fetches multiple items of a specific type from Platform. + use super::LimitQuery; use crate::{ error::Error, @@ -14,11 +15,10 @@ use crate::{ use dapi_grpc::platform::v0::{ GetContestedResourceIdentityVotesRequest, GetContestedResourceVoteStateRequest, GetContestedResourceVotersForIdentityRequest, GetContestedResourcesRequest, - GetDataContractsRequest, GetDocumentsResponse, GetEpochsInfoRequest, - GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest, - GetIdentitiesBalancesRequest, GetIdentityKeysRequest, GetPathElementsRequest, - GetProtocolVersionUpgradeStateRequest, GetProtocolVersionUpgradeVoteStatusRequest, - GetVotePollsByEndDateRequest, + GetDataContractsRequest, GetEpochsInfoRequest, GetEvonodesProposedEpochBlocksByIdsRequest, + GetEvonodesProposedEpochBlocksByRangeRequest, GetIdentitiesBalancesRequest, + GetIdentityKeysRequest, GetPathElementsRequest, GetProtocolVersionUpgradeStateRequest, + GetProtocolVersionUpgradeVoteStatusRequest, GetVotePollsByEndDateRequest, }; use dashcore_rpc::dashcore::ProTxHash; use dpp::data_contract::DataContract; @@ -41,6 +41,8 @@ use drive_proof_verifier::types::{ }; use drive_proof_verifier::{types::Documents, FromProof}; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; +use std::fmt::Debug; +use std::sync::Arc; /// Fetch multiple objects from Platform. /// @@ -89,7 +91,10 @@ where Request = Self::Request, Response = <>::Request as TransportRequest>::Response, > + Send - + Default, + + Sync + + Default + + Debug + + 'static, { /// Type of request used to fetch multiple objects from Platform. /// @@ -97,7 +102,8 @@ where /// /// This type must implement [`TransportRequest`] and [`MockRequest`](crate::mock::MockRequest). type Request: TransportRequest - + Into<>::Request>>::Request>; + + Into<>::Request>>::Request> + + 'static; /// Fetch (or search) multiple objects on the Dash Platform /// @@ -143,20 +149,24 @@ where ) -> Result { let request = query.query(sdk.prove())?; - let response = request - .clone() - .execute(sdk, RequestSettings::default()) - .await?; + let request_clone = request.clone(); + let sdk_arc = Arc::new(sdk.clone()); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + let process_response = move |response| { + let request = request_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - let object: O = sdk - .parse_proof::<>::Request, O>(request, response) - .await? - .unwrap_or_default(); + async move { + sdk.parse_proof::<>::Request, O>(request, response) + .await + .map(|x| x.unwrap_or_default()) + } + }; - Ok(object) + request + .execute(sdk, process_response, RequestSettings::default()) + .await + .map_err(Error::from) } /// Fetch multiple objects from Platform by their identifiers. @@ -185,7 +195,7 @@ where /// Fetch multiple objects from Platform with limit. /// - /// Fetches up to `limit` objects matching the `query`. + /// Fetches up to `limit` objects matching the `query`. /// See [FetchMany] and [FetchMany::fetch_many()] for more detailed documentation. /// /// ## Parameters @@ -231,19 +241,24 @@ impl FetchMany for Document { ) -> Result { let document_query: DocumentQuery = query.query(sdk.prove())?; - let request = document_query.clone(); - let response: GetDocumentsResponse = - request.execute(sdk, RequestSettings::default()).await?; + let document_query_clone = document_query.clone(); + let sdk_arc = Arc::new(sdk.clone()); - tracing::trace!(request=?document_query, response=?response, "fetch multiple documents"); + let process_response = move |response| { + let document_query = document_query_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - // let object: Option> = sdk - let documents: Documents = sdk - .parse_proof::(document_query, response) - .await? - .unwrap_or_default(); + async move { + sdk.parse_proof::(document_query, response) + .await + .map(|x| x.unwrap_or_default()) + } + }; - Ok(documents) + document_query + .execute(sdk, process_response, RequestSettings::default()) + .await + .map_err(Error::from) } } From fc3c8277f0f66f3c4ae9254536d004a03d7e2f8c Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Mon, 21 Oct 2024 12:54:53 +0700 Subject: [PATCH 6/7] feat: implement default execute without processing --- packages/rs-dapi-client/src/dapi_client.rs | 38 ++++++++++++++++++- packages/rs-dapi-client/src/lib.rs | 33 ++++++++++++---- packages/rs-dapi-client/src/mock.rs | 29 ++------------ packages/rs-dapi-client/src/transport.rs | 2 +- .../rs-dapi-client/tests/mock_dapi_client.rs | 17 +-------- packages/rs-sdk/src/core/transaction.rs | 15 ++------ packages/rs-sdk/src/error.rs | 6 ++- packages/rs-sdk/src/platform/fetch.rs | 4 +- packages/rs-sdk/src/platform/fetch_many.rs | 4 +- packages/rs-sdk/src/sdk.rs | 7 ++-- 10 files changed, 82 insertions(+), 73 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 229d515d52d..f7b3cee7a5d 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -18,6 +18,20 @@ use crate::{ Address, AddressList, CanRetry, RequestSettings, }; +/// Processing Error for tests and non-processing request execution +#[derive(Debug, thiserror::Error)] +#[error("dummy processing error")] +#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] +pub struct DummyProcessingError; + +impl CanRetry for DummyProcessingError { + fn can_retry(&self) -> bool { + false + } +} + +impl Mockable for DummyProcessingError {} + /// General DAPI request error type. #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] @@ -92,7 +106,27 @@ impl Mockable for DapiClientError { /// DAPI client executor trait. pub trait DapiRequestExecutor { /// Execute request using this DAPI client. - async fn execute( + async fn execute( + &self, + request: R, + settings: RequestSettings, + ) -> Result< + R::Response, + DapiClientError<::Error, DummyProcessingError>, + > + where + R: TransportRequest + Mockable, + R::Response: Mockable, + ::Error: Mockable, + { + let process_response = move |response: R::Response| async move { Ok(response) }; + + self.execute_and_process(request, process_response, settings) + .await + } + + /// Execute request using this DAPI client. + async fn execute_and_process( &self, request: R, // TODO: Figure out how to make it optional. For example we can do two methods: execute and execute_and_process @@ -142,7 +176,7 @@ impl DapiClient { #[async_trait] impl DapiRequestExecutor for DapiClient { - async fn execute( + async fn execute_and_process( &self, request: R, process_response: F, diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 926eac4507e..2663802d80d 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -12,6 +12,7 @@ pub mod mock; mod request_settings; pub mod transport; +use crate::dapi_client::DummyProcessingError; pub use address_list::Address; pub use address_list::AddressList; pub use connection_pool::ConnectionPool; @@ -33,24 +34,40 @@ use std::future::Future; /// use std::sync::Arc; /// use rs_dapi_client::{RequestSettings, AddressList, mock::MockDapiClient, DapiClientError, DapiRequest}; /// use dapi_grpc::platform::v0::{self as proto, GetIdentityResponse}; -/// use rs_dapi_client::mock::DummyProcessingError; /// /// # let _ = async { /// let mut client = MockDapiClient::new(); /// let request: proto::GetIdentityRequest = proto::get_identity_request::GetIdentityRequestV0 { id: b"0".to_vec(), prove: true }.into(); -/// let process_response = |response| async move { Ok::(response) }; -/// let response = request.execute(&mut client, process_response, RequestSettings::default()).await?; +/// let response = request.execute(&mut client, RequestSettings::default()).await?; /// # Ok::<(), DapiClientError<_, _>>(()) /// # }; /// ``` pub trait DapiRequest { /// Response from DAPI for this specific request. - type Response: Send; + type Response: Send + Debug + 'static; /// An error type for the transport this request uses. type TransportError: Mockable; /// Executes the request. - fn execute<'c, D, O, PE, F, Fut>( + fn execute<'c, D: DapiRequestExecutor>( + self, + dapi_client: &'c D, + settings: RequestSettings, + ) -> BoxFuture< + 'c, + Result>, + > + where + Self: 'c, + Self: Sized, + { + let process_response = move |response: Self::Response| async move { Ok(response) }; + + self.execute_and_process(dapi_client, process_response, settings) + } + + /// Executes the request. + fn execute_and_process<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, process_response: F, @@ -59,7 +76,7 @@ pub trait DapiRequest { where D: DapiRequestExecutor, PE: Error + Mockable + CanRetry + Send + Sync + 'static, - O: Debug + Send + Sync + 'static, + O: Debug + Send + 'static, F: Fn(Self::Response) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, Self: 'c; @@ -71,7 +88,7 @@ impl DapiRequest for T { type TransportError = ::Error; - fn execute<'c, D, O, PE, F, Fut>( + fn execute_and_process<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, process_response: F, @@ -86,7 +103,7 @@ impl DapiRequest for T { Self: 'c, { dapi_client - .execute(self, process_response, settings) + .execute_and_process(self, process_response, settings) .boxed() } } diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index 16d1f558204..2e75e8c1b32 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -11,6 +11,7 @@ //! //! See `tests/mock_dapi_client.rs` for an example. +use crate::dapi_client::DummyProcessingError; use crate::{ transport::{TransportClient, TransportRequest}, CanRetry, DapiClientError, DapiRequestExecutor, RequestSettings, @@ -42,7 +43,7 @@ pub type MockResult = Result< ::Response, DapiClientError< <::Client as TransportClient>::Error, - <::Client as TransportClient>::Error, + DummyProcessingError, >, >; @@ -111,7 +112,7 @@ impl MockDapiClient { #[async_trait] impl DapiRequestExecutor for MockDapiClient { - async fn execute( + async fn execute_and_process( &self, request: R, process_response: F, @@ -270,27 +271,3 @@ impl Expectations { (key, response) } } - -/// Processing Error for tests -#[derive(Debug, thiserror::Error)] -#[error("DummyProcessingError occurred")] -// #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] -pub struct DummyProcessingError; - -impl CanRetry for DummyProcessingError { - fn can_retry(&self) -> bool { - false - } -} - -impl Mockable for DummyProcessingError { - #[cfg(feature = "mocks")] - fn mock_serialize(&self) -> Option> { - Some(vec![]) // Implement as needed - } - - #[cfg(feature = "mocks")] - fn mock_deserialize(_data: &[u8]) -> Option { - Some(DummyProcessingError) - } -} diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index a502ca097ff..2bd12726452 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -20,7 +20,7 @@ pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable { type Client: TransportClient; /// Transport layer response. - type Response: Mockable + Send + Debug; + type Response: Mockable + Send + Debug + 'static; /// Settings that will override [DapiClient](crate::DapiClient)'s ones each time the request is executed. const SETTINGS_OVERRIDES: RequestSettings; diff --git a/packages/rs-dapi-client/tests/mock_dapi_client.rs b/packages/rs-dapi-client/tests/mock_dapi_client.rs index 57eaf77863e..f069c4e47a3 100644 --- a/packages/rs-dapi-client/tests/mock_dapi_client.rs +++ b/packages/rs-dapi-client/tests/mock_dapi_client.rs @@ -1,7 +1,3 @@ -use rs_dapi_client::mock::DummyProcessingError; -use rs_dapi_client::DapiClientError; -use std::convert::Infallible; -use std::sync::Arc; #[cfg(feature = "mocks")] use { dapi_grpc::platform::v0::{GetIdentityRequest, GetIdentityResponse, Proof}, @@ -31,18 +27,9 @@ async fn test_mock_get_identity_dapi_client() { let settings = RequestSettings::default(); - let process_response = - |response| async move { Ok::(response) }; + let result = dapi.execute(request.clone(), settings).await.unwrap(); - let result = dapi - .execute(request.clone(), process_response, settings) - .await - .unwrap(); - - let result2 = request - .execute(&dapi, process_response, settings) - .await - .unwrap(); + let result2 = request.execute(&dapi, settings).await.unwrap(); assert_eq!(result, response); assert_eq!(result2, response); diff --git a/packages/rs-sdk/src/core/transaction.rs b/packages/rs-sdk/src/core/transaction.rs index 38dc4c89825..dbfb3e8125f 100644 --- a/packages/rs-sdk/src/core/transaction.rs +++ b/packages/rs-sdk/src/core/transaction.rs @@ -56,15 +56,9 @@ impl Sdk { )), }; - let process_response = |response| async move { Ok::<_, Error>(response) }; - - self.execute( - core_transactions_stream, - process_response, - RequestSettings::default(), - ) - .await - .map_err(|e| Error::DapiClientError(e.to_string())) + self.execute(core_transactions_stream, RequestSettings::default()) + .await + .map_err(|e| Error::DapiClientError(e.to_string())) } /// Waits for a response for the asset lock proof @@ -84,8 +78,6 @@ impl Sdk { tracing::debug!("waiting for messages from stream"); - let process_response = |response| async move { Ok::<_, Error>(response) }; - // Define an inner async block to handle the stream processing. let stream_processing = async { loop { @@ -189,7 +181,6 @@ impl Sdk { GetTransactionRequest { id: transaction_id.to_string(), }, - process_response, RequestSettings::default(), ) .await?; diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index 3b77f30fb06..a4cf514375c 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -73,8 +73,10 @@ pub enum Error { StaleNode(#[from] StaleNodeError), } -impl From> for Error { - fn from(value: DapiClientError) -> Self { +impl From> + for Error +{ + fn from(value: DapiClientError) -> Self { Self::DapiClientError(format!("{:?}", value)) } } diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 3d68e7a55e2..5c2832a1232 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -135,7 +135,7 @@ where }; let (object, response_metadata): (Option, ResponseMetadata) = request - .execute(sdk, process_response, settings.unwrap_or_default()) + .execute_and_process(sdk, process_response, settings.unwrap_or_default()) .await?; match object { @@ -189,7 +189,7 @@ where }; let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = request - .execute(sdk, process_response, settings.unwrap_or_default()) + .execute_and_process(sdk, process_response, settings.unwrap_or_default()) .await?; match object { diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index aa355dd5edd..249f0fe1a70 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -164,7 +164,7 @@ where }; request - .execute(sdk, process_response, RequestSettings::default()) + .execute_and_process(sdk, process_response, RequestSettings::default()) .await .map_err(Error::from) } @@ -256,7 +256,7 @@ impl FetchMany for Document { }; document_query - .execute(sdk, process_response, RequestSettings::default()) + .execute_and_process(sdk, process_response, RequestSettings::default()) .await .map_err(Error::from) } diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index b15b400dc09..22ba4b6ac65 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -669,7 +669,7 @@ fn verify_metadata_height( #[async_trait::async_trait] impl DapiRequestExecutor for Sdk { - async fn execute( + async fn execute_and_process( &self, request: R, process_response: F, @@ -686,13 +686,14 @@ impl DapiRequestExecutor for Sdk { { match self.inner { SdkInstance::Dapi { ref dapi, .. } => { - dapi.execute(request, process_response, settings).await + dapi.execute_and_process(request, process_response, settings) + .await } #[cfg(feature = "mocks")] SdkInstance::Mock { ref dapi, .. } => { let dapi_guard = dapi.lock().await; dapi_guard - .execute(request, process_response, settings) + .execute_and_process(request, process_response, settings) .await } } From 19b593dcdf9ff66261b9044188f1d072729e1bb5 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Mon, 21 Oct 2024 14:35:22 +0700 Subject: [PATCH 7/7] fix: get_current_epoch is not compiling --- packages/rs-sdk/tests/fetch/epoch.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/rs-sdk/tests/fetch/epoch.rs b/packages/rs-sdk/tests/fetch/epoch.rs index 30794f934de..e6805ed700b 100644 --- a/packages/rs-sdk/tests/fetch/epoch.rs +++ b/packages/rs-sdk/tests/fetch/epoch.rs @@ -25,14 +25,8 @@ async fn get_current_epoch(sdk: &Sdk, cfg: &Config) -> EpochIndex { } .into(); - let process_response = |response| async move { Ok::<_, Error>(response) }; - let response = sdk - .execute( - identity_request, - process_response, - RequestSettings::default(), - ) + .execute(identity_request, RequestSettings::default()) .await .expect("get identity");