Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions crates/core/src/helpers/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ pub fn calculate_absolute_timestamp_clock(
time_jump_in_absolute_slots - (time_jump_in_epochs * epoch_info.slots_in_epoch)
};

// timestamp_target is in milliseconds, we need to convert it to seconds
let timestamp_target_seconds = timestamp_target / 1000;

Ok(Clock {
slot: time_jump_in_relative_slots,
epoch_start_timestamp: timestamp_target as i64,
epoch_start_timestamp: timestamp_target_seconds as i64,
epoch: epoch_info.epoch + time_jump_in_epochs,
leader_schedule_epoch: 0,
unix_timestamp: timestamp_target as i64,
unix_timestamp: timestamp_target_seconds as i64,
})
}

Expand Down Expand Up @@ -98,12 +101,15 @@ pub fn calculate_absolute_slot_clock(
let epoch = new_absolute_slot / epoch_info.slots_in_epoch;
let slot = new_absolute_slot - epoch * epoch_info.slots_in_epoch;

// timestamp_target is in milliseconds, we need to convert it to seconds
let timestamp_target_seconds = timestamp_target / 1000;

Ok(Clock {
slot,
epoch_start_timestamp: timestamp_target as i64,
epoch_start_timestamp: timestamp_target_seconds as i64,
epoch,
leader_schedule_epoch: 0,
unix_timestamp: timestamp_target as i64,
unix_timestamp: timestamp_target_seconds as i64,
})
}

Expand Down Expand Up @@ -145,12 +151,15 @@ pub fn calculate_absolute_epoch_clock(
let time_jump_in_ms = time_jump_in_absolute_slots * slot_time;
let timestamp_target = current_updated_at + time_jump_in_ms;

// timestamp_target is in milliseconds, we need to convert it to seconds
let timestamp_target_seconds = timestamp_target / 1000;

Ok(Clock {
slot: 0,
epoch_start_timestamp: timestamp_target as i64,
epoch_start_timestamp: timestamp_target_seconds as i64,
epoch: new_epoch,
leader_schedule_epoch: 0,
unix_timestamp: timestamp_target as i64,
unix_timestamp: timestamp_target_seconds as i64,
})
}

Expand Down Expand Up @@ -226,8 +235,8 @@ mod tests {
calculate_absolute_timestamp_clock(target_time, current_time, slot_time, &epoch_info)
.unwrap();

assert_eq!(clock.unix_timestamp, target_time as i64);
assert_eq!(clock.epoch_start_timestamp, target_time as i64);
assert_eq!(clock.unix_timestamp, target_time as i64 / 1_000);
assert_eq!(clock.epoch_start_timestamp, target_time as i64 / 1_000);
assert_eq!(clock.epoch, 1); // Should stay in same epoch
assert_eq!(clock.slot, 1000 + (1_000_000 / 400)); // Should advance by time difference
}
Expand All @@ -243,8 +252,8 @@ mod tests {
calculate_absolute_timestamp_clock(target_time, current_time, slot_time, &epoch_info)
.unwrap();

assert_eq!(clock.unix_timestamp, target_time as i64);
assert_eq!(clock.epoch_start_timestamp, target_time as i64);
assert_eq!(clock.unix_timestamp, target_time as i64 / 1000);
assert_eq!(clock.epoch_start_timestamp, target_time as i64 / 1000);
// With 10 seconds and 400ms slot time, we advance 25,000 slots
// Starting from slot 431,000 in epoch 1, we should stay in epoch 1
// since 431,000 + 25,000 = 456,000 < 864,000 (end of epoch 1)
Expand Down Expand Up @@ -286,7 +295,7 @@ mod tests {
assert_eq!(clock.epoch, target_slot / epoch_info.slots_in_epoch);
assert_eq!(
clock.unix_timestamp,
(current_time + (target_slot - epoch_info.absolute_slot) * slot_time) as i64
(current_time + (target_slot - epoch_info.absolute_slot) * slot_time) as i64 / 1_000
);
}

Expand All @@ -308,7 +317,10 @@ mod tests {

assert_eq!(clock.slot, 0); // First slot of new epoch
assert_eq!(clock.epoch, 2); // New epoch
assert_eq!(clock.unix_timestamp, (current_time + slot_time) as i64);
assert_eq!(
clock.unix_timestamp,
(current_time + slot_time) as i64 / 1_000
);
}

#[test]
Expand Down Expand Up @@ -355,6 +367,7 @@ mod tests {
(current_time
+ (target_epoch * epoch_info.slots_in_epoch - epoch_info.absolute_slot) * slot_time)
as i64
/ 1_000
);
}

