Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ pub struct Config {
#[partial_struct(ty = "PartialLog")]
#[partial_struct(serde(default))]
pub log: Log,

/// Ntp-related configuration
#[partial_struct(ty = "PartialNtp")]
#[partial_struct(serde(default))]
pub ntp: Ntp,
}

/// Log-specific configuration.
Expand Down Expand Up @@ -276,6 +281,20 @@ pub struct Mining {
pub enabled: bool,
}

/// NTP-related configuration
#[derive(PartialStruct, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
pub struct Ntp {
/// Period that indicate the validity of a ntp timestamp
pub update_period: Duration,

/// Server to query ntp information
pub servers: Vec<String>,

/// Enable NTP for clock synchronization
pub enabled: bool,
}

impl Config {
pub fn from_partial(config: &PartialConfig) -> Self {
let defaults: &dyn Defaults = match config.environment {
Expand Down Expand Up @@ -317,6 +336,7 @@ impl Config {
mining: Mining::from_partial(&config.mining, defaults),
wallet: Wallet::from_partial(&config.wallet, defaults),
rocksdb: Rocksdb::from_partial(&config.rocksdb, defaults),
ntp: Ntp::from_partial(&config.ntp, defaults),
}
}
}
Expand Down Expand Up @@ -476,6 +496,25 @@ impl Mining {
}
}

impl Ntp {
pub fn from_partial(config: &PartialNtp, defaults: &dyn Defaults) -> Self {
Ntp {
update_period: config
.update_period
.to_owned()
.unwrap_or_else(|| defaults.ntp_update_period()),
servers: config
.servers
.clone()
.unwrap_or_else(|| defaults.ntp_server()),
enabled: config
.enabled
.to_owned()
.unwrap_or_else(|| defaults.ntp_enabled()),
}
}
}

/// Wallet-specific configuration.
#[derive(PartialStruct, Serialize, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
Expand Down
17 changes: 17 additions & 0 deletions config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,23 @@ pub trait Defaults {
fn rocksdb_enable_statistics(&self) -> bool {
false
}

fn ntp_update_period(&self) -> Duration {
Duration::from_secs(600)
}

fn ntp_server(&self) -> Vec<String> {
vec![
"0.pool.ntp.org:123".to_string(),
"1.pool.ntp.org:123".to_string(),
"2.pool.ntp.org:123".to_string(),
"3.pool.ntp.org:123".to_string(),
]
}

fn ntp_enabled(&self) -> bool {
true
}
}

/// Struct that will implement all the mainnet defaults
Expand Down
1 change: 1 addition & 0 deletions data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ secp256k1 = "0.15.5"
vrf = "0.2.2"
witnet_reputation = { path = "../reputation", features = ["serde"] }
witnet_protected = { path = "../protected" }
chrono = "0.4.9"

[dependencies.partial_struct]
path = "../partial_struct"
Expand Down
33 changes: 29 additions & 4 deletions node/src/actors/epoch_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use witnet_data_structures::{
chain::{Epoch, EpochConstants},
error::EpochCalculationError,
};
use witnet_util::timestamp::{duration_between_timestamps, get_timestamp, get_timestamp_nanos};
use witnet_util::timestamp::{
duration_between_timestamps, get_timestamp, get_timestamp_nanos, update_global_timestamp,
};

use crate::actors::messages::{EpochNotification, EpochResult};
use crate::config_mngr;
Expand Down Expand Up @@ -126,6 +128,13 @@ impl EpochManager {
// Start checkpoint monitoring process
actor.checkpoint_monitor(ctx);

// Start ntp update process
if config.ntp.enabled {
let ntp_addr = config.ntp.servers[0].clone();
update_global_timestamp(ntp_addr.as_str());
actor.update_ntp_timestamp(ctx, config.ntp.update_period, ntp_addr);
}

fut::ok(())
})
.map_err(|err, _, _| {
Expand Down Expand Up @@ -165,17 +174,22 @@ impl EpochManager {
Err(_) => return,
};

let last_checked_epoch = act.last_checked_epoch.unwrap_or(0);

// Send message to actors which subscribed to all epochs
for subscription in &mut act.subscriptions_all {
subscription.send_notification(current_epoch);
if current_epoch > last_checked_epoch {
for subscription in &mut act.subscriptions_all {
// Only send new epoch notification
subscription.send_notification(current_epoch);
}
}

// Get all the checkpoints that had some subscription but were skipped for some
// reason (process sent to background, checkpoint monitor process had no
// resources to execute in time...)
let epoch_checkpoints: Vec<_> = act
.subscriptions_epoch
.range(act.last_checked_epoch.unwrap_or(0)..=current_epoch)
.range(last_checked_epoch..=current_epoch)
.map(|(k, _v)| *k)
.collect();

Expand Down Expand Up @@ -209,6 +223,17 @@ impl EpochManager {
},
);
}

/// Method to monitor checkpoints and execute some actions on each
fn update_ntp_timestamp(&self, ctx: &mut Context<Self>, period: Duration, addr: String) {
// Wait until next checkpoint to execute the periodic function
ctx.run_later(period, move |act, ctx| {
update_global_timestamp(addr.as_str());

// Reschedule update ntp process
act.update_ntp_timestamp(ctx, period, addr);
});
}
}

