Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions core/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use cnetwork::NodeId;
use ctypes::{BlockHash, TxHash};
use ctypes::BlockHash;

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify: Send + Sync {
Expand All @@ -28,9 +27,4 @@ pub trait ChainNotify: Send + Sync {
fn new_blocks(&self, _imported: Vec<BlockHash>, _invalid: Vec<BlockHash>, _enacted: Vec<BlockHash>) {
// does nothing by default
}

/// fires when new transactions are received from a peer
fn transactions_received(&self, _hashes: Vec<TxHash>, _peer_id: NodeId) {
// does nothing by default
}
}
15 changes: 3 additions & 12 deletions core/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::MemPoolMinFees;
use cdb::{new_journaldb, Algorithm, AsHashDB};
use cio::IoChannel;
use ckey::{Address, NetworkId, PlatformAddress};
use cnetwork::NodeId;
use cstate::{ActionHandler, FindActionHandler, StateDB, StateResult, TopLevelState, TopStateView};
use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken};
use ctypes::header::Header;
Expand Down Expand Up @@ -138,12 +137,6 @@ impl Client {
self.notify.write().push(target);
}

pub fn transactions_received(&self, hashes: &[TxHash], peer_id: NodeId) {
self.notify(|notify| {
notify.transactions_received(hashes.to_vec(), peer_id);
});
}

pub fn new_blocks(&self, imported: &[BlockHash], invalid: &[BlockHash], enacted: &[BlockHash]) {
self.notify(|notify| notify.new_blocks(imported.to_vec(), invalid.to_vec(), enacted.to_vec()));
}
Expand Down Expand Up @@ -202,13 +195,11 @@ impl Client {
}

/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: NodeId) -> usize {
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
ctrace!(EXTERNAL_TX, "Importing queued");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let transactions: Vec<UnverifiedTransaction> =
transactions.iter().filter_map(|bytes| Rlp::new(bytes).as_val().ok()).collect();
let hashes: Vec<_> = transactions.iter().map(UnverifiedTransaction::hash).collect();
self.transactions_received(&hashes, peer_id);
let results = self.importer.miner.import_external_transactions(self, transactions);
results.len()
}
Expand Down Expand Up @@ -535,14 +526,14 @@ impl BlockChainClient for Client {
Ok(())
}

fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: NodeId) {
fn queue_transactions(&self, transactions: Vec<Bytes>) {
let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
ctrace!(EXTERNAL_TX, "Queue size: {}", queue_size);
if queue_size > MAX_MEM_POOL_SIZE {
cwarn!(EXTERNAL_TX, "Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) {
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::transaction::{LocalizedTransaction, PendingVerifiedTransactions, Veri
use crate::types::{BlockId, BlockStatus, TransactionId, VerificationQueueInfo as BlockQueueInfo};
use cdb::DatabaseError;
use ckey::{Address, NetworkId, PlatformAddress};
use cnetwork::NodeId;
use cstate::{FindActionHandler, StateResult, TopLevelState, TopStateView};
use ctypes::header::Header;
use ctypes::transaction::ShardTransaction;
Expand Down Expand Up @@ -198,7 +197,7 @@ pub trait BlockChainClient: Sync + Send + AccountData + BlockChainTrait + Import
fn queue_own_transaction(&self, transaction: VerifiedTransaction) -> Result<(), GenericError>;

/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: NodeId);
fn queue_transactions(&self, transactions: Vec<Bytes>);

/// Delete all pending transactions.
fn delete_all_pending_transactions(&self);
Expand Down
3 changes: 1 addition & 2 deletions core/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use ckey::{
public_to_address, Address, Ed25519KeyPair as KeyPair, Ed25519Private as Private, Ed25519Public as Public,
Generator, KeyPairTrait, NetworkId, PlatformAddress, Random,
};
use cnetwork::NodeId;
use cstate::tests::helpers::empty_top_state_with_metadata;
use cstate::{FindActionHandler, StateDB, TopLevelState};
use ctimer::{TimeoutHandler, TimerToken};
Expand Down Expand Up @@ -508,7 +507,7 @@ impl BlockChainClient for TestBlockChainClient {
Ok(())
}

fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: NodeId) {
fn queue_transactions(&self, transactions: Vec<Bytes>) {
// import right here
let transactions = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, transactions);
Expand Down
7 changes: 3 additions & 4 deletions core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::miner::Miner;
use crate::scheme::Scheme;
use crate::BlockId;
use cio::{IoContext, IoHandler, IoHandlerResult, IoService};
use cnetwork::NodeId;
use ctimer::TimerApi;
use ctypes::BlockHash;
use kvdb::KeyValueDB;
Expand Down Expand Up @@ -71,7 +70,7 @@ pub enum ClientIoMessage {
/// A header is ready
HeaderVerified,
/// New transaction RLPs are ready to be imported
NewTransactions(Vec<Bytes>, NodeId),
NewTransactions(Vec<Bytes>),
/// Block generation is required
NewBlockRequired {
parent_block: BlockId,
Expand All @@ -95,8 +94,8 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
ClientIoMessage::HeaderVerified => {
self.client.import_verified_headers();
}
ClientIoMessage::NewTransactions(transactions, peer_id) => {
self.client.import_queued_transactions(&transactions, peer_id);
ClientIoMessage::NewTransactions(transactions) => {
self.client.import_queued_transactions(&transactions);
}
ClientIoMessage::NewBlockRequired {
parent_block,
Expand Down
1 change: 0 additions & 1 deletion sync/src/transaction/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ impl NetworkExtension<Never> for Extension {

self.client.queue_transactions(
transactions.iter().map(|unverified| unverified.rlp_bytes().to_vec()).collect(),
*token,
);
if let Some(peer) = self.peers.get_mut(token) {
let transactions: Vec<_> = transactions
Expand Down