From c16f4b5591909ca662c11c50cdecd801bf3006c6 Mon Sep 17 00:00:00 2001 From: "Jamil Lambert, PhD" Date: Mon, 20 Apr 2026 18:34:39 +0100 Subject: [PATCH 1/5] Round timeouts to whole seconds Timeouts must be whole seconds for some HTTP clients like bitreq. Round timeouts to whole seconds to avoid issues with these clients. --- jsonrpc/src/http/bitreq_http.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/jsonrpc/src/http/bitreq_http.rs b/jsonrpc/src/http/bitreq_http.rs index 20903bfc5..717508ed1 100644 --- a/jsonrpc/src/http/bitreq_http.rs +++ b/jsonrpc/src/http/bitreq_http.rs @@ -51,17 +51,29 @@ impl BitreqHttpTransport { /// Returns a builder for [`BitreqHttpTransport`]. pub fn builder() -> Builder { Builder::new() } + /// Returns the timeout in whole seconds, rounding positive sub-second values up to one. + fn timeout_secs(&self) -> u64 { + let secs = self.timeout.as_secs(); + if secs == 0 && self.timeout > Duration::from_secs(0) { + 1 + } else { + secs + } + } + fn request(&self, req: impl serde::Serialize) -> Result where R: for<'a> serde::de::Deserialize<'a>, { + let timeout_secs = self.timeout_secs(); + let req = match &self.basic_auth { Some(auth) => bitreq::Request::new(bitreq::Method::Post, &self.url) - .with_timeout(self.timeout.as_secs()) + .with_timeout(timeout_secs) .with_header("Authorization", auth) .with_json(&req)?, None => bitreq::Request::new(bitreq::Method::Post, &self.url) - .with_timeout(self.timeout.as_secs()) + .with_timeout(timeout_secs) .with_json(&req)?, }; From e62b04409a91c1535c96982604d67fadb259e680 Mon Sep 17 00:00:00 2001 From: "Jamil Lambert, PhD" Date: Fri, 6 Feb 2026 16:20:46 +0000 Subject: [PATCH 2/5] Copy jsonrpc client and bitreq_http for async. Code copy only to make it easier in the next patch to see what the changes are for async. --- jsonrpc/src/client_async.rs | 251 ++++++++++++++++++++++ jsonrpc/src/http/bitreq_http_async.rs | 286 ++++++++++++++++++++++++++ 2 files changed, 537 insertions(+) create mode 100644 jsonrpc/src/client_async.rs create mode 100644 jsonrpc/src/http/bitreq_http_async.rs diff --git a/jsonrpc/src/client_async.rs b/jsonrpc/src/client_async.rs new file mode 100644 index 000000000..e9cb0ca67 --- /dev/null +++ b/jsonrpc/src/client_async.rs @@ -0,0 +1,251 @@ +// SPDX-License-Identifier: CC0-1.0 + +//! # Client support +//! +//! Support for connecting to JSONRPC servers over HTTP, sending requests, +//! and parsing responses. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::atomic; + +use serde_json::value::RawValue; +use serde_json::Value; + +use crate::error::Error; +use crate::{Request, Response}; + +/// An interface for a transport over which to use the JSONRPC protocol. +pub trait Transport: Send + Sync + 'static { + /// Sends an RPC request over the transport. + fn send_request(&self, _: Request) -> Result; + /// Sends a batch of RPC requests over the transport. + fn send_batch(&self, _: &[Request]) -> Result, Error>; + /// Formats the target of this transport. I.e. the URL/socket/... + fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result; +} + +/// A JSON-RPC client. +/// +/// Creates a new Client using one of the transport-specific constructors e.g., +/// [`Client::simple_http`] for a bare-minimum HTTP transport. +pub struct Client { + pub(crate) transport: Box, + nonce: atomic::AtomicUsize, +} + +impl Client { + /// Creates a new client with the given transport. + pub fn with_transport(transport: T) -> Client { + Client { transport: Box::new(transport), nonce: atomic::AtomicUsize::new(1) } + } + + /// Builds a request. + /// + /// To construct the arguments, one can use one of the shorthand methods. + /// [`crate::arg`] or [`crate::try_arg`]. + pub fn build_request<'a>(&self, method: &'a str, params: Option<&'a RawValue>) -> Request<'a> { + let nonce = self.nonce.fetch_add(1, atomic::Ordering::Relaxed); + Request { method, params, id: serde_json::Value::from(nonce), jsonrpc: Some("2.0") } + } + + /// Sends a request to a client. + pub fn send_request(&self, request: Request) -> Result { + self.transport.send_request(request) + } + + /// Sends a batch of requests to the client. + /// + /// Note that the requests need to have valid IDs, so it is advised to create the requests + /// with [`Client::build_request`]. + /// + /// # Returns + /// + /// The return vector holds the response for the request at the corresponding index. If no + /// response was provided, it's [`None`]. + pub fn send_batch(&self, requests: &[Request]) -> Result>, Error> { + if requests.is_empty() { + return Err(Error::EmptyBatch); + } + + // If the request body is invalid JSON, the response is a single response object. + // We ignore this case since we are confident we are producing valid JSON. + let responses = self.transport.send_batch(requests)?; + if responses.len() > requests.len() { + return Err(Error::WrongBatchResponseSize); + } + + //TODO(stevenroose) check if the server preserved order to avoid doing the mapping + + // First index responses by ID and catch duplicate IDs. + let mut by_id = HashMap::with_capacity(requests.len()); + for resp in responses.into_iter() { + let id = HashableValue(Cow::Owned(resp.id.clone())); + if let Some(dup) = by_id.insert(id, resp) { + return Err(Error::BatchDuplicateResponseId(dup.id)); + } + } + // Match responses to the requests. + let results = + requests.iter().map(|r| by_id.remove(&HashableValue(Cow::Borrowed(&r.id)))).collect(); + + // Since we're also just producing the first duplicate ID, we can also just produce the + // first incorrect ID in case there are multiple. + if let Some(id) = by_id.keys().next() { + return Err(Error::WrongBatchResponseId((*id.0).clone())); + } + + Ok(results) + } + + /// Makes a request and deserializes the response. + /// + /// To construct the arguments, one can use one of the shorthand methods + /// [`crate::arg`] or [`crate::try_arg`]. + pub fn call serde::de::Deserialize<'a>>( + &self, + method: &str, + args: Option<&RawValue>, + ) -> Result { + let request = self.build_request(method, args); + let id = request.id.clone(); + + let response = self.send_request(request)?; + if response.jsonrpc.is_some() && response.jsonrpc != Some(From::from("2.0")) { + return Err(Error::VersionMismatch); + } + if response.id != id { + return Err(Error::NonceMismatch); + } + + response.result() + } +} + +impl fmt::Debug for crate::Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "jsonrpc::Client(")?; + self.transport.fmt_target(f)?; + write!(f, ")") + } +} + +impl From for Client { + fn from(t: T) -> Client { Client::with_transport(t) } +} + +/// Newtype around `Value` which allows hashing for use as hashmap keys, +/// this is needed for batch requests. +/// +/// The reason `Value` does not support `Hash` or `Eq` by itself +/// is that it supports `f64` values; but for batch requests we +/// will only be hashing the "id" field of the request/response +/// pair, which should never need decimal precision and therefore +/// never use `f64`. +#[derive(Clone, PartialEq, Debug)] +struct HashableValue<'a>(pub Cow<'a, Value>); + +impl Eq for HashableValue<'_> {} + +impl Hash for HashableValue<'_> { + fn hash(&self, state: &mut H) { + match *self.0.as_ref() { + Value::Null => "null".hash(state), + Value::Bool(false) => "false".hash(state), + Value::Bool(true) => "true".hash(state), + Value::Number(ref n) => { + "number".hash(state); + if let Some(n) = n.as_i64() { + n.hash(state); + } else if let Some(n) = n.as_u64() { + n.hash(state); + } else { + n.to_string().hash(state); + } + } + Value::String(ref s) => { + "string".hash(state); + s.hash(state); + } + Value::Array(ref v) => { + "array".hash(state); + v.len().hash(state); + for obj in v { + HashableValue(Cow::Borrowed(obj)).hash(state); + } + } + Value::Object(ref m) => { + "object".hash(state); + m.len().hash(state); + for (key, val) in m { + key.hash(state); + HashableValue(Cow::Borrowed(val)).hash(state); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::borrow::Cow; + use std::collections::HashSet; + use std::str::FromStr; + use std::sync; + + use super::*; + + struct DummyTransport; + impl Transport for DummyTransport { + fn send_request(&self, _: Request) -> Result { Err(Error::NonceMismatch) } + fn send_batch(&self, _: &[Request]) -> Result, Error> { Ok(vec![]) } + fn fmt_target(&self, _: &mut fmt::Formatter) -> fmt::Result { Ok(()) } + } + + #[test] + fn sanity() { + let client = Client::with_transport(DummyTransport); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 1); + let req1 = client.build_request("test", None); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 2); + let req2 = client.build_request("test", None); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 3); + assert!(req1.id != req2.id); + } + + #[test] + fn hash_value() { + let val = HashableValue(Cow::Owned(Value::from_str("null").unwrap())); + let t = HashableValue(Cow::Owned(Value::from_str("true").unwrap())); + let f = HashableValue(Cow::Owned(Value::from_str("false").unwrap())); + let ns = + HashableValue(Cow::Owned(Value::from_str("[0, -0, 123.4567, -100000000]").unwrap())); + let m = + HashableValue(Cow::Owned(Value::from_str("{ \"field\": 0, \"field\": -0 }").unwrap())); + + let mut coll = HashSet::new(); + + assert!(!coll.contains(&val)); + coll.insert(val.clone()); + assert!(coll.contains(&val)); + + assert!(!coll.contains(&t)); + assert!(!coll.contains(&f)); + coll.insert(t.clone()); + assert!(coll.contains(&t)); + assert!(!coll.contains(&f)); + coll.insert(f.clone()); + assert!(coll.contains(&t)); + assert!(coll.contains(&f)); + + assert!(!coll.contains(&ns)); + coll.insert(ns.clone()); + assert!(coll.contains(&ns)); + + assert!(!coll.contains(&m)); + coll.insert(m.clone()); + assert!(coll.contains(&m)); + } +} diff --git a/jsonrpc/src/http/bitreq_http_async.rs b/jsonrpc/src/http/bitreq_http_async.rs new file mode 100644 index 000000000..717508ed1 --- /dev/null +++ b/jsonrpc/src/http/bitreq_http_async.rs @@ -0,0 +1,286 @@ +//! This module implements the [`crate::client::Transport`] trait using [`bitreq`] +//! as the underlying HTTP transport. +//! +//! [bitreq]: + +#[cfg(jsonrpc_fuzz)] +use std::io::{self, Read, Write}; +#[cfg(jsonrpc_fuzz)] +use std::sync::Mutex; +use std::time::Duration; +use std::{error, fmt}; + +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; + +use crate::client::Transport; +use crate::{Request, Response}; + +const DEFAULT_URL: &str = "http://localhost"; +const DEFAULT_PORT: u16 = 8332; // the default RPC port for bitcoind. +#[cfg(not(jsonrpc_fuzz))] +const DEFAULT_TIMEOUT_SECONDS: u64 = 15; +#[cfg(jsonrpc_fuzz)] +const DEFAULT_TIMEOUT_SECONDS: u64 = 1; + +/// An HTTP transport that uses [`bitreq`] and is useful for running a bitcoind RPC client. +#[derive(Clone, Debug)] +pub struct BitreqHttpTransport { + /// URL of the RPC server. + url: String, + /// Timeout only supports second granularity. + timeout: Duration, + /// The value of the `Authorization` HTTP header, i.e., a base64 encoding of 'user:password'. + basic_auth: Option, +} + +impl Default for BitreqHttpTransport { + fn default() -> Self { + BitreqHttpTransport { + url: format!("{}:{}", DEFAULT_URL, DEFAULT_PORT), + timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECONDS), + basic_auth: None, + } + } +} + +impl BitreqHttpTransport { + /// Constructs a new [`BitreqHttpTransport`] with default parameters. + pub fn new() -> Self { BitreqHttpTransport::default() } + + /// Returns a builder for [`BitreqHttpTransport`]. + pub fn builder() -> Builder { Builder::new() } + + /// Returns the timeout in whole seconds, rounding positive sub-second values up to one. + fn timeout_secs(&self) -> u64 { + let secs = self.timeout.as_secs(); + if secs == 0 && self.timeout > Duration::from_secs(0) { + 1 + } else { + secs + } + } + + fn request(&self, req: impl serde::Serialize) -> Result + where + R: for<'a> serde::de::Deserialize<'a>, + { + let timeout_secs = self.timeout_secs(); + + let req = match &self.basic_auth { + Some(auth) => bitreq::Request::new(bitreq::Method::Post, &self.url) + .with_timeout(timeout_secs) + .with_header("Authorization", auth) + .with_json(&req)?, + None => bitreq::Request::new(bitreq::Method::Post, &self.url) + .with_timeout(timeout_secs) + .with_json(&req)?, + }; + + // Send the request and parse the response. If the response is an error that does not + // contain valid JSON in its body (for instance if the bitcoind HTTP server work queue + // depth is exceeded), return the raw HTTP error so users can match against it. + let resp = req.send()?; + match resp.json() { + Ok(json) => Ok(json), + Err(bitreq_err) => + if resp.status_code != 200 { + Err(Error::Http(HttpError { + status_code: resp.status_code, + body: resp.as_str().unwrap_or("").to_string(), + })) + } else { + Err(Error::Bitreq(bitreq_err)) + }, + } + } +} + +impl Transport for BitreqHttpTransport { + fn send_request(&self, req: Request) -> Result { + Ok(self.request(req)?) + } + + fn send_batch(&self, reqs: &[Request]) -> Result, crate::Error> { + Ok(self.request(reqs)?) + } + + fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.url) } +} + +/// Builder for simple bitcoind [`BitreqHttpTransport`]. +#[derive(Clone, Debug)] +pub struct Builder { + tp: BitreqHttpTransport, +} + +impl Builder { + /// Constructs a new [`Builder`] with default configuration and the URL to use. + pub fn new() -> Builder { Builder { tp: BitreqHttpTransport::new() } } + + /// Sets the timeout after which requests will abort if they aren't finished. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.tp.timeout = timeout; + self + } + + /// Sets the URL of the server to the transport. + #[allow(clippy::assigning_clones)] // clone_into is only available in Rust 1.63 + pub fn url(mut self, url: &str) -> Result { + self.tp.url = url.to_owned(); + Ok(self) + } + + /// Adds authentication information to the transport. + pub fn basic_auth(mut self, user: String, pass: Option) -> Self { + let mut s = user; + s.push(':'); + if let Some(ref pass) = pass { + s.push_str(pass.as_ref()); + } + self.tp.basic_auth = Some(format!("Basic {}", &BASE64.encode(s.as_bytes()))); + self + } + + /// Adds authentication information to the transport using a cookie string ('user:pass'). + /// + /// Does no checking on the format of the cookie string, just base64 encodes whatever is passed in. + /// + /// # Examples + /// + /// ```no_run + /// # use jsonrpc::bitreq_http::BitreqHttpTransport; + /// # use std::fs::{self, File}; + /// # use std::path::Path; + /// # let cookie_file = Path::new("~/.bitcoind/.cookie"); + /// let mut file = File::open(cookie_file).expect("couldn't open cookie file"); + /// let mut cookie = String::new(); + /// fs::read_to_string(&mut cookie).expect("couldn't read cookie file"); + /// let client = BitreqHttpTransport::builder().cookie_auth(cookie); + /// ``` + pub fn cookie_auth>(mut self, cookie: S) -> Self { + self.tp.basic_auth = Some(format!("Basic {}", &BASE64.encode(cookie.as_ref().as_bytes()))); + self + } + + /// Builds the final [`BitreqHttpTransport`]. + pub fn build(self) -> BitreqHttpTransport { self.tp } +} + +impl Default for Builder { + fn default() -> Self { Builder::new() } +} + +/// An HTTP error. +#[derive(Debug)] +pub struct HttpError { + /// Status code of the error response. + pub status_code: i32, + /// Raw body of the error response. + pub body: String, +} + +impl fmt::Display for HttpError { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "status: {}, body: {}", self.status_code, self.body) + } +} + +impl error::Error for HttpError {} + +/// Error that can happen when sending requests. +/// +/// In case of error, a JSON error is returned if the body of the response could be parsed as such. +/// Otherwise, an HTTP error is returned containing the status code and the raw body. +#[non_exhaustive] +#[derive(Debug)] +pub enum Error { + /// JSON parsing error. + Json(serde_json::Error), + /// Bitreq error. + Bitreq(bitreq::Error), + /// HTTP error that does not contain valid JSON as body. + Http(HttpError), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Error::Json(ref e) => write!(f, "parsing JSON failed: {}", e), + Error::Bitreq(ref e) => write!(f, "bitreq: {}", e), + Error::Http(ref e) => write!(f, "http ({})", e), + } + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + use self::Error::*; + + match *self { + Json(ref e) => Some(e), + Bitreq(ref e) => Some(e), + Http(ref e) => Some(e), + } + } +} + +impl From for Error { + fn from(e: serde_json::Error) -> Self { Error::Json(e) } +} + +impl From for Error { + fn from(e: bitreq::Error) -> Self { Error::Bitreq(e) } +} + +impl From for crate::Error { + fn from(e: Error) -> crate::Error { + match e { + Error::Json(e) => crate::Error::Json(e), + e => crate::Error::Transport(Box::new(e)), + } + } +} + +/// Global mutex used by the fuzzing harness to inject data into the read end of the TCP stream. +#[cfg(jsonrpc_fuzz)] +pub static FUZZ_TCP_SOCK: Mutex>>> = Mutex::new(None); + +#[cfg(jsonrpc_fuzz)] +#[derive(Clone, Debug)] +struct TcpStream; + +#[cfg(jsonrpc_fuzz)] +mod impls { + use super::*; + + impl Read for TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match *FUZZ_TCP_SOCK.lock().unwrap() { + Some(ref mut cursor) => io::Read::read(cursor, buf), + None => Ok(0), + } + } + } + impl Write for TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { io::sink().write(buf) } + fn flush(&mut self) -> io::Result<()> { Ok(()) } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Client; + + #[test] + fn construct() { + let tp = Builder::new() + .timeout(Duration::from_millis(100)) + .url("http://localhost:22") + .unwrap() + .basic_auth("user".to_string(), None) + .build(); + let _ = Client::with_transport(tp); + } +} From 7d556f98ec41a51cd4680f13b3c0a38c3057db5f Mon Sep 17 00:00:00 2001 From: "Jamil Lambert, PhD" Date: Fri, 6 Feb 2026 17:38:47 +0000 Subject: [PATCH 3/5] Remove fuzz related sections Remove code related to fuzzing from the new async files. Fuzzing of the async client can be added back later if needed. --- jsonrpc/src/http/bitreq_http_async.rs | 33 --------------------------- 1 file changed, 33 deletions(-) diff --git a/jsonrpc/src/http/bitreq_http_async.rs b/jsonrpc/src/http/bitreq_http_async.rs index 717508ed1..84cfef351 100644 --- a/jsonrpc/src/http/bitreq_http_async.rs +++ b/jsonrpc/src/http/bitreq_http_async.rs @@ -3,10 +3,6 @@ //! //! [bitreq]: -#[cfg(jsonrpc_fuzz)] -use std::io::{self, Read, Write}; -#[cfg(jsonrpc_fuzz)] -use std::sync::Mutex; use std::time::Duration; use std::{error, fmt}; @@ -18,10 +14,7 @@ use crate::{Request, Response}; const DEFAULT_URL: &str = "http://localhost"; const DEFAULT_PORT: u16 = 8332; // the default RPC port for bitcoind. -#[cfg(not(jsonrpc_fuzz))] const DEFAULT_TIMEOUT_SECONDS: u64 = 15; -#[cfg(jsonrpc_fuzz)] -const DEFAULT_TIMEOUT_SECONDS: u64 = 1; /// An HTTP transport that uses [`bitreq`] and is useful for running a bitcoind RPC client. #[derive(Clone, Debug)] @@ -242,32 +235,6 @@ impl From for crate::Error { } } -/// Global mutex used by the fuzzing harness to inject data into the read end of the TCP stream. -#[cfg(jsonrpc_fuzz)] -pub static FUZZ_TCP_SOCK: Mutex>>> = Mutex::new(None); - -#[cfg(jsonrpc_fuzz)] -#[derive(Clone, Debug)] -struct TcpStream; - -#[cfg(jsonrpc_fuzz)] -mod impls { - use super::*; - - impl Read for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *FUZZ_TCP_SOCK.lock().unwrap() { - Some(ref mut cursor) => io::Read::read(cursor, buf), - None => Ok(0), - } - } - } - impl Write for TcpStream { - fn write(&mut self, buf: &[u8]) -> io::Result { io::sink().write(buf) } - fn flush(&mut self) -> io::Result<()> { Ok(()) } - } -} - #[cfg(test)] mod tests { use super::*; From 7302ebd383bbdc3e931ceedac615c58342c021a2 Mon Sep 17 00:00:00 2001 From: "Jamil Lambert, PhD" Date: Fri, 17 Apr 2026 09:42:10 +0100 Subject: [PATCH 4/5] Change Debug impl to local Client In preparation for adding another async client change the Debug impl to refer to the local sync Client so that there is no conflict when adding the second async Client struct. --- jsonrpc/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jsonrpc/src/client.rs b/jsonrpc/src/client.rs index e9cb0ca67..6ffdd5843 100644 --- a/jsonrpc/src/client.rs +++ b/jsonrpc/src/client.rs @@ -124,7 +124,7 @@ impl Client { } } -impl fmt::Debug for crate::Client { +impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "jsonrpc::Client(")?; self.transport.fmt_target(f)?; From 0aa780371fb965cdf6c0f7fefcf469f4e44f7d35 Mon Sep 17 00:00:00 2001 From: "Jamil Lambert, PhD" Date: Fri, 6 Feb 2026 15:46:02 +0000 Subject: [PATCH 5/5] Add async support to jsonrpc Update the lockfiles because of the new dev dependency on futures. --- Cargo-minimal.lock | 95 +++++++++++++++++++++++++++ Cargo-recent.lock | 95 +++++++++++++++++++++++++++ jsonrpc/Cargo.toml | 7 ++ jsonrpc/src/client_async.rs | 65 +++++++++++++----- jsonrpc/src/http/bitreq_http_async.rs | 40 +++++++---- jsonrpc/src/http/mod.rs | 3 + jsonrpc/src/lib.rs | 4 ++ 7 files changed, 279 insertions(+), 30 deletions(-) diff --git a/Cargo-minimal.lock b/Cargo-minimal.lock index d86144e06..134ffe154 100644 --- a/Cargo-minimal.lock +++ b/Cargo-minimal.lock @@ -361,6 +361,94 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -472,6 +560,7 @@ version = "0.19.0" dependencies = [ "base64 0.22.1", "bitreq", + "futures", "serde", "serde_json", "socks", @@ -959,6 +1048,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo-recent.lock b/Cargo-recent.lock index d86144e06..134ffe154 100644 --- a/Cargo-recent.lock +++ b/Cargo-recent.lock @@ -361,6 +361,94 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -472,6 +560,7 @@ version = "0.19.0" dependencies = [ "base64 0.22.1", "bitreq", + "futures", "serde", "serde_json", "socks", @@ -959,6 +1048,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 7c06709e7..786d75705 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -22,6 +22,10 @@ default = [ "simple_http", "simple_tcp" ] simple_http = [ "base64" ] # A transport that uses `bitreq` as the HTTP client. bitreq_http = [ "base64", "bitreq" ] +# A transport that uses `bitreq` as the async HTTP client. +bitreq_http_async = [ "base64", "bitreq", "bitreq/async", "client_async" ] +# An async JSON-RPC client implementation. +client_async = [] # Basic transport over a raw TcpListener simple_tcp = [] # Basic transport over a raw UnixStream @@ -37,5 +41,8 @@ base64 = { version = "0.22.1", optional = true } bitreq = { version = "0.3.0", path = "../bitreq", features = ["json-using-serde"], optional = true } socks = { version = "0.3.4", optional = true} +[dev-dependencies] +futures = "0.3" + [lints.rust] unexpected_cfgs = { level = "deny", check-cfg = ['cfg(jsonrpc_fuzz)'] } diff --git a/jsonrpc/src/client_async.rs b/jsonrpc/src/client_async.rs index e9cb0ca67..423fee427 100644 --- a/jsonrpc/src/client_async.rs +++ b/jsonrpc/src/client_async.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: CC0-1.0 -//! # Client support +//! # Async client support //! //! Support for connecting to JSONRPC servers over HTTP, sending requests, //! and parsing responses. @@ -8,7 +8,9 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt; +use std::future::Future; use std::hash::{Hash, Hasher}; +use std::pin::Pin; use std::sync::atomic; use serde_json::value::RawValue; @@ -17,20 +19,27 @@ use serde_json::Value; use crate::error::Error; use crate::{Request, Response}; -/// An interface for a transport over which to use the JSONRPC protocol. +const JSONRPC_VERSION: &str = "2.0"; + +/// Boxed future type used by async transports. +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +/// An interface for an async transport over which to use the JSONRPC protocol. pub trait Transport: Send + Sync + 'static { /// Sends an RPC request over the transport. - fn send_request(&self, _: Request) -> Result; + fn send_request<'a>(&'a self, req: Request<'a>) -> BoxFuture<'a, Result>; /// Sends a batch of RPC requests over the transport. - fn send_batch(&self, _: &[Request]) -> Result, Error>; + fn send_batch<'a>( + &'a self, + reqs: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, Error>>; /// Formats the target of this transport. I.e. the URL/socket/... fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result; } -/// A JSON-RPC client. +/// An async JSON-RPC client. /// -/// Creates a new Client using one of the transport-specific constructors e.g., -/// [`Client::simple_http`] for a bare-minimum HTTP transport. +/// Creates a new Client using one of the transport-specific constructors. pub struct Client { pub(crate) transport: Box, nonce: atomic::AtomicUsize, @@ -48,11 +57,19 @@ impl Client { /// [`crate::arg`] or [`crate::try_arg`]. pub fn build_request<'a>(&self, method: &'a str, params: Option<&'a RawValue>) -> Request<'a> { let nonce = self.nonce.fetch_add(1, atomic::Ordering::Relaxed); - Request { method, params, id: serde_json::Value::from(nonce), jsonrpc: Some("2.0") } + Request { + method, + params, + id: serde_json::Value::from(nonce), + jsonrpc: Some(JSONRPC_VERSION), + } } /// Sends a request to a client. - pub fn send_request(&self, request: Request) -> Result { + pub fn send_request<'a>( + &'a self, + request: Request<'a>, + ) -> BoxFuture<'a, Result> { self.transport.send_request(request) } @@ -65,14 +82,17 @@ impl Client { /// /// The return vector holds the response for the request at the corresponding index. If no /// response was provided, it's [`None`]. - pub fn send_batch(&self, requests: &[Request]) -> Result>, Error> { + pub async fn send_batch( + &self, + requests: &[Request<'_>], + ) -> Result>, Error> { if requests.is_empty() { return Err(Error::EmptyBatch); } // If the request body is invalid JSON, the response is a single response object. // We ignore this case since we are confident we are producing valid JSON. - let responses = self.transport.send_batch(requests)?; + let responses = self.transport.send_batch(requests).await?; if responses.len() > requests.len() { return Err(Error::WrongBatchResponseSize); } @@ -104,7 +124,7 @@ impl Client { /// /// To construct the arguments, one can use one of the shorthand methods /// [`crate::arg`] or [`crate::try_arg`]. - pub fn call serde::de::Deserialize<'a>>( + pub async fn call serde::de::Deserialize<'a>>( &self, method: &str, args: Option<&RawValue>, @@ -112,8 +132,8 @@ impl Client { let request = self.build_request(method, args); let id = request.id.clone(); - let response = self.send_request(request)?; - if response.jsonrpc.is_some() && response.jsonrpc != Some(From::from("2.0")) { + let response = self.send_request(request).await?; + if response.jsonrpc.is_some() && response.jsonrpc.as_deref() != Some(JSONRPC_VERSION) { return Err(Error::VersionMismatch); } if response.id != id { @@ -124,7 +144,7 @@ impl Client { } } -impl fmt::Debug for crate::Client { +impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "jsonrpc::Client(")?; self.transport.fmt_target(f)?; @@ -195,12 +215,23 @@ mod tests { use std::str::FromStr; use std::sync; + use futures::future::{err, ok}; + use super::*; struct DummyTransport; impl Transport for DummyTransport { - fn send_request(&self, _: Request) -> Result { Err(Error::NonceMismatch) } - fn send_batch(&self, _: &[Request]) -> Result, Error> { Ok(vec![]) } + fn send_request<'a>(&'a self, _: Request<'a>) -> BoxFuture<'a, Result> { + Box::pin(err(Error::NonceMismatch)) + } + + fn send_batch<'a>( + &'a self, + _: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, Error>> { + Box::pin(ok(vec![])) + } + fn fmt_target(&self, _: &mut fmt::Formatter) -> fmt::Result { Ok(()) } } diff --git a/jsonrpc/src/http/bitreq_http_async.rs b/jsonrpc/src/http/bitreq_http_async.rs index 84cfef351..0f47baeee 100644 --- a/jsonrpc/src/http/bitreq_http_async.rs +++ b/jsonrpc/src/http/bitreq_http_async.rs @@ -1,4 +1,4 @@ -//! This module implements the [`crate::client::Transport`] trait using [`bitreq`] +//! This module implements the [`crate::client_async::Transport`] trait using [`bitreq`] //! as the underlying HTTP transport. //! //! [bitreq]: @@ -9,7 +9,7 @@ use std::{error, fmt}; use base64::engine::general_purpose::STANDARD as BASE64; use base64::Engine; -use crate::client::Transport; +use crate::client_async::{BoxFuture, Transport}; use crate::{Request, Response}; const DEFAULT_URL: &str = "http://localhost"; @@ -54,7 +54,14 @@ impl BitreqHttpTransport { } } - fn request(&self, req: impl serde::Serialize) -> Result + async fn request(&self, req: impl serde::Serialize) -> Result + where + R: for<'a> serde::de::Deserialize<'a>, + { + Ok(self.request_inner(req).await?) + } + + async fn request_inner(&self, req: impl serde::Serialize) -> Result where R: for<'a> serde::de::Deserialize<'a>, { @@ -73,7 +80,7 @@ impl BitreqHttpTransport { // Send the request and parse the response. If the response is an error that does not // contain valid JSON in its body (for instance if the bitcoind HTTP server work queue // depth is exceeded), return the raw HTTP error so users can match against it. - let resp = req.send()?; + let resp = req.send_async().await?; match resp.json() { Ok(json) => Ok(json), Err(bitreq_err) => @@ -90,18 +97,24 @@ impl BitreqHttpTransport { } impl Transport for BitreqHttpTransport { - fn send_request(&self, req: Request) -> Result { - Ok(self.request(req)?) + fn send_request<'a>( + &'a self, + req: Request<'a>, + ) -> BoxFuture<'a, Result> { + Box::pin(self.request(req)) } - fn send_batch(&self, reqs: &[Request]) -> Result, crate::Error> { - Ok(self.request(reqs)?) + fn send_batch<'a>( + &'a self, + reqs: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, crate::Error>> { + Box::pin(self.request(reqs)) } fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.url) } } -/// Builder for simple bitcoind [`BitreqHttpTransport`]. +/// Builder for async bitcoind [`BitreqHttpTransport`]. #[derive(Clone, Debug)] pub struct Builder { tp: BitreqHttpTransport, @@ -142,13 +155,14 @@ impl Builder { /// # Examples /// /// ```no_run - /// # use jsonrpc::bitreq_http::BitreqHttpTransport; - /// # use std::fs::{self, File}; + /// # use jsonrpc::bitreq_http_async::BitreqHttpTransport; + /// # use std::fs::File; + /// # use std::io::Read; /// # use std::path::Path; /// # let cookie_file = Path::new("~/.bitcoind/.cookie"); /// let mut file = File::open(cookie_file).expect("couldn't open cookie file"); /// let mut cookie = String::new(); - /// fs::read_to_string(&mut cookie).expect("couldn't read cookie file"); + /// file.read_to_string(&mut cookie).expect("couldn't read cookie file"); /// let client = BitreqHttpTransport::builder().cookie_auth(cookie); /// ``` pub fn cookie_auth>(mut self, cookie: S) -> Self { @@ -238,7 +252,7 @@ impl From for crate::Error { #[cfg(test)] mod tests { use super::*; - use crate::Client; + use crate::client_async::Client; #[test] fn construct() { diff --git a/jsonrpc/src/http/mod.rs b/jsonrpc/src/http/mod.rs index f6221f388..d6f7f5a73 100644 --- a/jsonrpc/src/http/mod.rs +++ b/jsonrpc/src/http/mod.rs @@ -6,6 +6,9 @@ pub mod simple_http; #[cfg(feature = "bitreq_http")] pub mod bitreq_http; +#[cfg(feature = "bitreq_http_async")] +pub mod bitreq_http_async; + /// The default TCP port to use for connections. /// Set to 8332, the default RPC port for bitcoind. pub const DEFAULT_PORT: u16 = 8332; diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index ee2953ee7..c06178510 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -21,11 +21,15 @@ pub extern crate base64; pub extern crate bitreq; pub mod client; +#[cfg(feature = "client_async")] +pub mod client_async; pub mod error; pub mod http; #[cfg(feature = "bitreq_http")] pub use http::bitreq_http; +#[cfg(feature = "bitreq_http_async")] +pub use http::bitreq_http_async; #[cfg(feature = "simple_http")] pub use http::simple_http;