Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/src/accountant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ use std::path::Path;
use std::rc::Rc;
use std::time::SystemTime;
use web3::types::H256;
use crate::accountant::scanners::pending_payable_scanner::utils::PendingPayableScanResult;
use crate::accountant::scanners::scan_schedulers::{PayableSequenceScanner, ScanRescheduleAfterEarlyStop, ScanSchedulers};
use crate::accountant::scanners::scanners_utils::payable_scanner_utils::OperationOutcome;
use crate::accountant::scanners::scanners_utils::pending_payable_scanner_utils::PendingPayableScanResult;
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::TransactionReceiptResult;

pub const CRASH_KEY: &str = "ACCOUNTANT";
Expand Down
1,176 changes: 31 additions & 1,145 deletions node/src/accountant/scanners/mod.rs

Large diffs are not rendered by default.

804 changes: 804 additions & 0 deletions node/src/accountant/scanners/pending_payable_scanner/mod.rs

Large diffs are not rendered by default.

191 changes: 191 additions & 0 deletions node/src/accountant/scanners/pending_payable_scanner/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use crate::accountant::PendingPayableId;
use crate::blockchain::blockchain_bridge::PendingPayableFingerprint;
use masq_lib::logger::Logger;
use masq_lib::ui_gateway::NodeToUiMessage;
use std::time::SystemTime;

#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct PendingPayableScanReport {
pub still_pending: Vec<PendingPayableId>,
pub failures: Vec<PendingPayableId>,
pub confirmed: Vec<PendingPayableFingerprint>,
}

