diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index c9db4f0f79..8826d0be3a 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -95,6 +95,46 @@ impl Persistence { LeaderLock::new(self.postgres.pool.clone(), key, Duration::from_millis(200)) } + /// Spawns a background task that listens for new order notifications from + /// PostgreSQL and notifies via the provided Notify. + pub fn spawn_order_listener(&self, notify: Arc) { + let pool = self.postgres.pool.clone(); + tokio::spawn(async move { + loop { + let mut listener = match sqlx::postgres::PgListener::connect_with(&pool).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!(?err, "failed to create PostgreSQL listener"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + tracing::info!("connected to PostgreSQL for order notifications"); + + if let Err(err) = listener.listen("new_order").await { + tracing::error!(?err, "failed to listen on 'new_order' channel"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + loop { + match listener.recv().await { + Ok(notification) => { + let order_uid = notification.payload(); + tracing::debug!(order_uid, "received order notification from postgres"); + notify.notify_one(); + } + Err(err) => { + tracing::error!(?err, "error receiving notification from postgres"); + break; + } + } + } + } + }); + } + /// Fetches the ID that should be used for the next auction. pub async fn get_next_auction_id(&self) -> Result { let _timer = Metrics::get() diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 8d964b3e1b..dafe902d63 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -92,6 +92,8 @@ pub struct RunLoop { maintenance: Arc, competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, winner_selection: winner_selection::Arbitrator, + /// Notifier that wakes the main loop on new blocks or orders + wake_notify: Arc, } impl RunLoop { @@ -111,6 +113,13 @@ impl RunLoop { let max_winners = config.max_winners_per_auction.get(); let weth = eth.contracts().wrapped_native_token(); + // Create notifier that wakes the main loop on new blocks or orders + let wake_notify = Arc::new(tokio::sync::Notify::new()); + + // Spawn background tasks to listen for events + persistence.spawn_order_listener(wake_notify.clone()); + Self::spawn_block_listener(eth.current_block().clone(), wake_notify.clone()); + Self { config, eth, @@ -124,6 +133,7 @@ impl RunLoop { maintenance, competition_updates_sender, winner_selection: winner_selection::Arbitrator { max_winners, weth }, + wake_notify, } } @@ -156,6 +166,10 @@ impl RunLoop { .update_caches(&mut last_block, leader_lock_tracker.is_leader()) .await; + // Wait for a new block or order before proceeding (we do this *after* caches + // are loaded to reduce auction start time) + self_arc.wake_notify.notified().await; + // caches are warmed up, we're ready to do leader work if let Some(startup) = self_arc.probes.startup.as_ref() { startup.store(true, Ordering::Release); @@ -181,6 +195,21 @@ impl RunLoop { leader_lock_tracker.release().await; } + /// Spawns a background task that listens for new blocks from the + /// blockchain. + fn spawn_block_listener( + current_block: ethrpc::block_stream::CurrentBlockWatcher, + notify: Arc, + ) { + tokio::spawn(async move { + loop { + ethrpc::block_stream::next_block(¤t_block).await; + tracing::debug!("received new block"); + notify.notify_one(); + } + }); + } + async fn update_caches(&self, prev_block: &mut Option, is_leader: bool) -> BlockInfo { let current_block = *self.eth.current_block().borrow(); let time_since_last_block = current_block.observed_at.elapsed(); diff --git a/crates/e2e/tests/e2e/order_cancellation.rs b/crates/e2e/tests/e2e/order_cancellation.rs index 774c12d59b..98bc7b89ae 100644 --- a/crates/e2e/tests/e2e/order_cancellation.rs +++ b/crates/e2e/tests/e2e/order_cancellation.rs @@ -214,8 +214,10 @@ async fn order_cancellation(web3: Web3) { // Cancel one of them. cancel_order(order_uids[0]).await; - onchain.mint_block().await; wait_for_condition(TIMEOUT, || async { + // continue minting another block to make sure the autopilot eventually + // refreshes its cache + onchain.mint_block().await; services.get_auction().await.auction.orders.len() == 2 }) .await @@ -238,8 +240,10 @@ async fn order_cancellation(web3: Web3) { // Cancel the other two. cancel_orders(vec![order_uids[1], order_uids[2]]).await; - onchain.mint_block().await; wait_for_condition(TIMEOUT, || async { + // continue minting another block to make sure the autopilot eventually + // refreshes its cache + onchain.mint_block().await; services.get_auction().await.auction.orders.is_empty() }) .await diff --git a/crates/e2e/tests/e2e/tracking_insufficient_funds.rs b/crates/e2e/tests/e2e/tracking_insufficient_funds.rs index eea7456e6b..1d38b8af94 100644 --- a/crates/e2e/tests/e2e/tracking_insufficient_funds.rs +++ b/crates/e2e/tests/e2e/tracking_insufficient_funds.rs @@ -144,8 +144,8 @@ async fn test(web3: Web3) { .send_and_watch() .await .unwrap(); - onchain.mint_block().await; let orders_updated = || async { + onchain.mint_block().await; let events_a = crate::database::events_of_order(services.db(), &uid_a).await; let events_b = crate::database::events_of_order(services.db(), &uid_b).await; let order_b_correct_events = events_b.into_iter().map(|e| e.label).collect::>() @@ -166,8 +166,8 @@ async fn test(web3: Web3) { .send_and_watch() .await .unwrap(); - onchain.mint_block().await; let orders_updated = || async { + onchain.mint_block().await; let events_a = crate::database::events_of_order(services.db(), &uid_b).await; let events_b = crate::database::events_of_order(services.db(), &uid_b).await; events_a.last().map(|o| o.label) == Some(OrderEventLabel::Traded) diff --git a/database/sql/V094__add_order_notification_trigger.sql b/database/sql/V094__add_order_notification_trigger.sql new file mode 100644 index 0000000000..ca721454ce --- /dev/null +++ b/database/sql/V094__add_order_notification_trigger.sql @@ -0,0 +1,15 @@ +-- Create a trigger function that notifies on new orders +-- The notification payload contains the hex-encoded order UID +CREATE OR REPLACE FUNCTION notify_new_order() +RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('new_order', encode(NEW.uid, 'hex')); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create a trigger that fires after each insert on the orders table +CREATE TRIGGER order_insert_notify +AFTER INSERT ON orders +FOR EACH ROW +EXECUTE FUNCTION notify_new_order(); diff --git a/playground/docker-compose.fork.yml b/playground/docker-compose.fork.yml index 6b68b9bef7..17f401cfb2 100644 --- a/playground/docker-compose.fork.yml +++ b/playground/docker-compose.fork.yml @@ -110,7 +110,7 @@ services: environment: - DB_WRITE_URL=postgres://db:5432/?user=${POSTGRES_USER}&password=${POSTGRES_PASSWORD} - DB_READ_URL=postgres://db:5432/?user=${POSTGRES_USER}&password=${POSTGRES_PASSWORD} - - LOG_FILTER=warn,autopilot=info,shared=info,shared::price_estimation=info + - LOG_FILTER=warn,autopilot=debug,shared=info,shared::price_estimation=info - NODE_URL=http://chain:8545 - SIMULATION_NODE_URL=http://chain:8545 - SETTLE_INTERVAL=15s