Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions src/serve/grpc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::wal::{self, ChainPoint, RawBlock, WalReader as _};

fn u5c_to_chain_point(block_ref: u5c::sync::BlockRef) -> Result<wal::ChainPoint, Status> {
Ok(wal::ChainPoint::Specific(
block_ref.index,
block_ref.slot,
super::convert::bytes_to_hash32(&block_ref.hash)?,
))
}
Expand All @@ -37,11 +37,19 @@ fn raw_to_anychain(mapper: &Mapper<LedgerStore>, body: &BlockBody) -> u5c::sync:
}

fn raw_to_blockref(raw: &wal::RawBlock) -> u5c::sync::BlockRef {
let RawBlock { slot, hash, .. } = raw;
let RawBlock {
slot, hash, body, ..
} = raw;

let height = MultiEraBlock::decode(body)
.map_err(|e| panic!("corrupt WAL entry: {:?}", e))
.unwrap()
.number();

u5c::sync::BlockRef {
index: *slot,
slot: *slot,
hash: hash.to_vec().into(),
height,
}
}

Expand All @@ -65,19 +73,21 @@ fn wal_log_to_tip_response(
}
}

fn point_to_reset_tip_response(point: ChainPoint) -> u5c::sync::FollowTipResponse {
fn point_to_reset_tip_response(point: ChainPoint, height: u64) -> u5c::sync::FollowTipResponse {
match point {
ChainPoint::Origin => u5c::sync::FollowTipResponse {
action: u5c::sync::follow_tip_response::Action::Reset(BlockRef {
slot: 0,
hash: vec![].into(),
index: 0,
height: 0,
})
.into(),
},
ChainPoint::Specific(slot, hash) => u5c::sync::FollowTipResponse {
action: u5c::sync::follow_tip_response::Action::Reset(BlockRef {
slot,
hash: hash.to_vec().into(),
index: slot,
height,
})
.into(),
},
Expand Down Expand Up @@ -122,11 +132,18 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl {
.r#ref
.iter()
.map(|br| {
self.chain
.get_block_by_slot(&br.index)
.map_err(|_| Status::internal("Failed to query chain service."))?
.map(|body| raw_to_anychain(&self.mapper, &body))
.ok_or(Status::not_found(format!("Failed to find block: {:?}", br)))
let body = match br {
BlockRef { hash, .. } if !hash.is_empty() => self.chain.get_block_by_hash(hash),
BlockRef { slot, .. } if *slot != 0 => self.chain.get_block_by_slot(slot),
BlockRef { height, .. } if *height != 0 => {
self.chain.get_block_by_number(height)
}
_ => self.chain.get_block_by_slot(&br.slot),
}
.map_err(|_err| Status::internal("Failed to query chain service."))?
.ok_or(Status::not_found(format!("Failed to find block: {:?}", br)))?;

Ok(raw_to_anychain(&self.mapper, &body))
})
.collect::<Result<Vec<u5c::sync::AnyChainBlock>, Status>>()?;

Expand Down Expand Up @@ -200,7 +217,16 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl {
// the consumer knows what intersection was found and can reset their state
// This would also mimic ouroboros giving a `Rollback` as the first message.

let reset = once(async { Ok(point_to_reset_tip_response(point)) });
let raw_block = self
.wal
.read_block(&point)
.map_err(|_err| Status::internal("can't read WAL"))?;

let height = MultiEraBlock::decode(&raw_block.body)
.map_err(|e| Status::internal(format!("corrupt WAL entry: {:?}", e)))?
.number();

let reset = once(async move { Ok(point_to_reset_tip_response(point, height)) });

let forward =
wal::WalStream::start(self.wal.clone(), from_seq, self.cancellation_token.clone())
Expand All @@ -222,14 +248,17 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl {
.map_err(|e| Status::internal(format!("Unable to read WAL: {:?}", e)))?
.ok_or(Status::internal("chain has no data."))?;

let hash = MultiEraBlock::decode(&body)
.map_err(|_| Status::internal("Failed to decode tip block."))?
.hash();
let block = MultiEraBlock::decode(&body)
.map_err(|_| Status::internal("Failed to decode tip block."))?;

let hash = block.hash();
let height = block.number();

let response = u5c::sync::ReadTipResponse {
tip: Some(BlockRef {
index: slot,
slot,
hash: hash.to_vec().into(),
height,
}),
};

Expand Down
15 changes: 13 additions & 2 deletions src/serve/grpc/watch.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{
model::BlockBody,
state::LedgerStore,
wal::{self, ChainPoint, WalReader as _},
};
use futures_core::Stream;
use futures_util::StreamExt;
use pallas::interop::utxorpc as interop;
use pallas::interop::utxorpc::spec as u5c;
use pallas::interop::utxorpc::{self as interop, Mapper};
use pallas::{
interop::utxorpc::spec::watch::any_chain_tx_pattern::Chain,
ledger::{addresses::Address, traverse::MultiEraBlock},
Expand All @@ -14,6 +15,15 @@ use std::pin::Pin;
use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status};

fn raw_to_anychain(mapper: &Mapper<LedgerStore>, body: &BlockBody) -> u5c::watch::AnyChainBlock {
let block = mapper.map_block_cbor(body);

u5c::watch::AnyChainBlock {
native_bytes: body.to_vec().into(),
chain: u5c::watch::any_chain_block::Chain::Cardano(block).into(),
}
}

fn outputs_match_address(
pattern: &u5c::cardano::AddressPattern,
outputs: &[u5c::cardano::TxOutput],
Expand Down Expand Up @@ -166,6 +176,7 @@ fn block_to_txs(
.is_none_or(|predicate| apply_predicate(predicate, tx))
})
.map(|x| u5c::watch::AnyChainTx {
block: Some(raw_to_anychain(mapper, body)),
chain: Some(u5c::watch::any_chain_tx::Chain::Cardano(x)),
})
.collect()
Expand Down Expand Up @@ -229,7 +240,7 @@ impl u5c::watch::watch_service_server::WatchService for WatchServiceImpl {
let intersect = inner_req
.intersect
.iter()
.map(|x| ChainPoint::Specific(x.index, x.hash.to_vec().as_slice().into()))
.map(|x| ChainPoint::Specific(x.slot, x.hash.to_vec().as_slice().into()))
.collect::<Vec<ChainPoint>>();

let from_seq = if intersect.is_empty() {
Expand Down