impl PendingPayableScanReport {
pub fn requires_payments_retry(&self) -> bool {
todo!("complete my within GH-642")
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum PendingPayableScanResult {
NoPendingPayablesLeft(Option<NodeToUiMessage>),
PaymentRetryRequired,
}

pub fn elapsed_in_ms(timestamp: SystemTime) -> u128 {
timestamp
.elapsed()
.expect("time calculation for elapsed failed")
.as_millis()
}

pub fn handle_none_status(
mut scan_report: PendingPayableScanReport,
fingerprint: PendingPayableFingerprint,
max_pending_interval: u64,
logger: &Logger,
) -> PendingPayableScanReport {
info!(
logger,
"Pending transaction {:?} couldn't be confirmed at attempt \
{} at {}ms after its sending",
fingerprint.hash,
fingerprint.attempt,
elapsed_in_ms(fingerprint.timestamp)
);
let elapsed = fingerprint
.timestamp
.elapsed()
.expect("we should be older now");
let elapsed = elapsed.as_secs();
if elapsed > max_pending_interval {
error!(
logger,
"Pending transaction {:?} has exceeded the maximum pending time \
({}sec) with the age {}sec and the confirmation process is going to be aborted now \
at the final attempt {}; manual resolution is required from the \
user to complete the transaction.",
fingerprint.hash,
max_pending_interval,
elapsed,
fingerprint.attempt
);
scan_report.failures.push(fingerprint.into())
} else {
scan_report.still_pending.push(fingerprint.into())
}
scan_report
}

pub fn handle_status_with_success(
mut scan_report: PendingPayableScanReport,
fingerprint: PendingPayableFingerprint,
logger: &Logger,
) -> PendingPayableScanReport {
info!(
logger,
"Transaction {:?} has been added to the blockchain; detected locally at attempt \
{} at {}ms after its sending",
fingerprint.hash,
fingerprint.attempt,
elapsed_in_ms(fingerprint.timestamp)
);
scan_report.confirmed.push(fingerprint);
scan_report
}

//TODO: failures handling is going to need enhancement suggested by GH-693
pub fn handle_status_with_failure(
mut scan_report: PendingPayableScanReport,
fingerprint: PendingPayableFingerprint,
logger: &Logger,
) -> PendingPayableScanReport {
error!(
logger,
"Pending transaction {:?} announced as a failure, interpreting attempt \
{} after {}ms from the sending",
fingerprint.hash,
fingerprint.attempt,
elapsed_in_ms(fingerprint.timestamp)
);
scan_report.failures.push(fingerprint.into());
scan_report
}

pub fn handle_none_receipt(
mut scan_report: PendingPayableScanReport,
payable: PendingPayableFingerprint,
error_msg: &str,
logger: &Logger,
) -> PendingPayableScanReport {
debug!(
logger,
"Interpreting a receipt for transaction {:?} but {}; attempt {}, {}ms since sending",
payable.hash,
error_msg,
payable.attempt,
elapsed_in_ms(payable.timestamp)
);

scan_report
.still_pending
.push(PendingPayableId::new(payable.rowid, payable.hash));
scan_report
}

#[cfg(test)]
mod tests {

#[test]
fn requires_payments_retry_says_yes() {
todo!("complete this test with GH-604")
// let cases = vec![
// PendingPayableScanReport {
// still_pending: vec![PendingPayableId::new(12, make_tx_hash(456))],
// failures: vec![],
// confirmed: vec![],
// },
// PendingPayableScanReport {
// still_pending: vec![],
// failures: vec![PendingPayableId::new(456, make_tx_hash(1234))],
// confirmed: vec![],
// },
// PendingPayableScanReport {
// still_pending: vec![PendingPayableId::new(12, make_tx_hash(456))],
// failures: vec![PendingPayableId::new(456, make_tx_hash(1234))],
// confirmed: vec![],
// },
// PendingPayableScanReport {
// still_pending: vec![PendingPayableId::new(12, make_tx_hash(456))],
// failures: vec![PendingPayableId::new(456, make_tx_hash(1234))],
// confirmed: vec![make_pending_payable_fingerprint()],
// },
// PendingPayableScanReport {
// still_pending: vec![PendingPayableId::new(12, make_tx_hash(456))],
// failures: vec![],
// confirmed: vec![make_pending_payable_fingerprint()],
// },
// PendingPayableScanReport {
// still_pending: vec![],
// failures: vec![PendingPayableId::new(456, make_tx_hash(1234))],
// confirmed: vec![make_pending_payable_fingerprint()],
// },
// ];
//
// cases.into_iter().enumerate().for_each(|(idx, case)| {
// let result = case.requires_payments_retry();
// assert_eq!(
// result, true,
// "We expected true, but got false for case of idx {}",
// idx
// )
// })
}

#[test]
fn requires_payments_retry_says_no() {
todo!("complete this test with GH-604")
// let report = PendingPayableScanReport {
// still_pending: vec![],
// failures: vec![],
// confirmed: vec![make_pending_payable_fingerprint()],
// };
//
// let result = report.requires_payments_retry();
//
// assert_eq!(result, false)
}
}
195 changes: 195 additions & 0 deletions node/src/accountant/scanners/receivable_scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

pub mod utils;

use crate::accountant::db_access_objects::banned_dao::BannedDao;
use crate::accountant::db_access_objects::receivable_dao::ReceivableDao;
use crate::accountant::scanners::receivable_scanner::utils::balance_and_age;
use crate::accountant::scanners::{
PrivateScanner, Scanner, ScannerCommon, StartScanError, StartableScanner,
};
use crate::accountant::{ReceivedPayments, ResponseSkeleton, ScanForReceivables};
use crate::blockchain::blockchain_bridge::{BlockMarker, RetrieveTransactions};
use crate::db_config::persistent_configuration::PersistentConfiguration;
use crate::sub_lib::accountant::{FinancialStatistics, PaymentThresholds};
use crate::sub_lib::wallet::Wallet;
use crate::time_marking_methods;
use masq_lib::logger::Logger;
use masq_lib::messages::{ScanType, ToMessageBody, UiScanResponse};
use masq_lib::ui_gateway::{MessageTarget, NodeToUiMessage};
use std::cell::RefCell;
use std::rc::Rc;
use std::time::SystemTime;

pub struct ReceivableScanner {
pub common: ScannerCommon,
pub receivable_dao: Box<dyn ReceivableDao>,
pub banned_dao: Box<dyn BannedDao>,
pub persistent_configuration: Box<dyn PersistentConfiguration>,
pub financial_statistics: Rc<RefCell<FinancialStatistics>>,
}

impl
PrivateScanner<
ScanForReceivables,
RetrieveTransactions,
ReceivedPayments,
Option<NodeToUiMessage>,
> for ReceivableScanner
{
}

impl StartableScanner<ScanForReceivables, RetrieveTransactions> for ReceivableScanner {
fn start_scan(
&mut self,
earning_wallet: &Wallet,
timestamp: SystemTime,
response_skeleton_opt: Option<ResponseSkeleton>,
logger: &Logger,
) -> Result<RetrieveTransactions, StartScanError> {
self.mark_as_started(timestamp);
info!(logger, "Scanning for receivables to {}", earning_wallet);
self.scan_for_delinquencies(timestamp, logger);

Ok(RetrieveTransactions {
recipient: earning_wallet.clone(),
response_skeleton_opt,
})
}
}

impl Scanner<ReceivedPayments, Option<NodeToUiMessage>> for ReceivableScanner {
fn finish_scan(&mut self, msg: ReceivedPayments, logger: &Logger) -> Option<NodeToUiMessage> {
self.handle_new_received_payments(&msg, logger);
self.mark_as_ended(logger);

msg.response_skeleton_opt
.map(|response_skeleton| NodeToUiMessage {
target: MessageTarget::ClientId(response_skeleton.client_id),
body: UiScanResponse {}.tmb(response_skeleton.context_id),
})
}

time_marking_methods!(Receivables);

as_any_ref_in_trait_impl!();
as_any_mut_in_trait_impl!();
}

impl ReceivableScanner {
pub fn new(
receivable_dao: Box<dyn ReceivableDao>,
banned_dao: Box<dyn BannedDao>,
persistent_configuration: Box<dyn PersistentConfiguration>,
payment_thresholds: Rc<PaymentThresholds>,
financial_statistics: Rc<RefCell<FinancialStatistics>>,
) -> Self {
Self {
common: ScannerCommon::new(payment_thresholds),
receivable_dao,
banned_dao,
persistent_configuration,
financial_statistics,
}
}

fn handle_new_received_payments(
&mut self,
received_payments_msg: &ReceivedPayments,
logger: &Logger,
) {
if received_payments_msg.transactions.is_empty() {
info!(
logger,
"No newly received payments were detected during the scanning process."
);
let new_start_block = received_payments_msg.new_start_block;
if let BlockMarker::Value(start_block_number) = new_start_block {
match self
.persistent_configuration
.set_start_block(Some(start_block_number))
{
Ok(()) => debug!(logger, "Start block updated to {}", start_block_number),
Err(e) => panic!(
"Attempt to set new start block to {} failed due to: {:?}",
start_block_number, e
),
}
}
} else {
let mut txn = self.receivable_dao.as_mut().more_money_received(
received_payments_msg.timestamp,
&received_payments_msg.transactions,
);
let new_start_block = received_payments_msg.new_start_block;
if let BlockMarker::Value(start_block_number) = new_start_block {
match self
.persistent_configuration
.set_start_block_from_txn(Some(start_block_number), &mut txn)
{
Ok(()) => debug!(logger, "Start block updated to {}", start_block_number),
Err(e) => panic!(
"Attempt to set new start block to {} failed due to: {:?}",
start_block_number, e
),
}
} else {
unreachable!("Failed to get start_block while transactions were present");
}
match txn.commit() {
Ok(_) => {
debug!(logger, "Received payments have been commited to database");
}
Err(e) => panic!("Commit of received transactions failed: {:?}", e),
}
let total_newly_paid_receivable = received_payments_msg
.transactions
.iter()
.fold(0, |so_far, now| so_far + now.wei_amount);

self.financial_statistics
.borrow_mut()
.total_paid_receivable_wei += total_newly_paid_receivable;
}
}

pub fn scan_for_delinquencies(&self, timestamp: SystemTime, logger: &Logger) {
info!(logger, "Scanning for delinquencies");
self.find_and_ban_delinquents(timestamp, logger);
self.find_and_unban_reformed_nodes(timestamp, logger);
}

fn find_and_ban_delinquents(&self, timestamp: SystemTime, logger: &Logger) {
self.receivable_dao
.new_delinquencies(timestamp, self.common.payment_thresholds.as_ref())
.into_iter()
.for_each(|account| {
self.banned_dao.ban(&account.wallet);
let (balance_str_wei, age) = balance_and_age(timestamp, &account);
info!(
logger,
"Wallet {} (balance: {} gwei, age: {} sec) banned for delinquency",
account.wallet,
balance_str_wei,
age.as_secs()
)
});
}

fn find_and_unban_reformed_nodes(&self, timestamp: SystemTime, logger: &Logger) {
self.receivable_dao
.paid_delinquencies(self.common.payment_thresholds.as_ref())
.into_iter()
.for_each(|account| {
self.banned_dao.unban(&account.wallet);
let (balance_str_wei, age) = balance_and_age(timestamp, &account);
info!(
logger,
"Wallet {} (balance: {} gwei, age: {} sec) is no longer delinquent: unbanned",
account.wallet,
balance_str_wei,
age.as_secs()
)
});
}
}
Loading