diff --git a/Cargo.lock b/Cargo.lock index b763e5d5e3..6c3ed33967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1215,6 +1215,7 @@ dependencies = [ "anyhow", "app-data", "async-trait", + "axum", "bigdecimal", "bytes-hex", "chain", @@ -1230,6 +1231,7 @@ dependencies = [ "futures", "hex-literal", "humantime", + "hyper 0.14.29", "indexmap 2.10.0", "itertools 0.14.0", "maplit", @@ -1255,6 +1257,8 @@ dependencies = [ "thiserror 1.0.61", "tikv-jemallocator", "tokio", + "tower 0.4.13", + "tower-http", "tracing", "url", "vergen", diff --git a/crates/autopilot/Cargo.toml b/crates/autopilot/Cargo.toml index 6630999911..ecd973993e 100644 --- a/crates/autopilot/Cargo.toml +++ b/crates/autopilot/Cargo.toml @@ -17,6 +17,7 @@ path = "src/main.rs" [dependencies] alloy = { workspace = true, features = ["rand", "provider-debug-api", "provider-trace-api"] } app-data = { workspace = true } +axum = { workspace = true } bytes-hex = { workspace = true } # may get marked as unused but it's used with serde anyhow = { workspace = true } async-trait = { workspace = true } @@ -31,7 +32,8 @@ database = { workspace = true } derive_more = { workspace = true } ethrpc = { workspace = true } futures = { workspace = true } -observe = { workspace = true } +hyper = { workspace = true } +observe = { workspace = true, features = ["axum-tracing"] } const-hex = { workspace = true } hex-literal = { workspace = true } humantime = { workspace = true } @@ -58,6 +60,8 @@ sqlx = { workspace = true } strum = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } +tower = { workspace = true } +tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } url = { workspace = true } web3 = { workspace = true } diff --git a/crates/autopilot/openapi.yml b/crates/autopilot/openapi.yml new file mode 100644 index 0000000000..7ab842300d --- /dev/null +++ b/crates/autopilot/openapi.yml @@ -0,0 +1,94 @@ +openapi: 3.0.3 +info: + title: Autopilot Native Price API + description: | + Internal API for retrieving native token prices from the autopilot service. + This API is intended to be used by orderbook instances to query the shared + native price cache maintained by autopilot. + version: 0.0.1 +paths: + /native_price/{token}: + get: + operationId: getNativePrice + summary: Get the native price for a token + description: | + Returns the price of the specified token denominated in the native token + (e.g., ETH on Ethereum mainnet, xDAI on Gnosis Chain). + parameters: + - in: path + name: token + description: The address of the token to get the price for. + schema: + $ref: "#/components/schemas/Address" + required: true + - in: query + name: timeout_ms + description: | + Optional timeout in milliseconds for the price estimation request. + If not provided, uses the default timeout configured for autopilot. + Values below 250ms are automatically clamped to the minimum (250ms). + Values exceeding the configured maximum are clamped to the maximum. + schema: + type: integer + format: int64 + minimum: 250 + example: 5000 + required: false + responses: + "200": + description: Native price retrieved successfully. + content: + application/json: + schema: + $ref: "#/components/schemas/NativeTokenPrice" + "400": + description: | + Bad request. Possible causes: + - Unsupported token + content: + text/plain: + schema: + type: string + example: "Unsupported token" + "404": + description: No liquidity available for this token. + content: + text/plain: + schema: + type: string + example: "No liquidity" + "429": + description: Rate limited by upstream price estimator. + content: + text/plain: + schema: + type: string + example: "Rate limited" + "500": + description: Internal server error. + content: + text/plain: + schema: + type: string + example: "Internal error" +components: + schemas: + Address: + description: | + An Ethereum address encoded as a hex with `0x` prefix. + type: string + example: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" + NativeTokenPrice: + description: | + The price of a token denominated in the native token (e.g., ETH). + type: object + properties: + price: + type: number + format: double + description: | + The price of the token in terms of the native token. For example, if + 1 USDC = 0.0005 ETH, the price would be 0.0005. + example: 0.0005 + required: + - price diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 1785cd313a..193be1a5a9 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -59,6 +59,10 @@ pub struct Arguments { #[clap(long, env, default_value = "0.0.0.0:9589")] pub metrics_address: SocketAddr, + /// Address to bind the HTTP API server + #[clap(long, env, default_value = "0.0.0.0:12088")] + pub api_address: SocketAddr, + /// Url of the Postgres database. By default connects to locally running /// postgres. #[clap(long, env, default_value = "postgresql://")] @@ -370,6 +374,7 @@ impl std::fmt::Display for Arguments { ethflow_contracts, ethflow_indexing_start, metrics_address, + api_address, skip_event_sync, allowed_tokens, unsupported_tokens, @@ -417,6 +422,7 @@ impl std::fmt::Display for Arguments { writeln!(f, "ethflow_contracts: {ethflow_contracts:?}")?; writeln!(f, "ethflow_indexing_start: {ethflow_indexing_start:?}")?; writeln!(f, "metrics_address: {metrics_address}")?; + writeln!(f, "api_address: {api_address}")?; display_secret_option(f, "db_write_url", Some(&db_write_url))?; writeln!(f, "skip_event_sync: {skip_event_sync}")?; writeln!(f, "allowed_tokens: {allowed_tokens:?}")?; diff --git a/crates/autopilot/src/infra/api.rs b/crates/autopilot/src/infra/api.rs new file mode 100644 index 0000000000..d45789c671 --- /dev/null +++ b/crates/autopilot/src/infra/api.rs @@ -0,0 +1,133 @@ +use { + alloy::primitives::Address, + axum::{ + Router, + extract::{Path, Query, State as AxumState}, + http::StatusCode, + response::{IntoResponse, Json, Response}, + routing::get, + }, + model::quote::NativeTokenPrice, + observe::distributed_tracing::tracing_axum::{make_span, record_trace_id}, + serde::Deserialize, + shared::price_estimation::{PriceEstimationError, native::NativePriceEstimating}, + std::{ + net::SocketAddr, + ops::RangeInclusive, + sync::Arc, + time::{Duration, Instant}, + }, + tokio::sync::oneshot, +}; + +/// Minimum allowed timeout for price estimation requests. +/// Values below this are not useful as they don't give estimators enough time. +const MIN_TIMEOUT: Duration = Duration::from_millis(250); + +#[derive(Clone)] +struct State { + estimator: Arc, + allowed_timeout: RangeInclusive, +} + +#[derive(Debug, Deserialize)] +struct NativePriceQuery { + /// Optional timeout in milliseconds for the price estimation request. + /// If not provided, uses the default timeout configured for autopilot. + /// Values below 250ms are automatically clamped to the minimum (250ms). + /// Values exceeding the configured maximum are clamped to the maximum. + #[serde(default)] + timeout_ms: Option, +} + +pub async fn serve( + addr: SocketAddr, + estimator: Arc, + max_timeout: Duration, + shutdown: oneshot::Receiver<()>, +) -> Result<(), hyper::Error> { + let state = State { + estimator, + allowed_timeout: MIN_TIMEOUT..=max_timeout, + }; + + let app = Router::new() + .route("/native_price/:token", get(get_native_price)) + .with_state(state) + .layer( + tower::ServiceBuilder::new() + .layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span)) + .map_request(record_trace_id), + ); + + let server = axum::Server::bind(&addr).serve(app.into_make_service()); + tracing::info!(?addr, "serving HTTP API"); + + server + .with_graceful_shutdown(async { + shutdown.await.ok(); + }) + .await +} + +async fn get_native_price( + Path(token): Path
, + Query(query): Query, + AxumState(state): AxumState, +) -> Response { + let timeout = query + .timeout_ms + .map(Duration::from_millis) + .unwrap_or(*state.allowed_timeout.end()) + .clamp(*state.allowed_timeout.start(), *state.allowed_timeout.end()); + + let start = Instant::now(); + match state.estimator.estimate_native_price(token, timeout).await { + Ok(price) => { + let elapsed = start.elapsed(); + tracing::debug!( + ?token, + ?timeout, + ?elapsed, + ?price, + "estimated native token price" + ); + Json(NativeTokenPrice { price }).into_response() + } + Err(err) => { + let elapsed = start.elapsed(); + tracing::warn!( + ?err, + ?token, + ?timeout, + ?elapsed, + "failed to estimate native token price" + ); + error_to_response(err) + } + } +} + +fn error_to_response(err: PriceEstimationError) -> Response { + match err { + PriceEstimationError::NoLiquidity | PriceEstimationError::EstimatorInternal(_) => { + (StatusCode::NOT_FOUND, "No liquidity").into_response() + } + PriceEstimationError::UnsupportedToken { token: _, reason } => ( + StatusCode::BAD_REQUEST, + format!("Unsupported token, reason: {reason}"), + ) + .into_response(), + PriceEstimationError::RateLimited => { + (StatusCode::TOO_MANY_REQUESTS, "Rate limited").into_response() + } + PriceEstimationError::UnsupportedOrderType(reason) => ( + StatusCode::BAD_REQUEST, + format!("Unsupported order type, reason: {reason}"), + ) + .into_response(), + PriceEstimationError::ProtocolInternal(_) => { + (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response() + } + } +} diff --git a/crates/autopilot/src/infra/mod.rs b/crates/autopilot/src/infra/mod.rs index 52809890c3..11c2d58857 100644 --- a/crates/autopilot/src/infra/mod.rs +++ b/crates/autopilot/src/infra/mod.rs @@ -1,3 +1,4 @@ +pub mod api; pub mod blockchain; pub mod persistence; pub mod shadow; diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index bd8fe974e6..7bba0e909b 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -533,6 +533,15 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) { let liveness = Arc::new(Liveness::new(args.max_auction_age)); let startup = Arc::new(Some(AtomicBool::new(false))); + + let (api_shutdown_sender, api_shutdown_receiver) = tokio::sync::oneshot::channel(); + let api_task = tokio::spawn(infra::api::serve( + args.api_address, + native_price_estimator.clone(), + args.price_estimation.quote_timeout, + api_shutdown_receiver, + )); + observe::metrics::serve_metrics( liveness.clone(), args.metrics_address, @@ -699,6 +708,9 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) { competition_updates_sender, ); run.run_forever(shutdown_controller).await; + + api_shutdown_sender.send(()).ok(); + api_task.await.ok(); } async fn shadow_mode(args: Arguments) -> ! { diff --git a/crates/e2e/src/setup/mod.rs b/crates/e2e/src/setup/mod.rs index 454d3b149a..472024fc89 100644 --- a/crates/e2e/src/setup/mod.rs +++ b/crates/e2e/src/setup/mod.rs @@ -3,6 +3,7 @@ mod deploy; #[macro_use] pub mod onchain_components; pub mod fee; +pub mod proxy; mod services; mod solver; diff --git a/crates/e2e/src/setup/proxy.rs b/crates/e2e/src/setup/proxy.rs new file mode 100644 index 0000000000..b6b8caa208 --- /dev/null +++ b/crates/e2e/src/setup/proxy.rs @@ -0,0 +1,176 @@ +//! Simple HTTP reverse proxy with automatic failover for e2e testing. +//! +//! This module provides a test-only reverse proxy that simulates how Kubernetes +//! service pools work in production. In production, when multiple instances of +//! a service run (e.g., autopilot with leader/follower pattern), Kubernetes +//! routes traffic to the active instance and automatically fails over to a +//! backup when the primary becomes unavailable. +//! +//! The proxy maintains a queue of backend URLs and automatically rotates +//! through them when the currently active backend fails. This allows e2e tests +//! to simulate production failover behavior without requiring a full k8s +//! cluster. + +use { + axum::{Router, http::Request, response::IntoResponse}, + std::{collections::VecDeque, net::SocketAddr, sync::Arc}, + tokio::{sync::RwLock, task::JoinHandle}, + url::Url, + warp::hyper::Body, +}; + +/// HTTP reverse proxy with automatic failover that permanently switches +/// to the fallback backend when the current backend fails. +/// +/// This simulates k8s service pools where traffic is automatically routed +/// to healthy backend instances. +pub struct ReverseProxy { + _server_handle: JoinHandle<()>, +} + +#[derive(Clone)] +struct ProxyState { + backends: Arc>>, +} + +impl ProxyState { + /// Returns the current active backend URL. + async fn get_current_backend(&self) -> Url { + self.backends + .read() + .await + .front() + .cloned() + .expect("backends should never be empty") + } + + /// Rotates to the next backend by moving the current backend to the end of + /// the queue. + async fn rotate_backends(&self) { + let mut backends = self.backends.write().await; + if let Some(current) = backends.pop_front() { + backends.push_back(current); + } + } + + /// Returns the total number of backends configured. + /// + /// Used to determine how many retry attempts to make before giving up. + async fn backend_count(&self) -> usize { + self.backends.read().await.len() + } +} + +impl ReverseProxy { + /// Start a new proxy server with automatic failover between backends + /// + /// # Panics + /// Panics if `backends` is empty. At least one backend URL is required. + pub fn start(listen_addr: SocketAddr, backends: &[Url]) -> Self { + assert!( + !backends.is_empty(), + "At least one backend URL is required for the proxy" + ); + + let backends_queue: VecDeque = backends.iter().cloned().collect(); + + let state = ProxyState { + backends: Arc::new(RwLock::new(backends_queue)), + }; + + let backends_log: Vec = backends.to_vec(); + let server_handle = tokio::spawn(serve(listen_addr, backends_log, state)); + + Self { + _server_handle: server_handle, + } + } +} + +async fn serve(listen_addr: SocketAddr, backends: Vec, state: ProxyState) { + let client = reqwest::Client::new(); + + let proxy_handler = move |req: Request| { + let client = client.clone(); + let state = state.clone(); + async move { handle_request(client, state, req).await } + }; + + let app = Router::new().fallback(proxy_handler); + + tracing::info!(?listen_addr, ?backends, "starting reverse proxy"); + axum::Server::bind(&listen_addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +async fn handle_request( + client: reqwest::Client, + state: ProxyState, + req: Request, +) -> impl IntoResponse { + let (parts, body) = req.into_parts(); + + // Convert body to bytes once for reuse across retries + let body_bytes = match warp::hyper::body::to_bytes(body).await { + Ok(bytes) => bytes, + Err(err) => { + return ( + axum::http::StatusCode::BAD_REQUEST, + format!("Failed to read request body: {}", err), + ) + .into_response(); + } + }; + + let backend_count = state.backend_count().await; + + for attempt in 0..backend_count { + let backend = state.get_current_backend().await; + + match try_backend(&client, &parts, body_bytes.to_vec(), &backend).await { + Ok(response) => return response.into_response(), + Err(err) => { + tracing::warn!(?err, ?backend, attempt, "backend failed, rotating to next"); + state.rotate_backends().await; + } + } + } + + ( + axum::http::StatusCode::BAD_GATEWAY, + "All backends unavailable", + ) + .into_response() +} + +async fn try_backend( + client: &reqwest::Client, + parts: &axum::http::request::Parts, + body: Vec, + backend: &Url, +) -> Result<(axum::http::StatusCode, Vec), reqwest::Error> { + let path = parts + .uri + .path_and_query() + .map(|pq| pq.as_str()) + .unwrap_or(""); + + // Build the full URL by combining backend and path + let url = format!("{}{}", backend, path); + // Build a reqwest request with the same method + let mut backend_req = client.request(parts.method.clone(), &url); + // Forward all headers from the original request + for (name, value) in &parts.headers { + backend_req = backend_req.header(name, value); + } + + // Attach the body + backend_req = backend_req.body(body); + + let backend_resp = backend_req.send().await?; + let status = axum::http::StatusCode::from_u16(backend_resp.status().as_u16()).unwrap(); + let bytes = backend_resp.bytes().await?; + Ok((status, bytes.to_vec())) +} diff --git a/crates/e2e/src/setup/services.rs b/crates/e2e/src/setup/services.rs index 145a09aae6..f1a8ae6265 100644 --- a/crates/e2e/src/setup/services.rs +++ b/crates/e2e/src/setup/services.rs @@ -129,7 +129,7 @@ impl<'a> Services<'a> { fn api_autopilot_arguments(&self) -> impl Iterator + use<> { [ - "--native-price-estimators=test_quoter|http://localhost:11088/test_solver".to_string(), + "--native-price-estimators=Forwarder|http://localhost:12088".to_string(), "--amount-to-estimate-prices-with=1000000000000000000".to_string(), "--block-stream-poll-interval=1s".to_string(), format!("--node-ws-url={NODE_WS_HOST}"), @@ -144,6 +144,14 @@ impl<'a> Services<'a> { .into_iter() } + fn autopilot_arguments(&self) -> impl Iterator + use<> { + self.api_autopilot_arguments().chain([ + "--quote-timeout=10s".to_string(), + "--native-price-estimators=Driver|test_quoter|http://localhost:11088/test_solver" + .to_string(), + ]) + } + fn api_autopilot_solver_arguments(&self) -> impl Iterator + use<> { [ "--baseline-sources=None".to_string(), @@ -199,11 +207,12 @@ impl<'a> Services<'a> { "--run-loop-native-price-timeout=500ms".to_string(), format!("--ethflow-contracts={ethflow_contracts}"), "--skip-event-sync=true".to_string(), + "--api-address=0.0.0.0:12088".to_string(), format!("--solve-deadline={solve_deadline:?}"), ] .into_iter() .chain(self.api_autopilot_solver_arguments()) - .chain(self.api_autopilot_arguments()) + .chain(self.autopilot_arguments()) .chain(extra_args) .collect(); let args = ignore_overwritten_cli_params(args); @@ -347,11 +356,10 @@ impl<'a> Services<'a> { let autopilot_args = vec![ format!("--drivers=test_solver|http://localhost:11088/test_solver|{}", const_hex::encode(solver.address())), "--price-estimation-drivers=test_quoter|http://localhost:11088/baseline_solver,test_solver|http://localhost:11088/test_solver".to_string(), - "--native-price-estimators=test_quoter|http://localhost:11088/baseline_solver,test_solver|http://localhost:11088/test_solver".to_string(), + "--native-price-estimators=Driver|test_quoter|http://localhost:11088/baseline_solver,Driver|test_solver|http://localhost:11088/test_solver".to_string(), ]; let api_args = vec![ "--price-estimation-drivers=test_quoter|http://localhost:11088/baseline_solver,test_solver|http://localhost:11088/test_solver".to_string(), - "--native-price-estimators=test_quoter|http://localhost:11088/baseline_solver,test_solver|http://localhost:11088/test_solver".to_string(), ]; (autopilot_args, api_args) } else { @@ -362,15 +370,13 @@ impl<'a> Services<'a> { ), "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver" .to_string(), - "--native-price-estimators=test_quoter|http://localhost:11088/test_solver" + "--native-price-estimators=Driver|test_quoter|http://localhost:11088/test_solver" .to_string(), ]; let api_args = vec![ "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver" .to_string(), - "--native-price-estimators=test_quoter|http://localhost:11088/test_solver" - .to_string(), ]; (autopilot_args, api_args) }; diff --git a/crates/e2e/tests/e2e/autopilot_leader.rs b/crates/e2e/tests/e2e/autopilot_leader.rs index 7fdfd8f51d..7ff1e1fc7d 100644 --- a/crates/e2e/tests/e2e/autopilot_leader.rs +++ b/crates/e2e/tests/e2e/autopilot_leader.rs @@ -1,6 +1,14 @@ use { autopilot::shutdown_controller::ShutdownController, - e2e::setup::{OnchainComponents, Services, TIMEOUT, colocation, run_test, wait_for_condition}, + e2e::setup::{ + OnchainComponents, + Services, + TIMEOUT, + colocation, + proxy::ReverseProxy, + run_test, + wait_for_condition, + }, ethrpc::{Web3, alloy::CallBuilderExt}, model::order::{OrderCreation, OrderKind}, number::units::EthUnit, @@ -69,6 +77,15 @@ async fn dual_autopilot_only_leader_produces_auctions(web3: Web3) { let services = Services::new(&onchain).await; let (manual_shutdown, control) = ShutdownController::new_manual_shutdown(); + // Start proxy for native price API with automatic failover + let _proxy = ReverseProxy::start( + "0.0.0.0:9588".parse().unwrap(), + &[ + "http://0.0.0.0:12088".parse().unwrap(), // autopilot_leader + "http://0.0.0.0:12089".parse().unwrap(), // autopilot_follower + ], + ); + // Configure autopilot-leader only with test_solver let autopilot_leader = services.start_autopilot_with_shutdown_controller(None, vec![ format!("--drivers=test_solver|http://localhost:11088/test_solver|{}|requested-timeout-on-problems", @@ -76,6 +93,7 @@ async fn dual_autopilot_only_leader_produces_auctions(web3: Web3) { "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver".to_string(), "--gas-estimators=http://localhost:11088/gasprice".to_string(), "--metrics-address=0.0.0.0:9590".to_string(), + "--api-address=0.0.0.0:12088".to_string(), "--enable-leader-lock=true".to_string(), ], control).await; @@ -85,12 +103,14 @@ async fn dual_autopilot_only_leader_produces_auctions(web3: Web3) { const_hex::encode(solver2.address())), "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver2".to_string(), "--gas-estimators=http://localhost:11088/gasprice".to_string(), + "--api-address=0.0.0.0:12089".to_string(), "--enable-leader-lock=true".to_string(), ]).await; services .start_api(vec![ "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver1,test_solver2|http://localhost:11088/test_solver2".to_string(), + "--native-price-estimators=Forwarder|http://0.0.0.0:9588".to_string(), ]) .await; diff --git a/crates/e2e/tests/e2e/limit_orders.rs b/crates/e2e/tests/e2e/limit_orders.rs index 568cdc7abf..55a3759e79 100644 --- a/crates/e2e/tests/e2e/limit_orders.rs +++ b/crates/e2e/tests/e2e/limit_orders.rs @@ -422,6 +422,8 @@ async fn two_limit_orders_multiple_winners_test(web3: Web3) { services .start_api(vec![ "--price-estimation-drivers=solver1|http://localhost:11088/test_solver".to_string(), + "--native-price-estimators=Driver|test_quoter|http://localhost:11088/test_solver" + .to_string(), ]) .await; @@ -594,6 +596,7 @@ async fn too_many_limit_orders_test(web3: Web3) { let mut onchain = OnchainComponents::deploy(web3.clone()).await; let [solver] = onchain.make_solvers(1u64.eth()).await; + let solver_address = solver.address(); let [trader] = onchain.make_accounts(1u64.eth()).await; let [token_a] = onchain .deploy_tokens_with_weth_uni_v2_pools(1_000u64.eth(), 1_000u64.eth()) @@ -627,6 +630,19 @@ async fn too_many_limit_orders_test(web3: Web3) { colocation::LiquidityProvider::UniswapV2, false, ); + services + .start_autopilot( + None, + vec![ + format!( + "--drivers=test_solver|http://localhost:11088/test_solver|{}", + const_hex::encode(solver_address) + ), + "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver" + .to_string(), + ], + ) + .await; services .start_api(vec![ "--max-limit-orders-per-user=1".into(), @@ -676,6 +692,7 @@ async fn limit_does_not_apply_to_in_market_orders_test(web3: Web3) { let mut onchain = OnchainComponents::deploy(web3.clone()).await; let [solver] = onchain.make_solvers(1u64.eth()).await; + let solver_address = solver.address(); let [trader] = onchain.make_accounts(1u64.eth()).await; let [token] = onchain .deploy_tokens_with_weth_uni_v2_pools(1_000u64.eth(), 1_000u64.eth()) @@ -709,6 +726,19 @@ async fn limit_does_not_apply_to_in_market_orders_test(web3: Web3) { colocation::LiquidityProvider::UniswapV2, false, ); + services + .start_autopilot( + None, + vec![ + format!( + "--drivers=test_solver|http://localhost:11088/test_solver|{}", + const_hex::encode(solver_address) + ), + "--price-estimation-drivers=test_quoter|http://localhost:11088/test_solver" + .to_string(), + ], + ) + .await; services .start_api(vec![ "--max-limit-orders-per-user=1".into(), diff --git a/crates/e2e/tests/e2e/quoting.rs b/crates/e2e/tests/e2e/quoting.rs index 4281720cd6..87491855b3 100644 --- a/crates/e2e/tests/e2e/quoting.rs +++ b/crates/e2e/tests/e2e/quoting.rs @@ -308,6 +308,8 @@ async fn quote_timeout(web3: Web3) { services .start_api(vec![ "--price-estimation-drivers=test_quoter|http://localhost:11088/test_quoter".to_string(), + "--native-price-estimators=Driver|test_quoter|http://localhost:11088/test_solver" + .to_string(), format!("--quote-timeout={MAX_QUOTE_TIME_MS}ms"), ]) .await; diff --git a/crates/shared/src/price_estimation/factory.rs b/crates/shared/src/price_estimation/factory.rs index 7dec89e2e8..37efcaf17e 100644 --- a/crates/shared/src/price_estimation/factory.rs +++ b/crates/shared/src/price_estimation/factory.rs @@ -181,6 +181,16 @@ impl<'a> PriceEstimatorFactory<'a> { weth: &WETH9::Instance, ) -> Result<(String, Arc)> { match source { + NativePriceEstimatorSource::Forwarder(url) => { + let name = format!("Forwarder|{}", url); + Ok(( + name.clone(), + Arc::new(InstrumentedPriceEstimator::new( + native::Forwarder::new(self.components.http_factory.create(), url.clone()), + name, + )), + )) + } NativePriceEstimatorSource::Driver(driver) => { let native_token_price_estimation_amount = self.native_token_price_estimation_amount()?; diff --git a/crates/shared/src/price_estimation/mod.rs b/crates/shared/src/price_estimation/mod.rs index 065780f7fb..e6f3017864 100644 --- a/crates/shared/src/price_estimation/mod.rs +++ b/crates/shared/src/price_estimation/mod.rs @@ -5,7 +5,7 @@ use { trade_finding::{Interaction, QuoteExecution}, }, alloy::primitives::{Address, U256}, - anyhow::{Result, ensure}, + anyhow::{Context, Result, ensure}, bigdecimal::BigDecimal, ethcontract::H160, futures::future::BoxFuture, @@ -76,6 +76,7 @@ impl FromStr for ExternalSolver { #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub enum NativePriceEstimator { Driver(ExternalSolver), + Forwarder(Url), OneInchSpotPriceApi, CoinGecko, } @@ -83,7 +84,8 @@ pub enum NativePriceEstimator { impl Display for NativePriceEstimator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let formatter = match self { - NativePriceEstimator::Driver(s) => format!("{}|{}", &s.name, s.url), + NativePriceEstimator::Driver(s) => format!("Driver|{}|{}", &s.name, s.url), + NativePriceEstimator::Forwarder(url) => format!("Forwarder|{}", url), NativePriceEstimator::OneInchSpotPriceApi => "OneInchSpotPriceApi".into(), NativePriceEstimator::CoinGecko => "CoinGecko".into(), }; @@ -133,12 +135,18 @@ impl FromStr for NativePriceEstimator { type Err = anyhow::Error; fn from_str(s: &str) -> Result { - match s { + let (variant, args) = s.split_once('|').unwrap_or((s, "")); + match variant { "OneInchSpotPriceApi" => Ok(NativePriceEstimator::OneInchSpotPriceApi), "CoinGecko" => Ok(NativePriceEstimator::CoinGecko), - estimator => Ok(NativePriceEstimator::Driver(ExternalSolver::from_str( - estimator, + "Driver" => Ok(NativePriceEstimator::Driver(ExternalSolver::from_str( + args, )?)), + "Forwarder" => Ok(NativePriceEstimator::Forwarder( + args.parse() + .context("Forwarder price estimator invalid URL")?, + )), + _ => Err(anyhow::anyhow!("unsupported native price estimator: {}", s)), } } } @@ -637,9 +645,10 @@ mod tests { ) .to_string(), &NativePriceEstimator::OneInchSpotPriceApi.to_string(), - "one|http://localhost:1111/,two|http://localhost:2222/;three|http://localhost:3333/,four|http://localhost:4444/", + &NativePriceEstimator::Forwarder("http://localhost:9588".parse().unwrap()).to_string(), + "Driver|one|http://localhost:1111/,Driver|two|http://localhost:2222/;Driver|three|http://localhost:3333/,Driver|four|http://localhost:4444/", &format!( - "one|http://localhost:1111/,two|http://localhost:2222/;{},four|http://localhost:4444/", + "Driver|one|http://localhost:1111/,Driver|two|http://localhost:2222/;{},Driver|four|http://localhost:4444/", NativePriceEstimator::OneInchSpotPriceApi ), ] { diff --git a/crates/shared/src/price_estimation/native/forwarder.rs b/crates/shared/src/price_estimation/native/forwarder.rs new file mode 100644 index 0000000000..4d6aaa72d4 --- /dev/null +++ b/crates/shared/src/price_estimation/native/forwarder.rs @@ -0,0 +1,88 @@ +//! Forwards native price estimation requests to autopilot's HTTP API. +//! +//! This allows orderbook instances to share autopilot's native price cache +//! instead of maintaining independent caches, avoiding cache inconsistencies +//! and reducing rate limiting from external price estimators. + +use { + super::{NativePriceEstimateResult, NativePriceEstimating}, + crate::price_estimation::PriceEstimationError, + alloy::primitives::Address, + anyhow::Context, + futures::{FutureExt, future::BoxFuture}, + model::quote::NativeTokenPrice, + reqwest::StatusCode, + std::time::Duration, + url::Url, +}; + +pub struct Forwarder { + client: reqwest::Client, + autopilot_url: Url, +} + +impl Forwarder { + pub fn new(client: reqwest::Client, autopilot_url: Url) -> Self { + Self { + client, + autopilot_url, + } + } + + async fn try_fetch(&self, token: Address, timeout: Duration) -> NativePriceEstimateResult { + let url = self + .autopilot_url + .join(format!("native_price/{:?}", token).as_str()) + .context("failed to construct autopilot URL")?; + + let response = self + .client + .get(url) + .query(&[("timeout_ms", timeout.as_millis() as u64)]) + .timeout(timeout) + .send() + .await + .context("failed to send request")?; + + match response.status() { + StatusCode::OK => { + let price: NativeTokenPrice = + response.json().await.context("failed to parse response")?; + Ok(price.price) + } + StatusCode::NOT_FOUND => Err(PriceEstimationError::NoLiquidity), + StatusCode::TOO_MANY_REQUESTS => Err(PriceEstimationError::RateLimited), + StatusCode::BAD_REQUEST => { + let error_text = response + .text() + .await + .unwrap_or_else(|_| "unknown error".to_string()); + Err(PriceEstimationError::ProtocolInternal(anyhow::anyhow!( + "bad request: {}", + error_text + ))) + } + status => { + let error_text = response + .text() + .await + .unwrap_or_else(|_| format!("HTTP {}", status)); + Err(PriceEstimationError::ProtocolInternal(anyhow::anyhow!( + "autopilot returned status {}: {}", + status, + error_text + ))) + } + } + } +} + +impl NativePriceEstimating for Forwarder { + fn estimate_native_price( + &self, + token: Address, + timeout: Duration, + ) -> BoxFuture<'_, NativePriceEstimateResult> { + self.try_fetch(token, timeout).boxed() + } +} diff --git a/crates/shared/src/price_estimation/native/mod.rs b/crates/shared/src/price_estimation/native/mod.rs index 23702c8ac5..0135dedc10 100644 --- a/crates/shared/src/price_estimation/native/mod.rs +++ b/crates/shared/src/price_estimation/native/mod.rs @@ -13,9 +13,10 @@ use { }; mod coingecko; +mod forwarder; mod oneinch; -pub use self::{coingecko::CoinGecko, oneinch::OneInch}; +pub use self::{coingecko::CoinGecko, forwarder::Forwarder, oneinch::OneInch}; pub type NativePrice = f64; pub type NativePriceEstimateResult = Result;