/// Trait that must follow all notifications that will be sent back to subscriber actors
Expand Down
5 changes: 5 additions & 0 deletions util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ workspace = ".."
[dependencies]
chrono = "0.4.9"
failure = "0.1.6"
lazy_static = "1.4.0"
log = "0.4.8"
ntp = { git = "https://github.com/witnet/ntp" }
time = "0.1.42"
serde = { version = "1.0.101", features = ["derive"] }
106 changes: 95 additions & 11 deletions util/src/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,106 @@
use chrono::prelude::*;
use chrono::{prelude::*, TimeZone};
use lazy_static::lazy_static;
use ntp;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
use std::time::Duration;

/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> i64 {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();
/// NTP Timestamp difference
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Clone)]
pub struct NTPDiff {
/// Difference between NTP and system timestamp
pub ntp_diff: Duration,
/// Flag to indicate if NTP is bigger or smaller
/// than system timestamp
pub bigger: bool,
}

// Return number of non-leap seconds since Unix epoch
utc.timestamp()
lazy_static! {
static ref NTP_TS: RwLock<NTPDiff> = RwLock::new(NTPDiff::default());
}

/// Function to get timestamp from system as UTC Unix timestamp, seconds and nanoseconds since Unix epoch
pub fn get_timestamp_nanos() -> (i64, u32) {
/// Get NTP timestamp
fn get_ntp_diff() -> NTPDiff {
NTP_TS.read().expect("Timestamp with poisoned lock").clone()
}

/// Set NTP timestamp
fn get_mut_ntp_diff() -> std::sync::RwLockWriteGuard<'static, NTPDiff> {
NTP_TS.write().expect("Timestamp with poisoned lock")
}

/// Get Local timestamp
pub fn get_local_timestamp() -> (i64, u32) {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();

// Return number of non-leap seconds since Unix epoch and the number of nanoseconds since the last second boundary
(utc.timestamp(), utc.timestamp_subsec_nanos())
let utc_secs = utc.timestamp();
let utc_subsec_nanos = utc.timestamp_subsec_nanos();

(utc_secs, utc_subsec_nanos)
}

/// Update NTP timestamp
pub fn update_global_timestamp(addr: &str) {
match get_timestamp_ntp(addr) {
Ok(ntp) => {
let utc = get_local_timestamp();
let mut ntp_diff = get_mut_ntp_diff();

if let Some(diff) = duration_between_timestamps(utc, ntp) {
ntp_diff.ntp_diff = diff;
ntp_diff.bigger = true;
} else {
let diff = duration_between_timestamps(ntp, utc).unwrap();
ntp_diff.ntp_diff = diff;
ntp_diff.bigger = false;
}
}
Err(e) => {
log::warn!("NTP request failed: {}", e);
}
}
}

fn local_time(timestamp: ntp::protocol::TimestampFormat) -> chrono::DateTime<chrono::Local> {
let unix_time = ntp::unix_time::Instant::from(timestamp);
chrono::Local.timestamp(unix_time.secs(), unix_time.subsec_nanos() as u32)
}
/// Get NTP timestamp from an addr specified
pub fn get_timestamp_ntp(addr: &str) -> Result<(i64, u32), std::io::Error> {
ntp::request(addr).map(|p| {
let ts = local_time(p.receive_timestamp);

(ts.timestamp(), ts.timestamp_subsec_nanos())
})
}
/// Function to get timestamp from system/ntp server as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> i64 {
get_timestamp_nanos().0
}

/// Function to get timestamp from system/ntp server as UTC Unix timestamp, seconds and nanoseconds since Unix epoch
pub fn get_timestamp_nanos() -> (i64, u32) {
let utc_ts = get_local_timestamp();
let ntp_diff = get_ntp_diff();

// Apply difference respect to NTP timestamp
let utc_dur = Duration::new(utc_ts.0 as u64, utc_ts.1);
let result = if ntp_diff.bigger {
utc_dur.checked_add(ntp_diff.ntp_diff)
} else {
utc_dur.checked_sub(ntp_diff.ntp_diff)
};

match result {
Some(x) => (x.as_secs() as i64, x.subsec_nanos()),
None => panic!(
"Error: Overflow in timestamp\n\
UTC timestamp: {} secs, {} nanosecs\n\
NTP diff: {:?}",
utc_ts.0, utc_ts.1, ntp_diff
),
}
}

/// Duration needed to wait from now until the target timestamp
Expand Down