From be507bdcd77e288ae61ccbc2897c66ac0e6c1ac5 Mon Sep 17 00:00:00 2001 From: tilacog Date: Thu, 7 Apr 2022 14:50:18 -0300 Subject: [PATCH 1/2] chain, graph: Pass `TransactionReceipts` around inside `Arc`s --- chain/ethereum/src/codec.rs | 3 +++ chain/ethereum/src/ethereum_adapter.rs | 9 +++++++-- chain/ethereum/src/runtime/abi.rs | 4 ++-- chain/ethereum/src/trigger.rs | 6 +++--- graph/src/components/ethereum/types.rs | 2 +- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/chain/ethereum/src/codec.rs b/chain/ethereum/src/codec.rs index 5ee25c2d8e4..9d4e8686d81 100644 --- a/chain/ethereum/src/codec.rs +++ b/chain/ethereum/src/codec.rs @@ -284,6 +284,9 @@ impl Into for &Block { effective_gas_price: None, }) }) + // Transaction receipts will be shared along the code, so we put them into an + // Arc here to avoid excessive cloning. + .map(Arc::new) .collect(), }, // Comment (437a9f17-67cc-478f-80a3-804fe554b227): This Some() will avoid calls in the triggers_in_block diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 62115a00457..1f53167eefa 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1055,6 +1055,9 @@ impl EthereumAdapterTrait for EthereumAdapter { let block_future = futures03::TryFutureExt::map_ok(receipts_future, move |transaction_receipts| { + // TODO: FIXME: This allocates an additional vector. It would be more efficient if + // we get a `Vec>` directly instead of doing this. + let transaction_receipts = transaction_receipts.into_iter().map(Arc::new).collect(); EthereumBlock { block: Arc::new(block), transaction_receipts, @@ -1489,7 +1492,9 @@ pub(crate) fn parse_log_triggers( .logs .iter() .filter(move |log| log_filter.matches(log)) - .map(move |log| EthereumTrigger::Log(Arc::new(log.clone()), Some(receipt.clone()))) + .map(move |log| { + EthereumTrigger::Log(Arc::new(log.clone()), Some(receipt.cheap_clone())) + }) }) .collect() } @@ -1885,7 +1890,7 @@ async fn get_logs_and_transactions( let optional_receipt = log .transaction_hash .and_then(|txn| transaction_receipts_by_hash.get(&txn).cloned()); - let value = EthereumTrigger::Log(Arc::new(log), optional_receipt); + let value = EthereumTrigger::Log(Arc::new(log), optional_receipt.map(Arc::new)); log_triggers.push(value); } diff --git a/chain/ethereum/src/runtime/abi.rs b/chain/ethereum/src/runtime/abi.rs index af236a18dfb..572dfb77f9f 100644 --- a/chain/ethereum/src/runtime/abi.rs +++ b/chain/ethereum/src/runtime/abi.rs @@ -578,7 +578,7 @@ where } impl ToAscObj> - for (EthereumEventData, Option) + for (EthereumEventData, Option<&TransactionReceipt>) where T: AscType + AscIndexId, B: AscType + AscIndexId, @@ -665,7 +665,7 @@ impl ToAscObj for Log { } } -impl ToAscObj for TransactionReceipt { +impl ToAscObj for &TransactionReceipt { fn to_asc_obj( &self, heap: &mut H, diff --git a/chain/ethereum/src/trigger.rs b/chain/ethereum/src/trigger.rs index 9c4720045ad..c06f251cfc0 100644 --- a/chain/ethereum/src/trigger.rs +++ b/chain/ethereum/src/trigger.rs @@ -47,7 +47,7 @@ pub enum MappingTrigger { transaction: Arc, log: Arc, params: Vec, - receipt: Option, + receipt: Option>, }, Call { block: Arc, @@ -143,7 +143,7 @@ impl blockchain::MappingTrigger for MappingTrigger { >, _, _, - >(heap, &(ethereum_event_data, receipt), gas)? + >(heap, &(ethereum_event_data, receipt.as_deref()), gas)? .erase() } else if api_version >= API_VERSION_0_0_6 { asc_new::< @@ -217,7 +217,7 @@ impl blockchain::MappingTrigger for MappingTrigger { pub enum EthereumTrigger { Block(BlockPtr, EthereumBlockTriggerType), Call(Arc), - Log(Arc, Option), + Log(Arc, Option>), } impl PartialEq for EthereumTrigger { diff --git a/graph/src/components/ethereum/types.rs b/graph/src/components/ethereum/types.rs index a763825d88b..7689bed70de 100644 --- a/graph/src/components/ethereum/types.rs +++ b/graph/src/components/ethereum/types.rs @@ -101,7 +101,7 @@ pub fn evaluate_transaction_status(receipt_status: Option) -> bool { #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct EthereumBlock { pub block: Arc, - pub transaction_receipts: Vec, + pub transaction_receipts: Vec>, } #[derive(Debug, Default, Clone, PartialEq)] From 21ed984e6190e06f033a372b6b8356c816da233c Mon Sep 17 00:00:00 2001 From: tilacog Date: Thu, 7 Apr 2022 15:02:01 -0300 Subject: [PATCH 2/2] chain/ethereum: Return Arc from more functions This commit preventively puts `TransactionReceipt` into `Arc`s to avoid an unecessary vector allocation. --- chain/ethereum/src/ethereum_adapter.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 1f53167eefa..2fbf897c563 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1048,16 +1048,14 @@ impl EthereumAdapterTrait for EthereumAdapter { ) }) .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); - graph::tokio_stream::StreamExt::collect::, IngestorError>>( - receipt_stream, - ).boxed() + graph::tokio_stream::StreamExt::collect::< + Result>, IngestorError>, + >(receipt_stream) + .boxed() }; let block_future = futures03::TryFutureExt::map_ok(receipts_future, move |transaction_receipts| { - // TODO: FIXME: This allocates an additional vector. It would be more efficient if - // we get a `Vec>` directly instead of doing this. - let transaction_receipts = transaction_receipts.into_iter().map(Arc::new).collect(); EthereumBlock { block: Arc::new(block), transaction_receipts, @@ -1711,7 +1709,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry( hashes: Vec, block_hash: H256, logger: Logger, -) -> Result, IngestorError> { +) -> Result>, IngestorError> { let retry_log_message = format!( "batch eth_getTransactionReceipt RPC call for block {:?}", block_hash @@ -1736,7 +1734,7 @@ async fn fetch_transaction_receipts_in_batch( hashes: Vec, block_hash: H256, logger: Logger, -) -> Result, IngestorError> { +) -> Result>, IngestorError> { let batching_web3 = Web3::new(Batch::new(web3.transport().clone())); let eth = batching_web3.eth(); let receipt_futures = hashes @@ -1755,7 +1753,7 @@ async fn fetch_transaction_receipts_in_batch( let mut collected = vec![]; for receipt in receipt_futures.into_iter() { - collected.push(receipt.await?) + collected.push(Arc::new(receipt.await?)) } Ok(collected) } @@ -1766,7 +1764,7 @@ async fn fetch_transaction_receipt_with_retry( transaction_hash: H256, block_hash: H256, logger: Logger, -) -> Result { +) -> Result, IngestorError> { let logger = logger.cheap_clone(); let retry_log_message = format!( "eth_getTransactionReceipt RPC call for transaction {:?}", @@ -1781,6 +1779,7 @@ async fn fetch_transaction_receipt_with_retry( .and_then(move |some_receipt| { resolve_transaction_receipt(some_receipt, transaction_hash, block_hash, logger) }) + .map(Arc::new) } fn resolve_transaction_receipt( @@ -1890,7 +1889,7 @@ async fn get_logs_and_transactions( let optional_receipt = log .transaction_hash .and_then(|txn| transaction_receipts_by_hash.get(&txn).cloned()); - let value = EthereumTrigger::Log(Arc::new(log), optional_receipt.map(Arc::new)); + let value = EthereumTrigger::Log(Arc::new(log), optional_receipt); log_triggers.push(value); } @@ -1902,10 +1901,10 @@ async fn get_transaction_receipts_for_transaction_hashes( adapter: &EthereumAdapter, transaction_hashes_by_block: &HashMap>, logger: Logger, -) -> Result, anyhow::Error> { +) -> Result>, anyhow::Error> { use std::collections::hash_map::Entry::Vacant; - let mut receipts_by_hash: HashMap = HashMap::new(); + let mut receipts_by_hash: HashMap> = HashMap::new(); // Return early if input set is empty if transaction_hashes_by_block.is_empty() {