Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-core = { version = "4.1.0-dev", path = "../../primitives/core" }
sp-keystore = { version = "0.10.0", path = "../../primitives/keystore" }
sp-runtime = { version = "4.1.0-dev", path = "../../primitives/runtime" }
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }

sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
Expand Down
9 changes: 6 additions & 3 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use sc_network_gossip::{GossipEngine, Network as GossipNetwork};

use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
pub use sp_consensus::SyncOracle;
use sp_keystore::SyncCryptoStorePtr;
use sp_runtime::traits::Block;

Expand Down Expand Up @@ -112,7 +113,7 @@ where
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
N: GossipNetwork<B> + Clone + Send + 'static,
N: GossipNetwork<B> + Clone + SyncOracle + Send + Sync + 'static,
{
/// BEEFY client
pub client: Arc<C>,
Expand Down Expand Up @@ -143,7 +144,7 @@ where
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
N: GossipNetwork<B> + Clone + Send + 'static,
N: GossipNetwork<B> + Clone + SyncOracle + Send + Sync + 'static,
{
let BeefyParams {
client,
Expand All @@ -157,6 +158,7 @@ where
protocol_name,
} = beefy_params;

let sync_oracle = network.clone();
let gossip_validator = Arc::new(gossip::GossipValidator::new());
let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None);

Expand Down Expand Up @@ -184,9 +186,10 @@ where
gossip_validator,
min_block_delta,
metrics,
sync_oracle,
};

let worker = worker::BeefyWorker::<_, _, _>::new(worker_params);
let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);

worker.run().await
}
24 changes: 18 additions & 6 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ use crate::{
metric_inc, metric_set,
metrics::Metrics,
notification::{BeefyBestBlockSender, BeefySignedCommitmentSender},
round, Client,
round, Client, SyncOracle,
};

pub(crate) struct WorkerParams<B, BE, C>
pub(crate) struct WorkerParams<B, BE, C, SO>
where
B: Block,
{
Expand All @@ -63,14 +63,16 @@ where
pub gossip_validator: Arc<GossipValidator<B>>,
pub min_block_delta: u32,
pub metrics: Option<Metrics>,
pub sync_oracle: SO,
}

/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B, C, BE>
pub(crate) struct BeefyWorker<B, C, BE, SO>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
client: Arc<C>,
backend: Arc<BE>,
Expand All @@ -91,24 +93,27 @@ where
beefy_best_block_sender: BeefyBestBlockSender<B>,
/// Validator set id for the last signed commitment
last_signed_id: u64,
/// Handle to the sync oracle
sync_oracle: SO,
// keep rustc happy
_backend: PhantomData<BE>,
}

impl<B, C, BE> BeefyWorker<B, C, BE>
impl<B, C, BE, SO> BeefyWorker<B, C, BE, SO>
where
B: Block + Codec,
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
/// Return a new BEEFY worker instance.
///
/// Note that a BEEFY worker is only fully functional if a corresponding
/// BEEFY pallet has been deployed on-chain.
///
/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
pub(crate) fn new(worker_params: WorkerParams<B, BE, C>) -> Self {
pub(crate) fn new(worker_params: WorkerParams<B, BE, C, SO>) -> Self {
let WorkerParams {
client,
backend,
Expand All @@ -119,6 +124,7 @@ where
gossip_validator,
min_block_delta,
metrics,
sync_oracle,
} = worker_params;

BeefyWorker {
Expand All @@ -136,17 +142,19 @@ where
best_beefy_block: None,
last_signed_id: 0,
beefy_best_block_sender,
sync_oracle,
_backend: PhantomData,
}
}
}

impl<B, C, BE> BeefyWorker<B, C, BE>
impl<B, C, BE, SO> BeefyWorker<B, C, BE, SO>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
/// Return `true`, if we should vote on block `number`
fn should_vote_on(&self, number: NumberFor<B>) -> bool {
Expand Down Expand Up @@ -400,6 +408,10 @@ where
));

loop {
if self.sync_oracle.is_major_syncing() {
debug!(target: "beefy", "Waiting for major sync to complete.");
self.sync_oracle.wait_for_major_syncing().await;
}
let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));

Expand Down
2 changes: 2 additions & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ either = "1.5.3"
fnv = "1.0.6"
fork-tree = { version = "3.0.0", path = "../../utils/fork-tree" }
futures = "0.3.9"
tokio = { version = "1.15.0", features = [ "sync" ] }
tokio-stream = { version = "0.1.8", features = [ "sync" ] }
futures-timer = "3.0.2"
asynchronous-codec = "0.5"
hex = "0.4.0"
Expand Down
49 changes: 47 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use crate::{
sync::{Status as SyncStatus, SyncState},
NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready,
},
transactions, transport, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus, ReputationChange,
transactions, transport, utils, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus,
ReputationChange,
};

use codec::Encode as _;
Expand Down Expand Up @@ -85,6 +86,7 @@ use std::{
},
task::Poll,
};
use tokio::sync::watch;

pub use behaviour::{
IfDisconnected, InboundFailure, OutboundFailure, RequestFailure, ResponseFailure,
Expand Down Expand Up @@ -113,6 +115,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Are we actively catching up with the chain?
is_major_syncing: Arc<AtomicBool>,
/// A channel that receives notifications about the major sync state
major_sync_stream: utils::MajorSyncStream<Option<bool>>,
/// Local copy of the `PeerId` of the local node.
local_peer_id: PeerId,
/// The `KeyPair` that defines the `PeerId` of the local node.
Expand Down Expand Up @@ -249,6 +253,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {

let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let (major_sync_sender, major_sync_stream) = utils::MajorSyncStream::new(None);

// Build the swarm.
let client = params.chain.clone();
Expand Down Expand Up @@ -415,6 +420,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
external_addresses: external_addresses.clone(),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
major_sync_stream,
peerset: peerset_handle,
local_peer_id,
local_identity,
Expand All @@ -438,6 +444,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
external_addresses,
num_connected,
is_major_syncing,
major_sync_sender: Arc::new(major_sync_sender),
network_service: swarm,
service,
import_queue: params.import_queue,
Expand Down Expand Up @@ -1277,6 +1284,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}
}

