From 086244abbf6163a5af15a59b810140f69bb4ba87 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 8 Dec 2025 10:09:13 +0000 Subject: [PATCH 1/4] Keep tokio console only in the playground --- .cargo/config.toml | 2 +- crates/observe/Cargo.toml | 3 ++- crates/observe/build.rs | 2 ++ crates/observe/src/tracing.rs | 49 +++++++++++++++++++---------------- playground/Dockerfile | 18 ++++++------- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index bff29e6e17..77972bb7bd 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,2 @@ [build] -rustflags = ["--cfg", "tokio_unstable"] +# rustflags configuration removed - tokio_unstable only enabled in playground diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index ed98c00503..147923ef1c 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -10,7 +10,7 @@ axum = { workspace = true, optional = true } atty = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["now"] } -console-subscriber = { workspace = true } +console-subscriber = { workspace = true, optional = true } futures = { workspace = true } opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["grpc-tonic"] } @@ -37,3 +37,4 @@ workspace = true default = [] axum-tracing = ["axum"] jemalloc-profiling = ["dep:jemalloc_pprof"] +tokio-console = ["dep:console-subscriber"] diff --git a/crates/observe/build.rs b/crates/observe/build.rs index b0ce3de9b9..9c0b1f64be 100644 --- a/crates/observe/build.rs +++ b/crates/observe/build.rs @@ -1,4 +1,6 @@ fn main() { // Make build system aware of custom config flags to avoid clippy warnings + // tokio_unstable is only used when explicitly compiled with --cfg + // tokio_unstable (e.g., in the playground environment) println!("cargo::rustc-check-cfg=cfg(tokio_unstable)"); } diff --git a/crates/observe/src/tracing.rs b/crates/observe/src/tracing.rs index eb9ae0f316..d3c98eff37 100644 --- a/crates/observe/src/tracing.rs +++ b/crates/observe/src/tracing.rs @@ -59,19 +59,7 @@ pub fn initialize_reentrant(config: &Config) { fn set_tracing_subscriber(config: &Config) { let initial_filter = config.env_filter.to_string(); - // The `tracing` APIs are heavily generic to enable zero overhead. Unfortunately - // this leads to very annoying type constraints which can only be satisfied - // by literally copy and pasting the code so the compiler doesn't try to - // infer types that satisfy both the tokio-console and the regular case. - // It's tempting to resolve this mess by first configuring the `fmt_layer` and - // only then the `console_subscriber`. However, this setup was the only way - // I found that: - // 1. actually makes `tokio-console` work - // 2. prints logs if `tokio-console` is disabled - // 3. does NOT skip the next log following a `tracing::event!()`. These calls - // happen for example under the hood in `sqlx`. I don't understand what's - // actually causing that but at this point I'm just happy if all the features - // work correctly. + // The `tracing` APIs are heavily generic to enable zero overhead. macro_rules! fmt_layer { ($env_filter:expr_2021, $stderr_threshold:expr_2021, $use_json_format:expr_2021) => {{ @@ -109,11 +97,6 @@ fn set_tracing_subscriber(config: &Config) { }}; } - let enable_tokio_console: bool = std::env::var("TOKIO_CONSOLE") - .unwrap_or("false".to_string()) - .parse() - .unwrap(); - let (env_filter, reload_handle) = tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); @@ -161,13 +144,33 @@ fn set_tracing_subscriber(config: &Config) { )) .with(tracing_layer); - if cfg!(tokio_unstable) && enable_tokio_console { - subscriber.with(console_subscriber::spawn()).init(); - tracing::info!("started program with support for tokio-console"); - } else { + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + { + let enable_tokio_console: bool = std::env::var("TOKIO_CONSOLE") + .unwrap_or("false".to_string()) + .parse() + .unwrap(); + + if enable_tokio_console { + subscriber.with(console_subscriber::spawn()).init(); + tracing::info!("started program with support for tokio-console"); + } else { + subscriber.init(); + tracing::info!( + "started program without support for tokio-console (TOKIO_CONSOLE=false)" + ); + } + } + + #[cfg(not(all(tokio_unstable, feature = "tokio-console")))] + { subscriber.init(); - tracing::info!("started program without support for tokio-console"); + tracing::info!( + "started program without support for tokio-console (not compiled with tokio_unstable \ + cfg and tokio-console feature)" + ); } + if cfg!(unix) { spawn_reload_handler(initial_filter, reload_handle); } diff --git a/playground/Dockerfile b/playground/Dockerfile index fd05b0fef6..06ede98e1c 100644 --- a/playground/Dockerfile +++ b/playground/Dockerfile @@ -31,7 +31,7 @@ CMD ["migrate"] FROM chef AS builder COPY --from=planner /src/recipe.json recipe.json COPY --from=chef /.cargo /.cargo -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo chef cook --release --recipe-path recipe.json +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo chef cook --release --features observe/tokio-console --recipe-path recipe.json # Copy only the library crates for now COPY --from=chef /.cargo /.cargo @@ -48,9 +48,9 @@ COPY ./crates/chain/ ./crates/chain COPY ./crates/ethrpc/ ./crates/ethrpc COPY ./crates/observe/ ./crates/observe COPY ./crates/order-validation/ ./crates/order-validation -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package shared +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package shared COPY ./crates/solver/ ./crates/solver -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package solver +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package solver # Create an base image for all the binaries FROM docker.io/debian:bookworm-slim AS base @@ -61,14 +61,14 @@ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked apt-get update && \ FROM builder AS alerter-build COPY --from=chef /.cargo /.cargo COPY ./crates/alerter/ ./crates/alerter -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package alerter +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package alerter FROM base AS alerter COPY --from=alerter-build /src/target/release/alerter /usr/local/bin/alerter ENTRYPOINT [ "alerter" ] FROM builder AS autopilot-build -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package autopilot +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package autopilot FROM base AS autopilot COPY --from=chef /.cargo /.cargo @@ -78,7 +78,7 @@ ENTRYPOINT [ "autopilot" ] FROM builder AS driver-build COPY ./crates/driver/ ./crates/driver -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package driver +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package driver FROM base AS driver COPY --from=driver-build /src/target/release/driver /usr/local/bin/driver @@ -87,7 +87,7 @@ ENTRYPOINT [ "driver" ] FROM builder AS orderbook-build COPY --from=chef /.cargo /.cargo COPY ./crates/orderbook/ ./crates/orderbook -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package orderbook +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package orderbook FROM base AS orderbook COPY --from=orderbook-build /src/target/release/orderbook /usr/local/bin/orderbook @@ -96,7 +96,7 @@ ENTRYPOINT [ "orderbook" ] FROM builder AS refunder-build COPY --from=chef /.cargo /.cargo COPY ./crates/refunder/ ./crates/refunder -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package refunder +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package refunder FROM base AS refunder COPY --from=refunder-build /src/target/release/refunder /usr/local/bin/refunder @@ -105,7 +105,7 @@ ENTRYPOINT [ "refunder" ] FROM builder AS solvers-build COPY --from=chef /.cargo /.cargo COPY ./crates/solvers/ ./crates/solvers -RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --package solvers +RUN CARGO_PROFILE_RELEASE_DEBUG=1 cargo build --release --features observe/tokio-console --package solvers FROM base AS solvers COPY --from=solvers-build /src/target/release/solvers /usr/local/bin/solvers From 62c457cdf61a3f28965fa9e7c776caa772b3ddd5 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 8 Dec 2025 10:41:57 +0000 Subject: [PATCH 2/4] Redundant file --- .cargo/config.toml | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index 77972bb7bd..0000000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -# rustflags configuration removed - tokio_unstable only enabled in playground From ab41c0cd30cf90b78c90a49b715823879281631b Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 8 Dec 2025 14:34:19 +0000 Subject: [PATCH 3/4] Limit parallel requests --- crates/shared/src/recent_block_cache.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 9b6780b744..10e71e1097 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -30,7 +30,7 @@ use { cached::{Cached, SizedCache}, ethcontract::BlockNumber, ethrpc::block_stream::CurrentBlockWatcher, - futures::{FutureExt, StreamExt}, + futures::{FutureExt, StreamExt, TryStreamExt}, itertools::Itertools, prometheus::IntCounterVec, std::{ @@ -246,15 +246,13 @@ where } async fn fetch_inner_many(&self, keys: HashSet, block: Block) -> Result> { - let fetched = - futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block))) - .await; - let fetched: Vec<_> = fetched - .into_iter() - .filter_map(|res| res.ok()) - .flatten() - .collect(); - Ok(fetched) + let futures = keys.into_iter().map(|key| self.fetch_inner(key, block)); + // only process a limited number of requests in parallel to avoid creating + // a ton of tracing spans at the same time since tracing never deallocates + // span memory to reuse it later + let stream = futures::stream::iter(futures).buffered(50); + let fetched_items: Vec<_> = stream.try_collect().await?; + Ok(fetched_items.into_iter().flatten().collect()) } // Sometimes nodes requests error when we try to get state from what we think is From 8bbdc2514b7183b34960eb1b44311dd2a0dbaa89 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 8 Dec 2025 17:03:15 +0000 Subject: [PATCH 4/4] Revert "Limit parallel requests" This reverts commit ab41c0cd30cf90b78c90a49b715823879281631b. --- crates/shared/src/recent_block_cache.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 10e71e1097..9b6780b744 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -30,7 +30,7 @@ use { cached::{Cached, SizedCache}, ethcontract::BlockNumber, ethrpc::block_stream::CurrentBlockWatcher, - futures::{FutureExt, StreamExt, TryStreamExt}, + futures::{FutureExt, StreamExt}, itertools::Itertools, prometheus::IntCounterVec, std::{ @@ -246,13 +246,15 @@ where } async fn fetch_inner_many(&self, keys: HashSet, block: Block) -> Result> { - let futures = keys.into_iter().map(|key| self.fetch_inner(key, block)); - // only process a limited number of requests in parallel to avoid creating - // a ton of tracing spans at the same time since tracing never deallocates - // span memory to reuse it later - let stream = futures::stream::iter(futures).buffered(50); - let fetched_items: Vec<_> = stream.try_collect().await?; - Ok(fetched_items.into_iter().flatten().collect()) + let fetched = + futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block))) + .await; + let fetched: Vec<_> = fetched + .into_iter() + .filter_map(|res| res.ok()) + .flatten() + .collect(); + Ok(fetched) } // Sometimes nodes requests error when we try to get state from what we think is