From 697993d216353b8ec17d8d438e039a643bea1c76 Mon Sep 17 00:00:00 2001 From: jjy Date: Tue, 26 Sep 2023 15:37:08 +0800 Subject: [PATCH] fix: check monitor threads are still working after ibc tests --- Cargo.lock | 1 + crates/relayer/Cargo.toml | 1 + crates/relayer/src/chain/axon/monitor.rs | 16 +++++++++++++--- tools/ibc-test/src/framework/binary/channel.rs | 6 +++++- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 702df4781..985663034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3707,6 +3707,7 @@ dependencies = [ "tiny-bip39", "tiny-keccak", "tokio", + "tokio-stream", "toml 0.5.11", "tonic", "tracing", diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index 4ef5e839d..e0c1222aa 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -49,6 +49,7 @@ tokio = { version = "1.0", features = [ "sync", "parking_lot", ] } +tokio-stream = "0.1" serde_json = { version = "1" } bytes = "1.4.0" prost = { version = "0.11" } diff --git a/crates/relayer/src/chain/axon/monitor.rs b/crates/relayer/src/chain/axon/monitor.rs index c1113a0bd..f42de44e5 100644 --- a/crates/relayer/src/chain/axon/monitor.rs +++ b/crates/relayer/src/chain/axon/monitor.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use super::contract::*; use crate::chain::axon::cache_ics_tx_hash_with_event; @@ -10,6 +11,7 @@ use ethers::prelude::*; use ethers::providers::Middleware; use ethers::types::Address; use ibc_relayer_types::Height; +use tokio_stream::StreamExt; use OwnableIBCHandler as Contract; use OwnableIBCHandlerEvents as ContractEvents; @@ -155,20 +157,28 @@ impl AxonEventMonitor { } fn run_loop(&mut self) -> Next { + const TIMEOUT_MILLIS: u64 = 300; + let contract = Arc::new(Contract::new( self.contract_address, Arc::clone(&self.client), )); let events = contract.events().from_block(self.start_block_number); - if let Ok(stream) = self.rt.block_on(events.stream()) { - let mut meta_stream = stream.with_meta(); + if let Ok(mut meta_stream) = self.rt.block_on(async { + events.stream().await.map(|stream| { + let meta_stream = stream.with_meta().timeout_repeating(tokio::time::interval( + Duration::from_millis(TIMEOUT_MILLIS), + )); + meta_stream + }) + }) { debug!("setup IBC contract events streaming process"); loop { if let Next::Abort = self.update_subscribe(true) { return Next::Abort; } - if let Some(ret) = self.rt.block_on(meta_stream.next()) { + if let Some(Ok(ret)) = self.rt.block_on(meta_stream.next()) { match ret { Ok((event, meta)) => { self.process_event(event, meta).unwrap_or_else(|e| { diff --git a/tools/ibc-test/src/framework/binary/channel.rs b/tools/ibc-test/src/framework/binary/channel.rs index 887b44497..76a84d5ed 100644 --- a/tools/ibc-test/src/framework/binary/channel.rs +++ b/tools/ibc-test/src/framework/binary/channel.rs @@ -108,6 +108,10 @@ where chains: ConnectedChains, channel: ConnectedChannel, ) -> Result<(), Error> { - self.test.run(config, relayer, chains, channel) + self.test.run(config, relayer, chains.clone(), channel)?; + log::info!("check monitor threads are still working"); + let _ = chains.handle_a().subscribe().map_err(Error::relayer)?; + let _ = chains.handle_b().subscribe().map_err(Error::relayer)?; + Ok(()) } }