diff --git a/chain/ethereum/src/codec.rs b/chain/ethereum/src/codec.rs index 5ee25c2d8e4..9d4e8686d81 100644 --- a/chain/ethereum/src/codec.rs +++ b/chain/ethereum/src/codec.rs @@ -284,6 +284,9 @@ impl Into for &Block { effective_gas_price: None, }) }) + // Transaction receipts will be shared along the code, so we put them into an + // Arc here to avoid excessive cloning. + .map(Arc::new) .collect(), }, // Comment (437a9f17-67cc-478f-80a3-804fe554b227): This Some() will avoid calls in the triggers_in_block diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 62115a00457..2fbf897c563 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1048,9 +1048,10 @@ impl EthereumAdapterTrait for EthereumAdapter { ) }) .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); - graph::tokio_stream::StreamExt::collect::, IngestorError>>( - receipt_stream, - ).boxed() + graph::tokio_stream::StreamExt::collect::< + Result>, IngestorError>, + >(receipt_stream) + .boxed() }; let block_future = @@ -1489,7 +1490,9 @@ pub(crate) fn parse_log_triggers( .logs .iter() .filter(move |log| log_filter.matches(log)) - .map(move |log| EthereumTrigger::Log(Arc::new(log.clone()), Some(receipt.clone()))) + .map(move |log| { + EthereumTrigger::Log(Arc::new(log.clone()), Some(receipt.cheap_clone())) + }) }) .collect() } @@ -1706,7 +1709,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry( hashes: Vec, block_hash: H256, logger: Logger, -) -> Result, IngestorError> { +) -> Result>, IngestorError> { let retry_log_message = format!( "batch eth_getTransactionReceipt RPC call for block {:?}", block_hash @@ -1731,7 +1734,7 @@ async fn fetch_transaction_receipts_in_batch( hashes: Vec, block_hash: H256, logger: Logger, -) -> Result, IngestorError> { +) -> Result>, IngestorError> { let batching_web3 = Web3::new(Batch::new(web3.transport().clone())); let eth = batching_web3.eth(); let receipt_futures = hashes @@ -1750,7 +1753,7 @@ async fn fetch_transaction_receipts_in_batch( let mut collected = vec![]; for receipt in receipt_futures.into_iter() { - collected.push(receipt.await?) + collected.push(Arc::new(receipt.await?)) } Ok(collected) } @@ -1761,7 +1764,7 @@ async fn fetch_transaction_receipt_with_retry( transaction_hash: H256, block_hash: H256, logger: Logger, -) -> Result { +) -> Result, IngestorError> { let logger = logger.cheap_clone(); let retry_log_message = format!( "eth_getTransactionReceipt RPC call for transaction {:?}", @@ -1776,6 +1779,7 @@ async fn fetch_transaction_receipt_with_retry( .and_then(move |some_receipt| { resolve_transaction_receipt(some_receipt, transaction_hash, block_hash, logger) }) + .map(Arc::new) } fn resolve_transaction_receipt( @@ -1897,10 +1901,10 @@ async fn get_transaction_receipts_for_transaction_hashes( adapter: &EthereumAdapter, transaction_hashes_by_block: &HashMap>, logger: Logger, -) -> Result, anyhow::Error> { +) -> Result>, anyhow::Error> { use std::collections::hash_map::Entry::Vacant; - let mut receipts_by_hash: HashMap = HashMap::new(); + let mut receipts_by_hash: HashMap> = HashMap::new(); // Return early if input set is empty if transaction_hashes_by_block.is_empty() { diff --git a/chain/ethereum/src/runtime/abi.rs b/chain/ethereum/src/runtime/abi.rs index af236a18dfb..572dfb77f9f 100644 --- a/chain/ethereum/src/runtime/abi.rs +++ b/chain/ethereum/src/runtime/abi.rs @@ -578,7 +578,7 @@ where } impl ToAscObj> - for (EthereumEventData, Option) + for (EthereumEventData, Option<&TransactionReceipt>) where T: AscType + AscIndexId, B: AscType + AscIndexId, @@ -665,7 +665,7 @@ impl ToAscObj for Log { } } -impl ToAscObj for TransactionReceipt { +impl ToAscObj for &TransactionReceipt { fn to_asc_obj( &self, heap: &mut H, diff --git a/chain/ethereum/src/trigger.rs b/chain/ethereum/src/trigger.rs index 9c4720045ad..c06f251cfc0 100644 --- a/chain/ethereum/src/trigger.rs +++ b/chain/ethereum/src/trigger.rs @@ -47,7 +47,7 @@ pub enum MappingTrigger { transaction: Arc, log: Arc, params: Vec, - receipt: Option, + receipt: Option>, }, Call { block: Arc, @@ -143,7 +143,7 @@ impl blockchain::MappingTrigger for MappingTrigger { >, _, _, - >(heap, &(ethereum_event_data, receipt), gas)? + >(heap, &(ethereum_event_data, receipt.as_deref()), gas)? .erase() } else if api_version >= API_VERSION_0_0_6 { asc_new::< @@ -217,7 +217,7 @@ impl blockchain::MappingTrigger for MappingTrigger { pub enum EthereumTrigger { Block(BlockPtr, EthereumBlockTriggerType), Call(Arc), - Log(Arc, Option), + Log(Arc, Option>), } impl PartialEq for EthereumTrigger { diff --git a/graph/src/components/ethereum/types.rs b/graph/src/components/ethereum/types.rs index a763825d88b..7689bed70de 100644 --- a/graph/src/components/ethereum/types.rs +++ b/graph/src/components/ethereum/types.rs @@ -101,7 +101,7 @@ pub fn evaluate_transaction_status(receipt_status: Option) -> bool { #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct EthereumBlock { pub block: Arc, - pub transaction_receipts: Vec, + pub transaction_receipts: Vec>, } #[derive(Debug, Default, Clone, PartialEq)]