diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 468ca399749..f7b3cee7a5d 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -3,47 +3,76 @@ use backon::{ExponentialBuilder, Retryable}; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; +use std::error::Error; use std::fmt::Debug; +use std::future::Future; use std::sync::{Arc, RwLock}; use std::time::Duration; use tracing::Instrument; use crate::address_list::AddressListError; use crate::connection_pool::ConnectionPool; +use crate::request_settings::AppliedRequestSettings; use crate::{ transport::{TransportClient, TransportRequest}, Address, AddressList, CanRetry, RequestSettings, }; +/// Processing Error for tests and non-processing request execution +#[derive(Debug, thiserror::Error)] +#[error("dummy processing error")] +#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] +pub struct DummyProcessingError; + +impl CanRetry for DummyProcessingError { + fn can_retry(&self) -> bool { + false + } +} + +impl Mockable for DummyProcessingError {} + /// General DAPI request error type. #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] -pub enum DapiClientError { - /// The error happened on transport layer +pub enum DapiClientError +where + TE: Mockable, + PE: Mockable, +{ + /// The error happened on transport layer. #[error("transport error with {1}: {0}")] Transport( #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] TE, Address, ), + /// Processing error from the closure. + #[error("processing error: {0}")] + Processing(#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] PE), /// There are no valid DAPI addresses to use. #[error("no available addresses to use")] NoAvailableAddresses, - /// [AddressListError] errors + /// [AddressListError] errors. #[error("address list error: {0}")] AddressList(AddressListError), #[cfg(feature = "mocks")] #[error("mock error: {0}")] - /// Error happened in mock client + /// Error happened in mock client. Mock(#[from] crate::mock::MockError), } -impl CanRetry for DapiClientError { +impl CanRetry for DapiClientError +where + TE: CanRetry + Mockable, + PE: CanRetry + Mockable, +{ fn can_retry(&self) -> bool { use DapiClientError::*; match self { NoAvailableAddresses => false, Transport(transport_error, _) => transport_error.can_retry(), + Processing(processing_error) => processing_error.can_retry(), AddressList(_) => false, #[cfg(feature = "mocks")] Mock(_) => false, @@ -61,7 +90,7 @@ struct TransportErrorData { /// Serialization of [DapiClientError]. /// /// We need to do manual serialization because of the generic type parameter which doesn't support serde derive. -impl Mockable for DapiClientError { +impl Mockable for DapiClientError { #[cfg(feature = "mocks")] fn mock_serialize(&self) -> Option> { Some(serde_json::to_vec(self).expect("serialize DAPI client error")) @@ -81,11 +110,37 @@ pub trait DapiRequestExecutor { &self, request: R, settings: RequestSettings, - ) -> Result::Error>> + ) -> Result< + R::Response, + DapiClientError<::Error, DummyProcessingError>, + > + where + R: TransportRequest + Mockable, + R::Response: Mockable, + ::Error: Mockable, + { + let process_response = move |response: R::Response| async move { Ok(response) }; + + self.execute_and_process(request, process_response, settings) + .await + } + + /// Execute request using this DAPI client. + async fn execute_and_process( + &self, + request: R, + // TODO: Figure out how to make it optional. For example we can do two methods: execute and execute_and_process + process_response: F, + settings: RequestSettings, + ) -> Result::Error, PE>> where R: TransportRequest + Mockable, R::Response: Mockable, - ::Error: Mockable; + ::Error: Mockable, + PE: Error + Mockable + CanRetry + Send, + O: Debug + Send, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static; } /// Access point to DAPI. @@ -101,7 +156,7 @@ pub struct DapiClient { impl DapiClient { /// Initialize new [DapiClient] and optionally override default settings. pub fn new(address_list: AddressList, settings: RequestSettings) -> Self { - // multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case + // Multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case. let address_count = 3 * address_list.len(); Self { @@ -121,16 +176,20 @@ impl DapiClient { #[async_trait] impl DapiRequestExecutor for DapiClient { - /// Execute the [DapiRequest](crate::DapiRequest). - async fn execute( + async fn execute_and_process( &self, request: R, + process_response: F, settings: RequestSettings, - ) -> Result::Error>> + ) -> Result::Error, PE>> where R: TransportRequest + Mockable, - R::Response: Mockable, + R::Response: Mockable + Send, ::Error: Mockable, + PE: Error + Mockable + CanRetry + Send, + O: Debug + Send, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, { // Join settings of different sources to get final version of the settings for this execution: let applied_settings = self @@ -153,9 +212,14 @@ impl DapiRequestExecutor for DapiClient { #[cfg(feature = "dump")] let dump_request = request.clone(); + let process_response = Arc::new(process_response); + // Setup DAPI request execution routine future. It's a closure that will be called // more once to build new future on each retry. let routine = move || { + // Clone `process_response` inside `routine` + let process_response = Arc::clone(&process_response); + // Try to get an address to initialize transport on: let address_list = self @@ -163,9 +227,10 @@ impl DapiRequestExecutor for DapiClient { .read() .expect("can't get address list for read"); - let address_result = address_list.get_live_address().cloned().ok_or( - DapiClientError::<::Error>::NoAvailableAddresses, - ); + let address_result = address_list + .get_live_address() + .cloned() + .ok_or(DapiClientError::NoAvailableAddresses); drop(address_list); @@ -200,56 +265,40 @@ impl DapiRequestExecutor for DapiClient { &applied_settings, &pool, ) - .map_err(|e| { - DapiClientError::<::Error>::Transport( - e, - address.clone(), - ) - })?; - - let response = transport_request + .map_err(|e| DapiClientError::Transport(e, address.clone()))?; + + let response_result = transport_request .execute_transport(&mut transport_client, &applied_settings) .await - .map_err(|e| { - DapiClientError::<::Error>::Transport( - e, - address.clone(), - ) - }); - - match &response { - Ok(_) => { - // Unban the address if it was banned and node responded successfully this time - if address.is_banned() { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list.unban_address(&address) - .map_err(DapiClientError::<::Error>::AddressList)?; - } - - tracing::trace!(?response, "received {} response", response_name); + .map_err(|e| DapiClientError::Transport(e, address.clone())); + + match response_result { + Ok(response) => { + // Processing response + let result = process_response(response) + .await + .map_err(DapiClientError::Processing); + + update_ban_status( + &result, + Arc::clone(&self.address_list), + address, + applied_settings, + )?; + + result } - Err(error) => { - if !error.can_retry() { - if applied_settings.ban_failed_address { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list.ban_address(&address) - .map_err(DapiClientError::<::Error>::AddressList)?; - } - } else { - tracing::trace!(?error, "received error"); - } + error_result => { + update_ban_status( + &error_result, + Arc::clone(&self.address_list), + address, + applied_settings, + )?; + + Err(error_result.unwrap_err()) } - }; - - response + } } }; @@ -275,9 +324,55 @@ impl DapiRequestExecutor for DapiClient { } // Dump request and response to disk if dump_dir is set: - #[cfg(feature = "dump")] - Self::dump_request_response(&dump_request, &result, dump_dir); + // TODO: implement + //#[cfg(feature = "dump")] + //Self::dump_request_response(&dump_request, &response, dump_dir); result } } + +fn update_ban_status( + result: &Result, + address_list: Arc>, + address: Address, + applied_settings: AppliedRequestSettings, +) -> Result<(), DapiClientError> +where + E: Error + CanRetry, + PE: Mockable, + TE: Mockable, +{ + match result { + Ok(_) => { + // Unban the address if it was banned and node responded successfully this time + if address.is_banned() { + let mut list = address_list + .write() + .expect("can't get address list for write"); + + list.unban_address(&address) + .map_err(DapiClientError::AddressList)?; + } + + // TODO: implement tracing + //tracing::trace!(?response, "received {} response", response_name); + } + Err(error) => { + if !error.can_retry() { + if applied_settings.ban_failed_address { + let mut list = address_list + .write() + .expect("can't get address list for write"); + + list.ban_address(&address) + .map_err(DapiClientError::AddressList)?; + } + } else { + tracing::trace!(?error, "received retryable error"); + } + } + }; + + Ok(()) +} diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 760d9ce2e78..2663802d80d 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -12,6 +12,7 @@ pub mod mock; mod request_settings; pub mod transport; +use crate::dapi_client::DummyProcessingError; pub use address_list::Address; pub use address_list::AddressList; pub use connection_pool::ConnectionPool; @@ -22,24 +23,28 @@ use dapi_grpc::mock::Mockable; pub use dump::DumpData; use futures::{future::BoxFuture, FutureExt}; pub use request_settings::RequestSettings; +use std::error::Error; +use std::fmt::Debug; +use std::future::Future; /// A DAPI request could be executed with an initialized [DapiClient]. /// /// # Examples /// ``` +/// use std::sync::Arc; /// use rs_dapi_client::{RequestSettings, AddressList, mock::MockDapiClient, DapiClientError, DapiRequest}; -/// use dapi_grpc::platform::v0::{self as proto}; +/// use dapi_grpc::platform::v0::{self as proto, GetIdentityResponse}; /// /// # let _ = async { /// let mut client = MockDapiClient::new(); /// let request: proto::GetIdentityRequest = proto::get_identity_request::GetIdentityRequestV0 { id: b"0".to_vec(), prove: true }.into(); /// let response = request.execute(&mut client, RequestSettings::default()).await?; -/// # Ok::<(), DapiClientError<_>>(()) +/// # Ok::<(), DapiClientError<_, _>>(()) /// # }; /// ``` pub trait DapiRequest { /// Response from DAPI for this specific request. - type Response; + type Response: Send + Debug + 'static; /// An error type for the transport this request uses. type TransportError: Mockable; @@ -48,8 +53,32 @@ pub trait DapiRequest { self, dapi_client: &'c D, settings: RequestSettings, - ) -> BoxFuture<'c, Result>> + ) -> BoxFuture< + 'c, + Result>, + > where + Self: 'c, + Self: Sized, + { + let process_response = move |response: Self::Response| async move { Ok(response) }; + + self.execute_and_process(dapi_client, process_response, settings) + } + + /// Executes the request. + fn execute_and_process<'c, D, O, PE, F, Fut>( + self, + dapi_client: &'c D, + process_response: F, + settings: RequestSettings, + ) -> BoxFuture<'c, Result>> + where + D: DapiRequestExecutor, + PE: Error + Mockable + CanRetry + Send + Sync + 'static, + O: Debug + Send + 'static, + F: Fn(Self::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, Self: 'c; } @@ -59,15 +88,23 @@ impl DapiRequest for T { type TransportError = ::Error; - fn execute<'c, D: DapiRequestExecutor>( + fn execute_and_process<'c, D, O, PE, F, Fut>( self, dapi_client: &'c D, + process_response: F, settings: RequestSettings, - ) -> BoxFuture<'c, Result>> + ) -> BoxFuture<'c, Result>> where + D: DapiRequestExecutor, + PE: Error + Mockable + CanRetry + Send + 'static, + O: Debug + Send + 'static, + F: Fn(Self::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, Self: 'c, { - dapi_client.execute(self, settings).boxed() + dapi_client + .execute_and_process(self, process_response, settings) + .boxed() } } diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index 813546686d4..2e75e8c1b32 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -11,14 +11,18 @@ //! //! See `tests/mock_dapi_client.rs` for an example. +use crate::dapi_client::DummyProcessingError; use crate::{ transport::{TransportClient, TransportRequest}, - DapiClientError, DapiRequestExecutor, RequestSettings, + CanRetry, DapiClientError, DapiRequestExecutor, RequestSettings, }; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; use hex::ToHex; use sha2::Digest; +use std::error::Error; +use std::future::Future; +use std::sync::Arc; use std::{ any::type_name, collections::HashMap, @@ -37,7 +41,10 @@ pub struct MockDapiClient { /// Result of executing a mock request pub type MockResult = Result< ::Response, - DapiClientError<<::Client as TransportClient>::Error>, + DapiClientError< + <::Client as TransportClient>::Error, + DummyProcessingError, + >, >; impl MockDapiClient { @@ -105,14 +112,20 @@ impl MockDapiClient { #[async_trait] impl DapiRequestExecutor for MockDapiClient { - async fn execute( + async fn execute_and_process( &self, request: R, + process_response: F, _settings: RequestSettings, - ) -> MockResult + ) -> Result::Error, PE>> where - R: Mockable, + R: TransportRequest + Mockable, R::Response: Mockable, + ::Error: Mockable, + PE: Error + Mockable + CanRetry, + O: Debug, + F: Fn(R::Response) -> Fut + Send + Sync, + Fut: Future> + Send, { let (key, response) = self.expectations.get(&request); @@ -124,15 +137,17 @@ impl DapiRequestExecutor for MockDapiClient { "mock execute" ); - return if let Some(response) = response { - response + if let Some(response) = response { + process_response(response) + .await + .map_err(DapiClientError::Processing) } else { Err(MockError::MockExpectationNotFound(format!( "unexpected mock request with key {}, use MockDapiClient::expect(): {:?}", key, request )) .into()) - }; + } } } @@ -248,7 +263,7 @@ impl Expectations { /// Get the response for a given request. /// /// Returns `None` if the request has not been expected. - pub fn get(&self, request: &I) -> (Key, Option) { + pub fn get(&self, request: &I) -> (Key, Option) { let key = Key::new(request); let response = self.expectations.get(&key).and_then(|v| v.deserialize()); diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index 600189fc2fe..2bd12726452 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -10,6 +10,7 @@ use dapi_grpc::tonic::transport::Uri; pub use futures::future::BoxFuture; pub use grpc::{CoreGrpcClient, PlatformGrpcClient}; use std::any; +use std::error::Error; use std::fmt::Debug; /// Generic transport layer request. @@ -19,7 +20,7 @@ pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable { type Client: TransportClient; /// Transport layer response. - type Response: Mockable + Send + Debug; + type Response: Mockable + Send + Debug + 'static; /// Settings that will override [DapiClient](crate::DapiClient)'s ones each time the request is executed. const SETTINGS_OVERRIDES: RequestSettings; @@ -48,7 +49,7 @@ pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable { /// Generic way to create a transport client from provided [Uri]. pub trait TransportClient: Send + Sized { /// Error type for the specific client. - type Error: CanRetry + Send + Debug + Mockable; + type Error: Error + CanRetry + Send + Sync + Mockable; /// Build client using node's url. fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result; diff --git a/packages/rs-sdk/src/core/transaction.rs b/packages/rs-sdk/src/core/transaction.rs index 39dd85e562c..dbfb3e8125f 100644 --- a/packages/rs-sdk/src/core/transaction.rs +++ b/packages/rs-sdk/src/core/transaction.rs @@ -11,6 +11,7 @@ use dpp::dashcore::{Address, InstantLock, MerkleBlock, OutPoint, Transaction, Tx use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProof; use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof; use dpp::prelude::AssetLockProof; +use std::sync::Arc; use rs_dapi_client::{DapiRequestExecutor, RequestSettings}; use std::time::Duration; @@ -54,6 +55,7 @@ impl Sdk { from_block_hash, )), }; + self.execute(core_transactions_stream, RequestSettings::default()) .await .map_err(|e| Error::DapiClientError(e.to_string())) @@ -168,6 +170,7 @@ impl Sdk { // Wait until the block is chainlocked let mut core_chain_locked_height; + loop { let GetTransactionResponse { height, diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index e55bda4742e..a4cf514375c 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -73,8 +73,10 @@ pub enum Error { StaleNode(#[from] StaleNodeError), } -impl From> for Error { - fn from(value: DapiClientError) -> Self { +impl From> + for Error +{ + fn from(value: DapiClientError) -> Self { Self::DapiClientError(format!("{:?}", value)) } } @@ -91,6 +93,8 @@ impl CanRetry for Error { } } +impl Mockable for Error {} + /// Server returned stale metadata #[derive(Debug, thiserror::Error)] pub enum StaleNodeError { diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index bc9c3927716..a8d9c285e46 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -24,6 +24,7 @@ use rs_dapi_client::{ transport::TransportRequest, DapiClient, DumpData, }; +use std::fmt::Debug; use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, OwnedMutexGuard}; @@ -325,7 +326,9 @@ impl MockDashPlatformSdk { Response = <>::Request as TransportRequest>::Response, > + Sync + Send - + Default, + + Default + + Debug + + 'static, <>::Request as TransportRequest>::Response: Default, { let grpc_request = query.query(self.prove()).expect("query must be correct"); diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 109140bdb7c..5c2832a1232 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -8,6 +8,8 @@ //! It requires the implementing type to also implement [Debug] and [FromProof] //! traits. The associated [Fetch::Request]` type needs to implement [TransportRequest]. +use super::types::identity::IdentityRequest; +use super::DocumentQuery; use crate::mock::MockResponse; use crate::{error::Error, platform::query::Query, Sdk}; use dapi_grpc::platform::v0::{self as platform_proto, Proof, ResponseMetadata}; @@ -19,9 +21,7 @@ use dpp::{ use drive_proof_verifier::FromProof; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; use std::fmt::Debug; - -use super::types::identity::IdentityRequest; -use super::DocumentQuery; +use std::sync::Arc; /// Trait implemented by objects that can be fetched from Platform. /// @@ -53,18 +53,22 @@ where Self: Sized + Debug + MockResponse + + Send + + Sync + FromProof< ::Request, Request = ::Request, Response = <::Request as DapiRequest>::Response, - >, + > + 'static, { /// Type of request used to fetch data from Platform. /// /// Most likely, one of the types defined in [`dapi_grpc::platform::v0`]. /// /// This type must implement [`TransportRequest`] and [`MockRequest`]. - type Request: TransportRequest + Into<::Request>>::Request>; + type Request: TransportRequest + + Into<::Request>>::Request> + + 'static; /// Fetch single object from Platform. /// @@ -120,17 +124,19 @@ where settings: Option, ) -> Result<(Option, ResponseMetadata), Error> { let request = query.query(sdk.prove())?; + let request_clone = request.clone(); + let sdk_arc = Arc::new(sdk.clone()); - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await?; + let process_response = move |response| { + let request = request_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + async move { sdk.parse_proof_with_metadata(request, response).await } + }; - let (object, response_metadata): (Option, ResponseMetadata) = - sdk.parse_proof_with_metadata(request, response).await?; + let (object, response_metadata): (Option, ResponseMetadata) = request + .execute_and_process(sdk, process_response, settings.unwrap_or_default()) + .await?; match object { Some(item) => Ok((item.into(), response_metadata)), @@ -169,16 +175,21 @@ where ) -> Result<(Option, ResponseMetadata, Proof), Error> { let request = query.query(sdk.prove())?; - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await?; + let request_clone = request.clone(); + let sdk_arc = Arc::new(sdk.clone()); + + let process_response = move |response| { + let request = request_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + async move { + sdk.parse_proof_with_metadata_and_proof(request, response) + .await + } + }; - let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = sdk - .parse_proof_with_metadata_and_proof(request, response) + let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = request + .execute_and_process(sdk, process_response, settings.unwrap_or_default()) .await?; match object { diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index 4ecf6e42cc0..249f0fe1a70 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -4,6 +4,7 @@ //! //! ## Traits //! - `[FetchMany]`: An async trait that fetches multiple items of a specific type from Platform. + use super::LimitQuery; use crate::{ error::Error, @@ -14,11 +15,10 @@ use crate::{ use dapi_grpc::platform::v0::{ GetContestedResourceIdentityVotesRequest, GetContestedResourceVoteStateRequest, GetContestedResourceVotersForIdentityRequest, GetContestedResourcesRequest, - GetDataContractsRequest, GetDocumentsResponse, GetEpochsInfoRequest, - GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest, - GetIdentitiesBalancesRequest, GetIdentityKeysRequest, GetPathElementsRequest, - GetProtocolVersionUpgradeStateRequest, GetProtocolVersionUpgradeVoteStatusRequest, - GetVotePollsByEndDateRequest, + GetDataContractsRequest, GetEpochsInfoRequest, GetEvonodesProposedEpochBlocksByIdsRequest, + GetEvonodesProposedEpochBlocksByRangeRequest, GetIdentitiesBalancesRequest, + GetIdentityKeysRequest, GetPathElementsRequest, GetProtocolVersionUpgradeStateRequest, + GetProtocolVersionUpgradeVoteStatusRequest, GetVotePollsByEndDateRequest, }; use dashcore_rpc::dashcore::ProTxHash; use dpp::data_contract::DataContract; @@ -41,6 +41,8 @@ use drive_proof_verifier::types::{ }; use drive_proof_verifier::{types::Documents, FromProof}; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; +use std::fmt::Debug; +use std::sync::Arc; /// Fetch multiple objects from Platform. /// @@ -89,7 +91,10 @@ where Request = Self::Request, Response = <>::Request as TransportRequest>::Response, > + Send - + Default, + + Sync + + Default + + Debug + + 'static, { /// Type of request used to fetch multiple objects from Platform. /// @@ -97,7 +102,8 @@ where /// /// This type must implement [`TransportRequest`] and [`MockRequest`](crate::mock::MockRequest). type Request: TransportRequest - + Into<>::Request>>::Request>; + + Into<>::Request>>::Request> + + 'static; /// Fetch (or search) multiple objects on the Dash Platform /// @@ -143,20 +149,24 @@ where ) -> Result { let request = query.query(sdk.prove())?; - let response = request - .clone() - .execute(sdk, RequestSettings::default()) - .await?; + let request_clone = request.clone(); + let sdk_arc = Arc::new(sdk.clone()); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + let process_response = move |response| { + let request = request_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - let object: O = sdk - .parse_proof::<>::Request, O>(request, response) - .await? - .unwrap_or_default(); + async move { + sdk.parse_proof::<>::Request, O>(request, response) + .await + .map(|x| x.unwrap_or_default()) + } + }; - Ok(object) + request + .execute_and_process(sdk, process_response, RequestSettings::default()) + .await + .map_err(Error::from) } /// Fetch multiple objects from Platform by their identifiers. @@ -185,7 +195,7 @@ where /// Fetch multiple objects from Platform with limit. /// - /// Fetches up to `limit` objects matching the `query`. + /// Fetches up to `limit` objects matching the `query`. /// See [FetchMany] and [FetchMany::fetch_many()] for more detailed documentation. /// /// ## Parameters @@ -231,19 +241,24 @@ impl FetchMany for Document { ) -> Result { let document_query: DocumentQuery = query.query(sdk.prove())?; - let request = document_query.clone(); - let response: GetDocumentsResponse = - request.execute(sdk, RequestSettings::default()).await?; + let document_query_clone = document_query.clone(); + let sdk_arc = Arc::new(sdk.clone()); - tracing::trace!(request=?document_query, response=?response, "fetch multiple documents"); + let process_response = move |response| { + let document_query = document_query_clone.clone(); + let sdk = Arc::clone(&sdk_arc); - // let object: Option> = sdk - let documents: Documents = sdk - .parse_proof::(document_query, response) - .await? - .unwrap_or_default(); + async move { + sdk.parse_proof::(document_query, response) + .await + .map(|x| x.unwrap_or_default()) + } + }; - Ok(documents) + document_query + .execute_and_process(sdk, process_response, RequestSettings::default()) + .await + .map_err(Error::from) } } diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 2b9268f2751..22ba4b6ac65 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -28,10 +28,11 @@ pub use rs_dapi_client::AddressList; pub use rs_dapi_client::RequestSettings; use rs_dapi_client::{ transport::{TransportClient, TransportRequest}, - DapiClient, DapiClientError, DapiRequestExecutor, + CanRetry, DapiClient, DapiClientError, DapiRequestExecutor, }; use std::collections::btree_map::Entry; use std::fmt::Debug; +use std::future::Future; #[cfg(feature = "mocks")] use std::num::NonZeroUsize; #[cfg(feature = "mocks")] @@ -83,7 +84,7 @@ pub type LastQueryTimestamp = u64; /// /// See tests/ for examples of using the SDK. pub struct Sdk { - /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) + /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) pub network: Network, inner: SdkInstance, /// Use proofs when retrieving data from Platform. @@ -668,17 +669,32 @@ fn verify_metadata_height( #[async_trait::async_trait] impl DapiRequestExecutor for Sdk { - async fn execute( + async fn execute_and_process( &self, request: R, + process_response: F, settings: RequestSettings, - ) -> Result::Error>> { + ) -> Result::Error, PE>> + where + R: TransportRequest + Mockable, + R::Response: Mockable, + ::Error: Mockable, + PE: std::error::Error + Mockable + CanRetry + Send, + O: Debug + Send, + F: Fn(R::Response) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { match self.inner { - SdkInstance::Dapi { ref dapi, .. } => dapi.execute(request, settings).await, + SdkInstance::Dapi { ref dapi, .. } => { + dapi.execute_and_process(request, process_response, settings) + .await + } #[cfg(feature = "mocks")] SdkInstance::Mock { ref dapi, .. } => { let dapi_guard = dapi.lock().await; - dapi_guard.execute(request, settings).await + dapi_guard + .execute_and_process(request, process_response, settings) + .await } } } diff --git a/packages/rs-sdk/tests/fetch/epoch.rs b/packages/rs-sdk/tests/fetch/epoch.rs index 0c8f429d7fb..e6805ed700b 100644 --- a/packages/rs-sdk/tests/fetch/epoch.rs +++ b/packages/rs-sdk/tests/fetch/epoch.rs @@ -8,7 +8,7 @@ use dash_sdk::{ fetch_current_no_parameters::FetchCurrent, Fetch, FetchMany, LimitQuery, DEFAULT_EPOCH_QUERY_LIMIT, }, - Sdk, + Error, Sdk, }; use dpp::block::epoch::EpochIndex; use dpp::block::extended_epoch_info::v0::ExtendedEpochInfoV0Getters;