From fda3ba73473604302e446f20f775925a994f4188 Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 20 Jan 2023 13:18:11 +0100 Subject: [PATCH 1/2] Network data for sync --- bin/node/src/service.rs | 4 +- bin/runtime/src/lib.rs | 4 +- finality-aleph/src/sync/data.rs | 97 ++++++++++++++++++++++++ finality-aleph/src/sync/mod.rs | 5 +- finality-aleph/src/sync/substrate/mod.rs | 5 +- primitives/src/lib.rs | 2 + 6 files changed, 109 insertions(+), 8 deletions(-) create mode 100644 finality-aleph/src/sync/data.rs diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index 056149b205..1d789c8f0d 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -5,8 +5,8 @@ use std::{ sync::Arc, }; -use aleph_primitives::AlephSessionApi; -use aleph_runtime::{self, opaque::Block, RuntimeApi, MAX_BLOCK_SIZE}; +use aleph_primitives::{AlephSessionApi, MAX_BLOCK_SIZE}; +use aleph_runtime::{self, opaque::Block, RuntimeApi}; use finality_aleph::{ run_nonvalidator_node, run_validator_node, AlephBlockImport, AlephConfig, JustificationNotification, Metrics, MillisecsPerBlock, Protocol, ProtocolNaming, SessionPeriod, diff --git a/bin/runtime/src/lib.rs b/bin/runtime/src/lib.rs index 4231ea0867..586c6bb434 100644 --- a/bin/runtime/src/lib.rs +++ b/bin/runtime/src/lib.rs @@ -32,7 +32,7 @@ pub use primitives::Balance; use primitives::{ staking::MAX_NOMINATORS_REWARDED_PER_VALIDATOR, wrap_methods, ApiError as AlephApiError, AuthorityId as AlephId, SessionAuthorityData, Version as FinalityVersion, ADDRESSES_ENCODING, - DEFAULT_BAN_REASON_LENGTH, DEFAULT_SESSIONS_PER_ERA, DEFAULT_SESSION_PERIOD, + DEFAULT_BAN_REASON_LENGTH, DEFAULT_SESSIONS_PER_ERA, DEFAULT_SESSION_PERIOD, MAX_BLOCK_SIZE, MILLISECS_PER_BLOCK, TOKEN, }; use sp_api::impl_runtime_apis; @@ -132,8 +132,6 @@ pub const NORMAL_DISPATCH_RATIO: Perbill = Perbill::from_percent(75); // The whole process for a single block should take 1s, of which 400ms is for creation, // 200ms for propagation and 400ms for validation. Hence the block weight should be within 400ms. pub const MAX_BLOCK_WEIGHT: Weight = WEIGHT_PER_MILLIS.saturating_mul(400); -// We agreed to 5MB as the block size limit. -pub const MAX_BLOCK_SIZE: u32 = 5 * 1024 * 1024; // The storage deposit is roughly 1 TOKEN per 1kB pub const DEPOSIT_PER_BYTE: Balance = MILLI_AZERO; diff --git a/finality-aleph/src/sync/data.rs b/finality-aleph/src/sync/data.rs new file mode 100644 index 0000000000..9b14ddf0cb --- /dev/null +++ b/finality-aleph/src/sync/data.rs @@ -0,0 +1,97 @@ +use std::mem::size_of; + +use aleph_primitives::MAX_BLOCK_SIZE; +use codec::{Decode, Encode, Error as CodecError, Input as CodecInput}; +use log::warn; + +use crate::{sync::Justification, Version}; + +/// The representation of the database state to be sent to other nodes. +/// In the first version this only contains the top justification. +#[derive(Clone, Debug, Encode, Decode)] +pub struct State { + top_justification: J::Unverified, +} + +/// Data to be sent over the network. +#[derive(Clone, Debug, Encode, Decode)] +pub struct NetworkData { + state: State, +} + +/// Version wrapper around the network data. +#[derive(Clone, Debug)] +pub enum VersionedNetworkData { + // Most likely from the future. + Other(Version, Vec), + V1(NetworkData), +} + +// We need 32 bits, since blocks can be quite sizeable. +type ByteCount = u32; + +// We want to be able to safely send at least 10 blocks at once, so this gives uss a bit of wiggle +// room. +const MAX_SYNC_MESSAGE_SIZE: u32 = MAX_BLOCK_SIZE * 11; + +fn encode_with_version(version: Version, payload: &[u8]) -> Vec { + let size = payload.len().try_into().unwrap_or(ByteCount::MAX); + + if size > MAX_SYNC_MESSAGE_SIZE { + warn!( + "Versioned sync message v{:?} too big during Encode. Size is {:?}. Should be {:?} at max.", + version, + payload.len(), + MAX_SYNC_MESSAGE_SIZE + ); + } + + let mut result = Vec::with_capacity(version.size_hint() + size.size_hint() + payload.len()); + + version.encode_to(&mut result); + size.encode_to(&mut result); + result.extend_from_slice(payload); + + result +} + +impl Encode for VersionedNetworkData { + fn size_hint(&self) -> usize { + use VersionedNetworkData::*; + let version_size = size_of::(); + let byte_count_size = size_of::(); + version_size + + byte_count_size + + match self { + Other(_, payload) => payload.len(), + V1(data) => data.size_hint(), + } + } + + fn encode(&self) -> Vec { + use VersionedNetworkData::*; + match self { + Other(version, payload) => encode_with_version(*version, payload), + V1(data) => encode_with_version(Version(1), &data.encode()), + } + } +} + +impl Decode for VersionedNetworkData { + fn decode(input: &mut I) -> Result { + use VersionedNetworkData::*; + let version = Version::decode(input)?; + let num_bytes = ByteCount::decode(input)?; + match version { + Version(1) => Ok(V1(NetworkData::decode(input)?)), + _ => { + if num_bytes > MAX_SYNC_MESSAGE_SIZE { + Err("Sync message has unknown version and is encoded as more than the maximum size.")?; + }; + let mut payload = vec![0; num_bytes as usize]; + input.read(payload.as_mut_slice())?; + Ok(Other(version, payload)) + } + } + } +} diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 207efc5d8f..1baf8f9c42 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -3,6 +3,9 @@ use std::{ hash::Hash, }; +use codec::Codec; + +mod data; mod substrate; mod task_queue; mod ticker; @@ -37,7 +40,7 @@ pub trait Header: Clone { /// The verified justification of a block, including a header. pub trait Justification: Clone { type Header: Header; - type Unverified; + type Unverified: Clone + Codec + Debug; /// The header of the block. fn header(&self) -> &Self::Header; diff --git a/finality-aleph/src/sync/substrate/mod.rs b/finality-aleph/src/sync/substrate/mod.rs index cb5f1274cc..fb6da42384 100644 --- a/finality-aleph/src/sync/substrate/mod.rs +++ b/finality-aleph/src/sync/substrate/mod.rs @@ -1,6 +1,7 @@ use std::hash::{Hash, Hasher}; use aleph_primitives::BlockNumber; +use codec::{Decode, Encode}; use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One}; use crate::{ @@ -15,7 +16,7 @@ mod verification; pub use verification::SessionVerifier; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)] pub struct BlockId> { hash: H::Hash, number: H::Number, @@ -58,7 +59,7 @@ impl> Header for H { } /// A justification, including the related header. -#[derive(Clone)] +#[derive(Clone, Debug, Encode, Decode)] pub struct Justification> { header: H, raw_justification: AlephJustification, diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 94220796dd..61753d47cb 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -39,6 +39,8 @@ pub type SessionCount = u32; pub type BlockCount = u32; pub const MILLISECS_PER_BLOCK: u64 = 1000; +// We agreed to 5MB as the block size limit. +pub const MAX_BLOCK_SIZE: u32 = 5 * 1024 * 1024; // Quick sessions for testing purposes #[cfg(feature = "short_session")] From fd41a1f6f707f329b413cb9de42375edfcb21ada Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 20 Jan 2023 16:24:52 +0100 Subject: [PATCH 2/2] Actually remembered we need one more kind of message in this version --- finality-aleph/src/sync/data.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/finality-aleph/src/sync/data.rs b/finality-aleph/src/sync/data.rs index 9b14ddf0cb..ccf5b263dd 100644 --- a/finality-aleph/src/sync/data.rs +++ b/finality-aleph/src/sync/data.rs @@ -15,8 +15,13 @@ pub struct State { /// Data to be sent over the network. #[derive(Clone, Debug, Encode, Decode)] -pub struct NetworkData { - state: State, +pub enum NetworkData { + /// A periodic state broadcast, so that neighbouring nodes can request what they are missing, + /// send what we are missing, and sometines just use the justifications to update their own + /// state. + StateBroadcast(State), + /// A series of justifications, sent to a node that is clearly behind. + Justifications(Vec, State), } /// Version wrapper around the network data.