diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index dd4aec402..abe91a597 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -17,7 +17,7 @@ use crate::wal::{self, ChainPoint, RawBlock, WalReader as _}; fn u5c_to_chain_point(block_ref: u5c::sync::BlockRef) -> Result { Ok(wal::ChainPoint::Specific( - block_ref.index, + block_ref.slot, super::convert::bytes_to_hash32(&block_ref.hash)?, )) } @@ -37,11 +37,19 @@ fn raw_to_anychain(mapper: &Mapper, body: &BlockBody) -> u5c::sync: } fn raw_to_blockref(raw: &wal::RawBlock) -> u5c::sync::BlockRef { - let RawBlock { slot, hash, .. } = raw; + let RawBlock { + slot, hash, body, .. + } = raw; + + let height = MultiEraBlock::decode(body) + .map_err(|e| panic!("corrupt WAL entry: {:?}", e)) + .unwrap() + .number(); u5c::sync::BlockRef { - index: *slot, + slot: *slot, hash: hash.to_vec().into(), + height, } } @@ -65,19 +73,21 @@ fn wal_log_to_tip_response( } } -fn point_to_reset_tip_response(point: ChainPoint) -> u5c::sync::FollowTipResponse { +fn point_to_reset_tip_response(point: ChainPoint, height: u64) -> u5c::sync::FollowTipResponse { match point { ChainPoint::Origin => u5c::sync::FollowTipResponse { action: u5c::sync::follow_tip_response::Action::Reset(BlockRef { + slot: 0, hash: vec![].into(), - index: 0, + height: 0, }) .into(), }, ChainPoint::Specific(slot, hash) => u5c::sync::FollowTipResponse { action: u5c::sync::follow_tip_response::Action::Reset(BlockRef { + slot, hash: hash.to_vec().into(), - index: slot, + height, }) .into(), }, @@ -122,11 +132,18 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl { .r#ref .iter() .map(|br| { - self.chain - .get_block_by_slot(&br.index) - .map_err(|_| Status::internal("Failed to query chain service."))? - .map(|body| raw_to_anychain(&self.mapper, &body)) - .ok_or(Status::not_found(format!("Failed to find block: {:?}", br))) + let body = match br { + BlockRef { hash, .. } if !hash.is_empty() => self.chain.get_block_by_hash(hash), + BlockRef { slot, .. } if *slot != 0 => self.chain.get_block_by_slot(slot), + BlockRef { height, .. } if *height != 0 => { + self.chain.get_block_by_number(height) + } + _ => self.chain.get_block_by_slot(&br.slot), + } + .map_err(|_err| Status::internal("Failed to query chain service."))? + .ok_or(Status::not_found(format!("Failed to find block: {:?}", br)))?; + + Ok(raw_to_anychain(&self.mapper, &body)) }) .collect::, Status>>()?; @@ -200,7 +217,16 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl { // the consumer knows what intersection was found and can reset their state // This would also mimic ouroboros giving a `Rollback` as the first message. - let reset = once(async { Ok(point_to_reset_tip_response(point)) }); + let raw_block = self + .wal + .read_block(&point) + .map_err(|_err| Status::internal("can't read WAL"))?; + + let height = MultiEraBlock::decode(&raw_block.body) + .map_err(|e| Status::internal(format!("corrupt WAL entry: {:?}", e)))? + .number(); + + let reset = once(async move { Ok(point_to_reset_tip_response(point, height)) }); let forward = wal::WalStream::start(self.wal.clone(), from_seq, self.cancellation_token.clone()) @@ -222,14 +248,17 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl { .map_err(|e| Status::internal(format!("Unable to read WAL: {:?}", e)))? .ok_or(Status::internal("chain has no data."))?; - let hash = MultiEraBlock::decode(&body) - .map_err(|_| Status::internal("Failed to decode tip block."))? - .hash(); + let block = MultiEraBlock::decode(&body) + .map_err(|_| Status::internal("Failed to decode tip block."))?; + + let hash = block.hash(); + let height = block.number(); let response = u5c::sync::ReadTipResponse { tip: Some(BlockRef { - index: slot, + slot, hash: hash.to_vec().into(), + height, }), }; diff --git a/src/serve/grpc/watch.rs b/src/serve/grpc/watch.rs index c2b6df113..2cacf0fb1 100644 --- a/src/serve/grpc/watch.rs +++ b/src/serve/grpc/watch.rs @@ -1,11 +1,12 @@ use crate::{ + model::BlockBody, state::LedgerStore, wal::{self, ChainPoint, WalReader as _}, }; use futures_core::Stream; use futures_util::StreamExt; -use pallas::interop::utxorpc as interop; use pallas::interop::utxorpc::spec as u5c; +use pallas::interop::utxorpc::{self as interop, Mapper}; use pallas::{ interop::utxorpc::spec::watch::any_chain_tx_pattern::Chain, ledger::{addresses::Address, traverse::MultiEraBlock}, @@ -14,6 +15,15 @@ use std::pin::Pin; use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status}; +fn raw_to_anychain(mapper: &Mapper, body: &BlockBody) -> u5c::watch::AnyChainBlock { + let block = mapper.map_block_cbor(body); + + u5c::watch::AnyChainBlock { + native_bytes: body.to_vec().into(), + chain: u5c::watch::any_chain_block::Chain::Cardano(block).into(), + } +} + fn outputs_match_address( pattern: &u5c::cardano::AddressPattern, outputs: &[u5c::cardano::TxOutput], @@ -166,6 +176,7 @@ fn block_to_txs( .is_none_or(|predicate| apply_predicate(predicate, tx)) }) .map(|x| u5c::watch::AnyChainTx { + block: Some(raw_to_anychain(mapper, body)), chain: Some(u5c::watch::any_chain_tx::Chain::Cardano(x)), }) .collect() @@ -229,7 +240,7 @@ impl u5c::watch::watch_service_server::WatchService for WatchServiceImpl { let intersect = inner_req .intersect .iter() - .map(|x| ChainPoint::Specific(x.index, x.hash.to_vec().as_slice().into())) + .map(|x| ChainPoint::Specific(x.slot, x.hash.to_vec().as_slice().into())) .collect::>(); let from_seq = if intersect.is_empty() {