Expand All @@ -378,7 +391,7 @@ mod tests {
assert_eq!(clock.slot, 0);
assert_eq!(clock.epoch, 1);
// When staying in the same epoch, no time should advance
assert_eq!(clock.unix_timestamp, current_time as i64);
assert_eq!(clock.unix_timestamp, current_time as i64 / 1_000);
}

#[test]
Expand Down Expand Up @@ -416,7 +429,10 @@ mod tests {
let clock =
calculate_time_travel_clock(&config, current_time, slot_time, &epoch_info).unwrap();

assert_eq!(clock.unix_timestamp, (current_time + 1_000_000) as i64);
assert_eq!(
clock.unix_timestamp,
(current_time + 1_000_000) as i64 / 1_000
);
assert_eq!(clock.epoch, 1);
}

Expand Down Expand Up @@ -473,7 +489,7 @@ mod tests {
calculate_absolute_timestamp_clock(target_time, current_time, slot_time, &epoch_info)
.unwrap();

assert_eq!(clock.unix_timestamp, target_time as i64);
assert_eq!(clock.unix_timestamp, target_time as i64 / 1_000);
assert!(clock.epoch > 1); // Should advance many epochs
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn start_block_production_runloop(
SimnetCommand::UpdateInternalClock(_, clock) => {
svm_locker.with_svm_writer(|svm_writer| {
svm_writer.inner.set_sysvar(&clock);
svm_writer.updated_at = clock.unix_timestamp as u64;
svm_writer.updated_at = clock.unix_timestamp as u64 * 1_000;
svm_writer.latest_epoch_info.absolute_slot = clock.slot;
svm_writer.latest_epoch_info.epoch = clock.epoch;
svm_writer.latest_epoch_info.slot_index = clock.slot;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/surfnet/locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2807,7 +2807,7 @@ impl SurfnetSvmLocker {
calculate_time_travel_clock(&config, updated_at, slot_time, &epoch_info)
.map_err(|e| SurfpoolError::internal(e.to_string()))?;

let formated_time = chrono::DateTime::from_timestamp(clock_update.unix_timestamp / 1000, 0)
let formated_time = chrono::DateTime::from_timestamp(clock_update.unix_timestamp, 0)
.unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap())
.format("%Y-%m-%d %H:%M:%S")
.to_string();
Expand Down
23 changes: 3 additions & 20 deletions crates/core/src/surfnet/svm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl SurfnetSvm {
let clock: Clock = Clock {
slot: self.latest_epoch_info.absolute_slot,
epoch: self.latest_epoch_info.epoch,
unix_timestamp: self.updated_at as i64,
unix_timestamp: self.updated_at as i64 / 1_000,
epoch_start_timestamp: 0, // todo
leader_schedule_epoch: 0, // todo
};
Expand All @@ -274,8 +274,6 @@ impl SurfnetSvm {
/// # Returns
/// A `TransactionResult` indicating success or failure.
pub fn airdrop(&mut self, pubkey: &Pubkey, lamports: u64) -> TransactionResult {
self.updated_at = Utc::now().timestamp_millis() as u64;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally this updated_at field was used to track an "idle" surfnet for the MCP server. This field was update whenever a user action was taken (getting an account, updating an account, etc). What is the purpose of the field now? Are we breaking things with the MCP by removing these calls? Can we remove the updated_at field altogether now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's going to break the MCP. blocks are still produced in the background, updating this value?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was meant to track user interaction, not block production. Again, to determine if a network is "idle", as in a user isn't using it. At least, that's my understanding of what you and Arthur implemented for this. We needed to know if a network was being used by the user so you could kill it after 15m if not. If that's the use case, block production updating this value would keep the network online indefinitely.

If the MCP doesn't need this field, I guess I'm questioning what "updated_at" means/represents, and if the field can be removed or renamed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting I thought that field was always there.
We're currently producing blocks every 400ms, and being able to track the clock at the ms scale is better for time travel precision (maybe not mission critical).
To your point, I don't think we were ever able to convince ourselves that the 15m timeout was working as expected, and this field being updated every time a block is being confirmed would definitely prevent the auto-termination.
updated_at feels like the right name to me for what is being done here.

We could introduce a last_used_at (sec instead of milliseconds) to fix the auto-termination?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay cool, I can reset my understanding of that field. This is where we introduced that field btw: f3e7e7a#diff-3c2b6db2274e8aa434e77af36810870fb9e07b4fa4419a913ccc835784b6589cR136-R138

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, I remember now yeah.


let res = self.inner.airdrop(pubkey, lamports);
let (status_tx, _rx) = unbounded();
if let Ok(ref tx_result) = res {
Expand Down Expand Up @@ -352,7 +350,6 @@ impl SurfnetSvm {
/// * `lamports` - The amount of lamports to airdrop.
/// * `addresses` - Slice of recipient public keys.
pub fn airdrop_pubkeys(&mut self, lamports: u64, addresses: &[Pubkey]) {
self.updated_at = Utc::now().timestamp_millis() as u64;
for recipient in addresses {
let _ = self.airdrop(recipient, lamports);
let _ = self.simnet_events_tx.send(SimnetEvent::info(format!(
Expand Down Expand Up @@ -403,7 +400,7 @@ impl SurfnetSvm {
recent_blockhashes::{IterItem, MAX_ENTRIES, RecentBlockhashes},
slot_hashes::SlotHashes,
};
self.updated_at = Utc::now().timestamp_millis() as u64;

// Backup the current block hashes
let recent_blockhashes_backup = self.inner.get_sysvar::<RecentBlockhashes>();
let num_blockhashes_expected = recent_blockhashes_backup.len().min(MAX_ENTRIES);
Expand Down Expand Up @@ -492,8 +489,6 @@ impl SurfnetSvm {
/// # Returns
/// `Ok(())` on success, or an error if the operation fails.
pub fn set_account(&mut self, pubkey: &Pubkey, account: Account) -> SurfpoolResult<()> {
self.updated_at = Utc::now().timestamp_millis() as u64;

self.inner
.set_account(*pubkey, account.clone())
.map_err(|e| SurfpoolError::set_account(*pubkey, e))?;
Expand Down Expand Up @@ -674,7 +669,6 @@ impl SurfnetSvm {
estimation_result.error_message
)));
}
self.updated_at = Utc::now().timestamp_millis() as u64;
self.transactions_processed += 1;

if !self.check_blockhash_is_recent(tx.message.recent_blockhash()) {
Expand Down Expand Up @@ -783,7 +777,6 @@ impl SurfnetSvm {
/// # Returns
/// `Ok(Vec<Signature>)` with confirmed signatures, or `Err(SurfpoolError)` on error.
fn confirm_transactions(&mut self) -> Result<(Vec<Signature>, HashSet<Pubkey>), SurfpoolError> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let mut confirmed_transactions = vec![];
let slot = self.latest_epoch_info.slot_index;

Expand Down Expand Up @@ -833,7 +826,6 @@ impl SurfnetSvm {
/// # Returns
/// `Ok(())` on success, or `Err(SurfpoolError)` on error.
fn finalize_transactions(&mut self) -> Result<(), SurfpoolError> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let current_slot = self.latest_epoch_info.absolute_slot;
let mut requeue = VecDeque::new();
while let Some((finalized_at, tx, status_tx)) =
Expand Down Expand Up @@ -878,7 +870,6 @@ impl SurfnetSvm {
/// # Arguments
/// * `account_update` - The account update result to process.
pub fn write_account_update(&mut self, account_update: GetAccountResult) {
self.updated_at = Utc::now().timestamp_millis() as u64;
match account_update {
GetAccountResult::FoundAccount(pubkey, account, do_update_account) => {
if do_update_account {
Expand Down Expand Up @@ -922,7 +913,6 @@ impl SurfnetSvm {
}

pub fn confirm_current_block(&mut self) -> Result<(), SurfpoolError> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let slot = self.get_latest_absolute_slot();
// Confirm processed transactions
let (confirmed_signatures, all_mutated_account_keys) = self.confirm_transactions()?;
Expand Down Expand Up @@ -986,7 +976,7 @@ impl SurfnetSvm {
let clock: Clock = Clock {
slot: self.latest_epoch_info.absolute_slot,
epoch: self.latest_epoch_info.epoch,
unix_timestamp: Utc::now().timestamp(),
unix_timestamp: self.updated_at as i64 / 1_000,
epoch_start_timestamp: 0, // todo
leader_schedule_epoch: 0, // todo
};
Expand Down Expand Up @@ -1014,7 +1004,6 @@ impl SurfnetSvm {
signature: &Signature,
subscription_type: SignatureSubscriptionType,
) -> Receiver<(Slot, Option<TransactionError>)> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let (tx, rx) = unbounded();
self.signature_subscriptions
.entry(*signature)
Expand All @@ -1028,7 +1017,6 @@ impl SurfnetSvm {
account_pubkey: &Pubkey,
encoding: Option<UiAccountEncoding>,
) -> Receiver<UiAccount> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let (tx, rx) = unbounded();
self.account_subscriptions
.entry(*account_pubkey)
Expand All @@ -1051,7 +1039,6 @@ impl SurfnetSvm {
slot: Slot,
err: Option<TransactionError>,
) {
self.updated_at = Utc::now().timestamp_millis() as u64;
let mut remaining = vec![];
if let Some(subscriptions) = self.signature_subscriptions.remove(signature) {
for (subscription_type, tx) in subscriptions {
Expand Down Expand Up @@ -1323,14 +1310,12 @@ impl SurfnetSvm {
}

pub fn subscribe_for_slot_updates(&mut self) -> Receiver<SlotInfo> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let (tx, rx) = unbounded();
self.slot_subscriptions.push(tx);
rx
}

pub fn notify_slot_subscribers(&mut self, slot: Slot, parent: Slot, root: Slot) {
self.updated_at = Utc::now().timestamp_millis() as u64;
self.slot_subscriptions
.retain(|tx| tx.send(SlotInfo { slot, parent, root }).is_ok());
}
Expand Down Expand Up @@ -1369,7 +1354,6 @@ impl SurfnetSvm {
commitment_level: &CommitmentLevel,
filter: &RpcTransactionLogsFilter,
) -> Receiver<(Slot, RpcLogsResponse)> {
self.updated_at = Utc::now().timestamp_millis() as u64;
let (tx, rx) = unbounded();
self.logs_subscriptions
.push((commitment_level.clone(), filter.clone(), tx));
Expand All @@ -1383,7 +1367,6 @@ impl SurfnetSvm {
logs: Vec<String>,
commitment_level: CommitmentLevel,
) {
self.updated_at = Utc::now().timestamp_millis() as u64;
for (expected_level, _filter, tx) in self.logs_subscriptions.iter() {
if expected_level.eq(&commitment_level) {
let message = RpcLogsResponse {
Expand Down