#[async_trait::async_trait]
impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for NetworkService<B, H> {
fn is_major_syncing(&mut self) -> bool {
Self::is_major_syncing(self)
Expand All @@ -1285,8 +1293,25 @@ impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for NetworkServic
fn is_offline(&mut self) -> bool {
self.num_connected.load(Ordering::Relaxed) == 0
}
}

async fn wait_for_major_syncing(&mut self) -> () {
self.major_sync_stream
.clone()
.filter(|val| {
if let Some(val) = val {
future::ready(!val)
}
// if the stream yields None, we know it's still at the initial value so we discard
// it
else {
future::ready(false)
}
})
.next()
.await;
}
}
#[async_trait::async_trait]
impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for &'a NetworkService<B, H> {
fn is_major_syncing(&mut self) -> bool {
NetworkService::is_major_syncing(self)
Expand All @@ -1295,6 +1320,23 @@ impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for &'a Netwo
fn is_offline(&mut self) -> bool {
self.num_connected.load(Ordering::Relaxed) == 0
}

async fn wait_for_major_syncing(&mut self) -> () {
self.major_sync_stream
.clone()
.filter(|val| {
if let Some(val) = val {
future::ready(!val)
}
// if the stream yields None, we know it's still at the initial value so we discard
// it
else {
future::ready(false)
}
})
.next()
.await;
}
}

impl<B: BlockT, H: ExHashT> sc_consensus::JustificationSyncLink<B> for NetworkService<B, H> {
Expand Down Expand Up @@ -1457,6 +1499,8 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
num_connected: Arc<AtomicUsize>,
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
is_major_syncing: Arc<AtomicBool>,
/// A channel that sends updates about the major sync state
major_sync_sender: Arc<watch::Sender<Option<bool>>>,
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, H>>,
/// The *actual* network.
Expand Down Expand Up @@ -2076,6 +2120,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.tx_handler_controller.set_gossip_enabled(!is_major_syncing);

this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
let _ = this.major_sync_sender.send(Some(is_major_syncing));

if let Some(metrics) = this.metrics.as_ref() {
for (proto, buckets) in this.network_service.behaviour_mut().num_entries_per_kbucket() {
Expand Down
46 changes: 44 additions & 2 deletions client/network/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use futures::{stream::unfold, FutureExt, Stream, StreamExt};
use futures::{ready, stream::unfold, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use linked_hash_set::LinkedHashSet;
use std::{hash::Hash, num::NonZeroUsize, time::Duration};
use std::{hash::Hash, num::NonZeroUsize, task::Poll, time::Duration};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

/// Creates a stream that returns a new value every `duration`.
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
Expand Down Expand Up @@ -57,6 +59,46 @@ impl<T: Hash + Eq> LruHashSet<T> {
}
}

/// A clone-able stream that yields values of type `T`.
/// Every instance of the stream will receive the same data.
/// The stream is implemented on top of a spmc (single-producer,
/// multi-consumer) channel, see [`tokio::sync::watch::Receiver`].
pub struct MajorSyncStream<T: Clone + Sync + Send + 'static> {
	// Kept alongside `inner` so that `Clone` can build a fresh
	// `WatchStream` subscribed to the same underlying channel
	// (`WatchStream` itself is not `Clone`).
	consumer: watch::Receiver<T>,
	// The stream adapter that is actually polled in `poll_next`.
	inner: WatchStream<T>,
}

impl<T: Clone + Sync + Send + 'static> Clone for MajorSyncStream<T> {
	/// Create an independent stream over the same watch channel.
	///
	/// `WatchStream` is not `Clone`, so a new one is constructed from a
	/// fresh receiver handle; every clone observes the same sequence of
	/// values published by the sender.
	fn clone(&self) -> Self {
		let rx = self.consumer.clone();
		Self { inner: WatchStream::new(rx.clone()), consumer: rx }
	}
}

impl<T: Clone + Sync + Send + 'static> MajorSyncStream<T> {
	/// Create a new watch channel seeded with `init`, returning the
	/// producing half together with a `MajorSyncStream` wrapping the
	/// consuming half.
	pub fn new(init: T) -> (watch::Sender<T>, Self) {
		let (tx, rx) = watch::channel(init);
		let stream = Self { inner: WatchStream::new(rx.clone()), consumer: rx };
		(tx, stream)
	}
}

impl<T: Clone + Sync + Send + 'static> Stream for MajorSyncStream<T> {
	type Item = T;

	/// Poll the wrapped [`WatchStream`] for the next value.
	///
	/// Delegates directly to the inner stream: the previous
	/// `match ready!(..)` arms mapped `Some(item)` to `Poll::Ready(Some(item))`
	/// and everything else to `Poll::Ready(None)`, which is exactly the
	/// value `poll_next_unpin` already produces — an identity match.
	fn poll_next(
		mut self: std::pin::Pin<&mut Self>,
		cx: &mut std::task::Context<'_>,
	) -> std::task::Poll<Option<Self::Item>> {
		self.inner.poll_next_unpin(cx)
	}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading