Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 159 additions & 64 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,76 @@
use backon::{ExponentialBuilder, Retryable};
use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::async_trait;
use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::Instrument;

use crate::address_list::AddressListError;
use crate::connection_pool::ConnectionPool;
use crate::request_settings::AppliedRequestSettings;
use crate::{
transport::{TransportClient, TransportRequest},
Address, AddressList, CanRetry, RequestSettings,
};

/// Processing Error for tests and non-processing request execution
#[derive(Debug, thiserror::Error)]
#[error("dummy processing error")]
#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
pub struct DummyProcessingError;

impl CanRetry for DummyProcessingError {
fn can_retry(&self) -> bool {
false
}
}

impl Mockable for DummyProcessingError {}

/// General DAPI request error type.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
pub enum DapiClientError<TE: Mockable> {
/// The error happened on transport layer
pub enum DapiClientError<TE, PE>
where
TE: Mockable,
PE: Mockable,
{
/// The error happened on transport layer.
#[error("transport error with {1}: {0}")]
Transport(
#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] TE,
Address,
),
/// Processing error from the closure.
#[error("processing error: {0}")]
Processing(#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] PE),
/// There are no valid DAPI addresses to use.
#[error("no available addresses to use")]
NoAvailableAddresses,
/// [AddressListError] errors
/// [AddressListError] errors.
#[error("address list error: {0}")]
AddressList(AddressListError),

#[cfg(feature = "mocks")]
#[error("mock error: {0}")]
/// Error happened in mock client
/// Error happened in mock client.
Mock(#[from] crate::mock::MockError),
}

impl<TE: CanRetry + Mockable> CanRetry for DapiClientError<TE> {
impl<TE, PE> CanRetry for DapiClientError<TE, PE>
where
TE: CanRetry + Mockable,
PE: CanRetry + Mockable,
{
fn can_retry(&self) -> bool {
use DapiClientError::*;
match self {
NoAvailableAddresses => false,
Transport(transport_error, _) => transport_error.can_retry(),
Processing(processing_error) => processing_error.can_retry(),
AddressList(_) => false,
#[cfg(feature = "mocks")]
Mock(_) => false,
Expand All @@ -61,7 +90,7 @@ struct TransportErrorData {
/// Serialization of [DapiClientError].
///
/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
impl<TE: Mockable> Mockable for DapiClientError<TE> {
impl<TE: Mockable, PE: Mockable> Mockable for DapiClientError<TE, PE> {
#[cfg(feature = "mocks")]
fn mock_serialize(&self) -> Option<Vec<u8>> {
Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
Expand All @@ -81,11 +110,37 @@ pub trait DapiRequestExecutor {
&self,
request: R,
settings: RequestSettings,
) -> Result<R::Response, DapiClientError<<R::Client as TransportClient>::Error>>
) -> Result<
R::Response,
DapiClientError<<R::Client as TransportClient>::Error, DummyProcessingError>,
>
where
R: TransportRequest + Mockable,
R::Response: Mockable,
<R::Client as TransportClient>::Error: Mockable,
{
let process_response = move |response: R::Response| async move { Ok(response) };

self.execute_and_process(request, process_response, settings)
.await
}

/// Execute request using this DAPI client.
async fn execute_and_process<R, O, PE, F, Fut>(
&self,
request: R,
// TODO: Figure out how to make it optional. For example we can do two methods: execute and execute_and_process
process_response: F,
settings: RequestSettings,
) -> Result<O, DapiClientError<<R::Client as TransportClient>::Error, PE>>
where
R: TransportRequest + Mockable,
R::Response: Mockable,
<R::Client as TransportClient>::Error: Mockable;
<R::Client as TransportClient>::Error: Mockable,
PE: Error + Mockable + CanRetry + Send,
O: Debug + Send,
F: Fn(R::Response) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, PE>> + Send + 'static;
}

/// Access point to DAPI.
Expand All @@ -101,7 +156,7 @@ pub struct DapiClient {
impl DapiClient {
/// Initialize new [DapiClient] and optionally override default settings.
pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
// multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case
// Multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case.
let address_count = 3 * address_list.len();

Self {
Expand All @@ -121,16 +176,20 @@ impl DapiClient {

#[async_trait]
impl DapiRequestExecutor for DapiClient {
/// Execute the [DapiRequest](crate::DapiRequest).
async fn execute<R>(
async fn execute_and_process<R, O, PE, F, Fut>(
&self,
request: R,
process_response: F,
settings: RequestSettings,
) -> Result<R::Response, DapiClientError<<R::Client as TransportClient>::Error>>
) -> Result<O, DapiClientError<<R::Client as TransportClient>::Error, PE>>
where
R: TransportRequest + Mockable,
R::Response: Mockable,
R::Response: Mockable + Send,
<R::Client as TransportClient>::Error: Mockable,
PE: Error + Mockable + CanRetry + Send,
O: Debug + Send,
F: Fn(R::Response) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, PE>> + Send + 'static,
{
// Join settings of different sources to get final version of the settings for this execution:
let applied_settings = self
Expand All @@ -153,19 +212,25 @@ impl DapiRequestExecutor for DapiClient {
#[cfg(feature = "dump")]
let dump_request = request.clone();

let process_response = Arc::new(process_response);

// Setup DAPI request execution routine future. It's a closure that will be called
// more once to build new future on each retry.
let routine = move || {
// Clone `process_response` inside `routine`
let process_response = Arc::clone(&process_response);

// Try to get an address to initialize transport on:

let address_list = self
.address_list
.read()
.expect("can't get address list for read");

let address_result = address_list.get_live_address().cloned().ok_or(
DapiClientError::<<R::Client as TransportClient>::Error>::NoAvailableAddresses,
);
let address_result = address_list
.get_live_address()
.cloned()
.ok_or(DapiClientError::NoAvailableAddresses);

drop(address_list);

Expand Down Expand Up @@ -200,56 +265,40 @@ impl DapiRequestExecutor for DapiClient {
&applied_settings,
&pool,
)
.map_err(|e| {
DapiClientError::<<R::Client as TransportClient>::Error>::Transport(
e,
address.clone(),
)
})?;

let response = transport_request
.map_err(|e| DapiClientError::Transport(e, address.clone()))?;

let response_result = transport_request
.execute_transport(&mut transport_client, &applied_settings)
.await
.map_err(|e| {
DapiClientError::<<R::Client as TransportClient>::Error>::Transport(
e,
address.clone(),
)
});

match &response {
Ok(_) => {
// Unban the address if it was banned and node responded successfully this time
if address.is_banned() {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");

address_list.unban_address(&address)
.map_err(DapiClientError::<<R::Client as TransportClient>::Error>::AddressList)?;
}

tracing::trace!(?response, "received {} response", response_name);
.map_err(|e| DapiClientError::Transport(e, address.clone()));

match response_result {
Ok(response) => {
// Processing response
let result = process_response(response)
.await
.map_err(DapiClientError::Processing);

update_ban_status(
&result,
Arc::clone(&self.address_list),
address,
applied_settings,
)?;

result
}
Err(error) => {
if !error.can_retry() {
if applied_settings.ban_failed_address {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");

address_list.ban_address(&address)
.map_err(DapiClientError::<<R::Client as TransportClient>::Error>::AddressList)?;
}
} else {
tracing::trace!(?error, "received error");
}
error_result => {
update_ban_status(
&error_result,
Arc::clone(&self.address_list),
address,
applied_settings,
)?;

Err(error_result.unwrap_err())
}
};

response
}
}
};

Expand All @@ -275,9 +324,55 @@ impl DapiRequestExecutor for DapiClient {
}

// Dump request and response to disk if dump_dir is set:
#[cfg(feature = "dump")]
Self::dump_request_response(&dump_request, &result, dump_dir);
// TODO: implement
//#[cfg(feature = "dump")]
//Self::dump_request_response(&dump_request, &response, dump_dir);

result
}
}

fn update_ban_status<O, E, TE, PE>(
result: &Result<O, E>,
address_list: Arc<RwLock<AddressList>>,
address: Address,
applied_settings: AppliedRequestSettings,
) -> Result<(), DapiClientError<TE, PE>>
where
E: Error + CanRetry,
PE: Mockable,
TE: Mockable,
{
match result {
Ok(_) => {
// Unban the address if it was banned and node responded successfully this time
if address.is_banned() {
let mut list = address_list
.write()
.expect("can't get address list for write");

list.unban_address(&address)
.map_err(DapiClientError::AddressList)?;
}

// TODO: implement tracing
//tracing::trace!(?response, "received {} response", response_name);
}
Err(error) => {
if !error.can_retry() {
if applied_settings.ban_failed_address {
let mut list = address_list
.write()
.expect("can't get address list for write");

list.ban_address(&address)
.map_err(DapiClientError::AddressList)?;
}
} else {
tracing::trace!(?error, "received retryable error");
}
}
};

Ok(())
}
Loading