From 2823683d72ce29713613eaea894c74760a57bb8b Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 05:58:16 +0300 Subject: [PATCH 01/16] add logs --- src/libs/blockchain/routines/Sync.ts | 4 +++ src/libs/consensus/routines/consensusTime.ts | 26 +++++++++++++------ .../consensus/v2/types/secretaryManager.ts | 19 +++++++++++--- src/utilities/mainLoop.ts | 5 ++-- src/utilities/sharedState.ts | 5 +++- 5 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/libs/blockchain/routines/Sync.ts b/src/libs/blockchain/routines/Sync.ts index 916626e11..361e98086 100644 --- a/src/libs/blockchain/routines/Sync.ts +++ b/src/libs/blockchain/routines/Sync.ts @@ -250,6 +250,7 @@ async function verifyLastBlockIntegrity( } async function downloadBlock(peer: Peer, blockToAsk: number) { + log.only("Downloading block: " + blockToAsk) const blockRequest: RPCRequest = { method: "nodeCall", params: [ @@ -264,6 +265,7 @@ async function downloadBlock(peer: Peer, blockToAsk: number) { const blockResponse = await peer.longCall(blockRequest, false, 250, 3, [ 404, ]) + log.only("Block response: " + blockResponse.result) // INFO: Handle max retries reached if (blockResponse.result === 400) { @@ -290,6 +292,8 @@ async function downloadBlock(peer: Peer, blockToAsk: number) { log.info("[downloadBlock] Block received: " + block.hash) await Chain.insertBlock(block, [], null, false) + log.only("Block inserted successfully") + log.only("Last block number: " + getSharedState.lastBlockNumber + " Last block hash: " + getSharedState.lastBlockHash) log.info( "[fastSync] Block inserted successfully at the head of the chain!", ) diff --git a/src/libs/consensus/routines/consensusTime.ts b/src/libs/consensus/routines/consensusTime.ts index 507e7db45..4757c72ae 100644 --- a/src/libs/consensus/routines/consensusTime.ts +++ b/src/libs/consensus/routines/consensusTime.ts @@ -12,7 +12,9 @@ export async function checkConsensusTime( ): Promise { // Safeguard to prevent the consensus time from being checked before the last block is forged if (getSharedState.inConsensusLoop) { - log.warning("[CONSENSUS TIME] Cannot check consensus time while in consensus loop, skipping") + log.warning( + "[CONSENSUS TIME] Cannot check consensus time while in consensus loop, skipping", + ) return false } let isConsensusTime = false @@ -27,16 +29,21 @@ export async function checkConsensusTime( // REVIEW Using the UTC timestamp as per mainLoop.ts settings const currentTimestamp = getNetworkTimestamp() // Date.now() const delta = currentTimestamp - lastTimestamp - const consensusIntervalTime = - getSharedState.getConsensusTime() || 10 // 10 seconds, use 10000 for 10 seconds in ms + const consensusIntervalTime = getSharedState.getConsensusTime() log.debug("[CONSENSUS TIME] lastTimestamp: " + lastTimestamp, true) log.debug("[CONSENSUS TIME] currentTimestamp: " + currentTimestamp, true) log.debug("[CONSENSUS TIME] delta: " + delta, true) - log.debug("[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, true) + log.debug( + "[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, + true, + ) //process.exit(0) // If the delta is greater than the consensus interval time, then the consensus time has passed - log.info("[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, false) + log.info( + "[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, + false, + ) if (delta >= consensusIntervalTime) { isConsensusTime = true log.info("[CONSENSUS TIME] Consensus time reached", true) @@ -48,7 +55,12 @@ export async function checkConsensusTime( const minDelta = consensusIntervalTime - flextime if (delta > minDelta && delta < maxDelta) { isConsensusTime = true - log.info("[CONSENSUS TIME] Consensus time reached (with flexible time and delta: " + delta + ")", true) + log.info( + "[CONSENSUS TIME] Consensus time reached (with flexible time and delta: " + + delta + + ")", + true, + ) } } } @@ -58,5 +70,3 @@ export async function checkConsensusTime( // We can return the result return isConsensusTime } - - diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index 9ff0b2115..f41112c8b 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -45,25 +45,35 @@ export default class SecretaryManager { secretaryKey: "", blockRef: lastBlockNumber + 1, } + log.only("🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒") + log.only("Initializing shard") + log.only("CVSA: " + cVSA) + log.only("Last block number: " + lastBlockNumber) // Reusing the method to create the members this.shard.members = await getShard(cVSA) // this.ourKey = getSharedState.identity.ed25519.publicKey.toString("hex") this.ourKey = getSharedState.publicKeyHex + log.only( + "Shard members: " + + JSON.stringify(this.shard.members.map(m => m.identity)), + ) + if ( !this.shard.members.map(peer => peer.identity).includes(this.ourKey) ) { + log.error("We are not in the shard") throw new NotInShardError("We are not in the shard") } // Assigning the secretary and its key this.shard.secretaryKey = this.secretary.identity - log.debug("INITIALIZED SHARD:") - log.debug( + log.only("INITIALIZED SHARD:") + log.only( "SHARD: " + JSON.stringify(this.shard.members.map(m => m.identity)), ) - log.debug("SECRETARY: " + this.secretary.identity) + log.only("SECRETARY: " + this.secretary.identity) // INFO: If some nodes crash, kill the node for debugging! // if (this.shard.members.length < 3 && this.shard.blockRef > 24000) { @@ -75,8 +85,9 @@ export default class SecretaryManager { // INFO: Start the secretary routine if (this.checkIfWeAreSecretary()) { + log.only("We are the secretary. starting the secretary routine") this.secretaryRoutine().then(() => { - log.debug("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") + log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") }) } diff --git a/src/utilities/mainLoop.ts b/src/utilities/mainLoop.ts index 3dec4c177..33784b4a4 100644 --- a/src/utilities/mainLoop.ts +++ b/src/utilities/mainLoop.ts @@ -67,7 +67,7 @@ async function mainLoopCycle() { log.info("[MAINLOOP]: about to check if its time for consensus") if (!isConsensusTimeReached) { - log.info("[MAINLOOP]: is not consensus time") + log.only("[MAINLOOP]: is not consensus time") //await sendNodeOnlineTx() } @@ -93,9 +93,8 @@ async function mainLoopCycle() { ) { // Set the startingConsensus flag to true to avoid conflicts with starting loops getSharedState.startingConsensus = true - log.info("[MAIN LOOP] Consensus time reached and sync status is true") + log.only("[MAIN LOOP] Consensus time reached and sync status is true") // Wait for the peer routine to finish if it is still running - log.info("[MAIN LOOP] Waiting for the peer routine to finish") let timer = 0 while (getSharedState.peerRoutineRunning > 0) { await sleep(100) diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index a57871b0b..27eef5884 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -201,8 +201,11 @@ export default class SharedState { return Number(process.env.CONSENSUS_CHECK_INTERVAL) } + /** + * @returns The block time in seconds + */ public getConsensusTime(): number { - return Number(process.env.CONSENSUS_TIME) + return Number(process.env.CONSENSUS_TIME) || this.block_time } public async getConnectionString(): Promise { From 9c3fed71948b0cbfde1f05ecb970cd92e6212d44 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 06:17:35 +0300 Subject: [PATCH 02/16] more logging --- src/libs/consensus/v2/types/secretaryManager.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index f41112c8b..19ab6fe87 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -57,7 +57,7 @@ export default class SecretaryManager { log.only( "Shard members: " + - JSON.stringify(this.shard.members.map(m => m.identity)), + JSON.stringify(this.shard.members.map(m => m.connection.string)), ) if ( @@ -71,7 +71,7 @@ export default class SecretaryManager { log.only("INITIALIZED SHARD:") log.only( - "SHARD: " + JSON.stringify(this.shard.members.map(m => m.identity)), + "SHARD: " + JSON.stringify(this.shard.members.map(m => m.connection.string)), ) log.only("SECRETARY: " + this.secretary.identity) @@ -85,7 +85,7 @@ export default class SecretaryManager { // INFO: Start the secretary routine if (this.checkIfWeAreSecretary()) { - log.only("We are the secretary. starting the secretary routine") + log.only("⬜️ We are the secretary ⬜️. starting the secretary routine") this.secretaryRoutine().then(() => { log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") }) From a698a3cb92fb2ad5fc558509017d183d59456243 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 06:57:01 +0300 Subject: [PATCH 03/16] more logging --- src/libs/consensus/v2/routines/manageProposeBlockHash.ts | 4 +++- src/libs/peer/PeerManager.ts | 2 +- src/utilities/sharedState.ts | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libs/consensus/v2/routines/manageProposeBlockHash.ts b/src/libs/consensus/v2/routines/manageProposeBlockHash.ts index be4602b67..dea05002b 100644 --- a/src/libs/consensus/v2/routines/manageProposeBlockHash.ts +++ b/src/libs/consensus/v2/routines/manageProposeBlockHash.ts @@ -6,6 +6,7 @@ import { RPCResponse } from "@kynesyslabs/demosdk/types" import _ from "lodash" import ensureCandidateBlockFormed from "./ensureCandidateBlockFormed" import { hexToUint8Array, ucrypto } from "@kynesyslabs/demosdk/encryption" +import PeerManager from "@/libs/peer/PeerManager" export default async function manageProposeBlockHash( blockHash: string, @@ -20,9 +21,10 @@ export default async function manageProposeBlockHash( // Checking if the validator that sent us the block hash is in the shard const shard = getSharedState.lastShard const validator = shard.find(validator => validator === peerId) + const peer = PeerManager.getInstance().getPeer(peerId) if (!validator) { log.error( - "[manageProposeBlockHash] Validator is not in the shard: refusing the block hash", + "[manageProposeBlockHash] Validator (" + peer.connection.string + ") is not in the shard: refusing the block hash", ) response.result = 401 response.response = getSharedState.publicKeyHex diff --git a/src/libs/peer/PeerManager.ts b/src/libs/peer/PeerManager.ts index 2ea59b012..cf0efd2be 100644 --- a/src/libs/peer/PeerManager.ts +++ b/src/libs/peer/PeerManager.ts @@ -41,7 +41,7 @@ export default class PeerManager { // Loading the peer list from the demos_peer.json loadPeerList() { - let rawPeerList: string = "{}" + let rawPeerList = "{}" try { rawPeerList = fs.readFileSync(getSharedState.peerListFile, "utf8") diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index 27eef5884..a792e82d2 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -22,7 +22,7 @@ export default class SharedState { version_name = "Entangled Polymer" signingAlgorithm = "ed25519" as SigningAlgorithm - block_time = 10 // TODO Get it from the genesis (or see Consensus module) + block_time = 20 // TODO Get it from the genesis (or see Consensus module) currentTimestamp = 0 currentUTCTime = 0 From ff94c6e51092112c7c4a3a3e76a932e8c9e94eba Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 07:05:43 +0300 Subject: [PATCH 04/16] log our connection string --- src/libs/network/manageConsensusRoutines.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 5aa2c902e..fa7115111 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -86,7 +86,7 @@ export default async function manageConsensusRoutines( if (!isInShard) { response.result = 400 response.response = - "We are not in the shard, cannot proceed with the routine" + "We are not in the shard(" + getSharedState.connectionString + "), cannot proceed with the routine" return response } From 21351966065a0bd43dffc31c42a1a7b9e711b666 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 07:28:33 +0300 Subject: [PATCH 05/16] log exposed url --- src/libs/network/manageConsensusRoutines.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index fa7115111..869888efc 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -86,7 +86,7 @@ export default async function manageConsensusRoutines( if (!isInShard) { response.result = 400 response.response = - "We are not in the shard(" + getSharedState.connectionString + "), cannot proceed with the routine" + "We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine" return response } From bb0e1c1893e4b4c8c04f56369a82fd1720215d25 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 08:14:21 +0300 Subject: [PATCH 06/16] more logging --- src/libs/network/manageConsensusRoutines.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 869888efc..5d708aafe 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -15,6 +15,7 @@ import log from "src/utilities/logger" import Cryptography from "../crypto/cryptography" import SecretaryManager from "../consensus/v2/types/secretaryManager" import { Waiter } from "src/utilities/waiter" +import { PeerManager } from "../peer" export interface ConsensusMethod { method: @@ -87,6 +88,21 @@ export default async function manageConsensusRoutines( response.result = 400 response.response = "We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine" + + log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") + log.error("We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine") + log.error("current validator seed: " + getSharedState.currentValidatorSeed) + log.error("calculated shard: " + JSON.stringify(shard.map(m => m.connection.string), null, 2)) + const sharedStateLastShard = [] + + for (const pubkey of getSharedState.lastShard) { + const peer = PeerManager.getInstance().getPeer(pubkey) + sharedStateLastShard.push(peer.connection.string) + } + + log.error("shared state last shard: " + JSON.stringify(sharedStateLastShard, null, 2)) + log.error("last block number: " + getSharedState.lastBlockNumber) + log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") return response } From ad5c8819bfb248ee089883cd55fe0a933c44b87a Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 08:55:34 +0300 Subject: [PATCH 07/16] try: prevent overwriting cvsa in sharedstate --- src/libs/consensus/v2/routines/createBlock.ts | 1 + src/libs/consensus/v2/routines/getCommonValidatorSeed.ts | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/libs/consensus/v2/routines/createBlock.ts b/src/libs/consensus/v2/routines/createBlock.ts index 27ea34c5d..7f47f5a61 100644 --- a/src/libs/consensus/v2/routines/createBlock.ts +++ b/src/libs/consensus/v2/routines/createBlock.ts @@ -8,6 +8,7 @@ import Peer from "src/libs/peer/Peer" import hashGCRTables from "src/libs/blockchain/gcr/gcr_routines/hashGCR" import getCommonValidatorSeed from "./getCommonValidatorSeed" import { ucrypto, uint8ArrayToHex } from "@kynesyslabs/demosdk/encryption" + export async function createBlock( orderedTransactions: Transaction[], commonValidatorSeed: string, diff --git a/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts b/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts index 5464af8ca..bf5fadfd7 100644 --- a/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts +++ b/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts @@ -122,7 +122,11 @@ export default async function getCommonValidatorSeed( const commonValidatorSeed = Hashing.sha256(hashString) // NOTE The common validator seed is set in the sharedState as soon as it is computed - getSharedState.currentValidatorSeed = commonValidatorSeed + // NOTE: This should only happen when calculating the CVSA based on the last forged block (aka: when using this function's default parameters) + if (!lastBlock) { + getSharedState.currentValidatorSeed = commonValidatorSeed + } + logger(`Common validator seed: ${commonValidatorSeed}`) return { commonValidatorSeed, lastBlockNumber } } From 65360d09f941f05dbf704d6ad92315517ec41d5e Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 09:01:54 +0300 Subject: [PATCH 08/16] remove sharedState cvs --- .../v2/routines/getCommonValidatorSeed.ts | 6 ++-- src/libs/network/manageConsensusRoutines.ts | 33 +++++++++++++++---- src/utilities/sharedState.ts | 1 - 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts b/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts index bf5fadfd7..a3317def7 100644 --- a/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts +++ b/src/libs/consensus/v2/routines/getCommonValidatorSeed.ts @@ -123,9 +123,9 @@ export default async function getCommonValidatorSeed( // NOTE The common validator seed is set in the sharedState as soon as it is computed // NOTE: This should only happen when calculating the CVSA based on the last forged block (aka: when using this function's default parameters) - if (!lastBlock) { - getSharedState.currentValidatorSeed = commonValidatorSeed - } + // if (updateSharedState) { + // getSharedState.currentValidatorSeed = commonValidatorSeed + // } logger(`Common validator seed: ${commonValidatorSeed}`) return { commonValidatorSeed, lastBlockNumber } diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 5d708aafe..abcd4ecef 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -73,7 +73,8 @@ export default async function manageConsensusRoutines( } // Also refuses the routine if we are not in the shard - const shard = await getShard(getSharedState.currentValidatorSeed) + const { commonValidatorSeed } = await getCommonValidatorSeed() + const shard = await getShard(commonValidatorSeed) const ourId = getSharedState.publicKeyHex let isInShard = false @@ -87,12 +88,27 @@ export default async function manageConsensusRoutines( if (!isInShard) { response.result = 400 response.response = - "We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine" + "We are not in the shard(" + + getSharedState.exposedUrl + + "), cannot proceed with the routine" log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") - log.error("We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine") - log.error("current validator seed: " + getSharedState.currentValidatorSeed) - log.error("calculated shard: " + JSON.stringify(shard.map(m => m.connection.string), null, 2)) + log.error( + "We are not in the shard(" + + getSharedState.exposedUrl + + "), cannot proceed with the routine", + ) + log.error( + "current validator seed: " + commonValidatorSeed, + ) + log.error( + "calculated shard: " + + JSON.stringify( + shard.map(m => m.connection.string), + null, + 2, + ), + ) const sharedStateLastShard = [] for (const pubkey of getSharedState.lastShard) { @@ -100,7 +116,10 @@ export default async function manageConsensusRoutines( sharedStateLastShard.push(peer.connection.string) } - log.error("shared state last shard: " + JSON.stringify(sharedStateLastShard, null, 2)) + log.error( + "shared state last shard: " + + JSON.stringify(sharedStateLastShard, null, 2), + ) log.error("last block number: " + getSharedState.lastBlockNumber) log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") return response @@ -168,7 +187,7 @@ export default async function manageConsensusRoutines( case "getCommonValidatorSeed": response.result = 200 await getCommonValidatorSeed() // NOTE This is generated each time and stored in the shared state - response.response = getSharedState.currentValidatorSeed + response.response = commonValidatorSeed break // SECTION: New Secretary Manager class handlers diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index a792e82d2..e5e50f41f 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -78,7 +78,6 @@ export default class SharedState { // SECTION shared state variables shard: Peer[] lastShard: string[] // ? Should be used by PoRBFT.ts consensus and should contain all the public keys of the nodes in the last shard - currentValidatorSeed: string identity: Identity keypair: { publicKey: From b4dda28f6cb6a9f964f9c18203ed0eff386b0534 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 09:21:35 +0300 Subject: [PATCH 09/16] remove lastShard from sharedState --- src/libs/consensus/v2/routines/getShard.ts | 65 +++++++++++++++++-- .../v2/routines/manageProposeBlockHash.ts | 13 +++- src/libs/network/manageConsensusRoutines.ts | 7 +- src/utilities/sharedState.ts | 2 +- 4 files changed, 72 insertions(+), 15 deletions(-) diff --git a/src/libs/consensus/v2/routines/getShard.ts b/src/libs/consensus/v2/routines/getShard.ts index 1e558690f..b0c7a3b22 100644 --- a/src/libs/consensus/v2/routines/getShard.ts +++ b/src/libs/consensus/v2/routines/getShard.ts @@ -5,10 +5,39 @@ import { getSharedState } from "src/utilities/sharedState" import log from "src/utilities/logger" import Chain from "src/libs/blockchain/chain" -export default async function getShard(seed: string): Promise { +export default async function getShard( + seed: string, + debug = false, +): Promise { // ! we need to get the peers from the last 3 blocks too const allPeers = await PeerManager.getInstance().getOnlinePeers() + if (debug) { + log.debug( + "peers: " + + JSON.stringify( + allPeers.map(peer => ({ + url: peer.connection.string, + status: peer.sync.status, + })), + null, + 2, + ), + ) + } const peers = allPeers.filter(peer => peer.sync.status) + if (debug) { + log.debug( + "peers: " + + JSON.stringify( + peers.map(peer => ({ + url: peer.connection.string, + status: peer.sync.status, + })), + null, + 2, + ), + ) + } // const peerIdentites = peers.map(peer => peer.identity) @@ -58,6 +87,19 @@ export default async function getShard(seed: string): Promise { // REVIEW: sort available peers by .identity (which is a hex string) // before choosing the peers for a uniform sample across nodes availablePeers.sort((a, b) => a.identity.localeCompare(b.identity)) + if (debug) { + log.debug( + "availablePeers: " + + JSON.stringify( + availablePeers.map(peer => ({ + url: peer.connection.string, + status: peer.sync.status, + })), + null, + 2, + ), + ) + } log.debug("availablePeers: " + JSON.stringify(availablePeers, null, 2)) // REVIEW: check if this is the right way to do it // NOTE Choosing the secretary by randomly ordering the list: the first one is the secretary @@ -66,17 +108,30 @@ export default async function getShard(seed: string): Promise { shard.push(availablePeers[index]) availablePeers.splice(index, 1) } + if (debug) { + log.debug( + "shard: " + + JSON.stringify( + shard.map(peer => ({ + url: peer.connection.string, + status: peer.sync.status, + })), + null, + 2, + ), + ) + } // Setting the last shard - getSharedState.lastShard = shard.map(peer => peer.identity) - if (getSharedState.lastShard.length < 3) { + // getSharedState.lastShard = shard.map(peer => peer.identity) + if (shard.length < 3) { log.warning( "There are less than 3 peers in the last shard: this could be a security issue", ) } - log.info(`Last shard: ${getSharedState.lastShard}`) + log.info(`Last shard: ${shard.map(peer => peer.identity)}`) log.custom( "last_shard", - JSON.stringify(getSharedState.lastShard, null, 2), + JSON.stringify(shard.map(peer => peer.identity), null, 2), false, true, ) diff --git a/src/libs/consensus/v2/routines/manageProposeBlockHash.ts b/src/libs/consensus/v2/routines/manageProposeBlockHash.ts index dea05002b..861784054 100644 --- a/src/libs/consensus/v2/routines/manageProposeBlockHash.ts +++ b/src/libs/consensus/v2/routines/manageProposeBlockHash.ts @@ -7,6 +7,8 @@ import _ from "lodash" import ensureCandidateBlockFormed from "./ensureCandidateBlockFormed" import { hexToUint8Array, ucrypto } from "@kynesyslabs/demosdk/encryption" import PeerManager from "@/libs/peer/PeerManager" +import getCommonValidatorSeed from "./getCommonValidatorSeed" +import getShard from "./getShard" export default async function manageProposeBlockHash( blockHash: string, @@ -19,12 +21,17 @@ export default async function manageProposeBlockHash( log.info("Validation Data: \n" + JSON.stringify(validationData, null, 2)) log.info("Peer ID: " + peerId) // Checking if the validator that sent us the block hash is in the shard - const shard = getSharedState.lastShard - const validator = shard.find(validator => validator === peerId) + // const shard = getSharedState.lastShard + const { commonValidatorSeed } = await getCommonValidatorSeed() + const shard = await getShard(commonValidatorSeed) + + const validator = shard.find(validator => validator.identity === peerId) const peer = PeerManager.getInstance().getPeer(peerId) if (!validator) { log.error( - "[manageProposeBlockHash] Validator (" + peer.connection.string + ") is not in the shard: refusing the block hash", + "[manageProposeBlockHash] Validator (" + + peer.connection.string + + ") is not in the shard: refusing the block hash", ) response.result = 401 response.response = getSharedState.publicKeyHex diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index abcd4ecef..99c5196d1 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -109,12 +109,7 @@ export default async function manageConsensusRoutines( 2, ), ) - const sharedStateLastShard = [] - - for (const pubkey of getSharedState.lastShard) { - const peer = PeerManager.getInstance().getPeer(pubkey) - sharedStateLastShard.push(peer.connection.string) - } + const sharedStateLastShard = shard.map(m => m.identity) log.error( "shared state last shard: " + diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index e5e50f41f..6559da214 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -77,7 +77,7 @@ export default class SharedState { peerRoutineRunning = 0 // SECTION shared state variables shard: Peer[] - lastShard: string[] // ? Should be used by PoRBFT.ts consensus and should contain all the public keys of the nodes in the last shard + // lastShard: string[] // ? Should be used by PoRBFT.ts consensus and should contain all the public keys of the nodes in the last shard identity: Identity keypair: { publicKey: From cfbe7e5458e5149891f072f1b0fc96b08cf9a701 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 09:22:31 +0300 Subject: [PATCH 10/16] debug log getShard --- src/libs/network/manageConsensusRoutines.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 99c5196d1..0832924fb 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -74,7 +74,7 @@ export default async function manageConsensusRoutines( // Also refuses the routine if we are not in the shard const { commonValidatorSeed } = await getCommonValidatorSeed() - const shard = await getShard(commonValidatorSeed) + const shard = await getShard(commonValidatorSeed, true) const ourId = getSharedState.publicKeyHex let isInShard = false From cb5d6f2ef88780c6bc70c35cf1dcd0ba0b4df978 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Tue, 30 Sep 2025 10:56:41 +0300 Subject: [PATCH 11/16] more tweaks --- src/libs/consensus/v2/PoRBFT.ts | 4 ++ src/libs/consensus/v2/routines/getShard.ts | 16 ++++---- .../consensus/v2/types/secretaryManager.ts | 4 +- src/libs/network/manageConsensusRoutines.ts | 37 ++++++++++++++++--- 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index 04700b28c..4e6349db3 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -179,7 +179,9 @@ export async function consensusRoutine(): Promise { } // INFO: CONSENSUS ACTION 5: Forge the block + log.only("Forging block 🏁") const block = await forgeBlock(tempMempool, peerlist) // NOTE The GCR hash is calculated here and added to the block + log.only("Block forged βœ…") // REVIEW Set last consensus time to the current block timestamp getSharedState.lastConsensusTime = block.content.timestamp @@ -205,7 +207,9 @@ export async function consensusRoutine(): Promise { } // INFO: CONSENSUS ACTION 7: End the consensus routine + log.only("Sending validator phase 7 to the secretary 🏁") await updateValidatorPhase(7) + log.only("Validator phase 7 sent to the secretary βœ…") } catch (error) { if (error instanceof NotInShardError) { log.info( diff --git a/src/libs/consensus/v2/routines/getShard.ts b/src/libs/consensus/v2/routines/getShard.ts index b0c7a3b22..e79793ef0 100644 --- a/src/libs/consensus/v2/routines/getShard.ts +++ b/src/libs/consensus/v2/routines/getShard.ts @@ -12,7 +12,7 @@ export default async function getShard( // ! we need to get the peers from the last 3 blocks too const allPeers = await PeerManager.getInstance().getOnlinePeers() if (debug) { - log.debug( + log.only( "peers: " + JSON.stringify( allPeers.map(peer => ({ @@ -26,7 +26,7 @@ export default async function getShard( } const peers = allPeers.filter(peer => peer.sync.status) if (debug) { - log.debug( + log.only( "peers: " + JSON.stringify( peers.map(peer => ({ @@ -80,15 +80,15 @@ export default async function getShard( console.log("[getShard] maxShardSize: ", maxShardSize) const shard: Peer[] = [] log.custom("last_shard", "Shard seed is: " + seed) - getSharedState.lastShardSeed = seed - const random = Alea(seed) + // getSharedState.lastShardSeed = seed + const deterministicRandomness = Alea(seed) const availablePeers = [...peers] // REVIEW: sort available peers by .identity (which is a hex string) // before choosing the peers for a uniform sample across nodes availablePeers.sort((a, b) => a.identity.localeCompare(b.identity)) if (debug) { - log.debug( + log.only( "availablePeers: " + JSON.stringify( availablePeers.map(peer => ({ @@ -100,16 +100,16 @@ export default async function getShard( ), ) } - log.debug("availablePeers: " + JSON.stringify(availablePeers, null, 2)) + log.only("availablePeers: " + JSON.stringify(availablePeers.map(peer => peer.connection.string), null, 2)) // REVIEW: check if this is the right way to do it // NOTE Choosing the secretary by randomly ordering the list: the first one is the secretary for (let i = 0; i < maxShardSize && availablePeers.length > 0; i++) { - const index = Math.floor(random() * availablePeers.length) + const index = Math.floor(deterministicRandomness() * availablePeers.length) shard.push(availablePeers[index]) availablePeers.splice(index, 1) } if (debug) { - log.debug( + log.only( "shard: " + JSON.stringify( shard.map(peer => ({ diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index 19ab6fe87..9485074bb 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -13,8 +13,8 @@ import getCommonValidatorSeed from "../routines/getCommonValidatorSeed" // ANCHOR SecretaryManager export default class SecretaryManager { - private _greenlight_timeout = 15000 // 15 seconds - private _set_validator_phase_timeout = 10000 // 10 seconds + private _greenlight_timeout = 30000 // 15 seconds + private _set_validator_phase_timeout = 15000 // 10 seconds private static instance: SecretaryManager // Internal variables diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 0832924fb..52c6f2f09 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -16,6 +16,7 @@ import Cryptography from "../crypto/cryptography" import SecretaryManager from "../consensus/v2/types/secretaryManager" import { Waiter } from "src/utilities/waiter" import { PeerManager } from "../peer" +import Chain from "../blockchain/chain" export interface ConsensusMethod { method: @@ -74,7 +75,7 @@ export default async function manageConsensusRoutines( // Also refuses the routine if we are not in the shard const { commonValidatorSeed } = await getCommonValidatorSeed() - const shard = await getShard(commonValidatorSeed, true) + const shard = await getShard(commonValidatorSeed) const ourId = getSharedState.publicKeyHex let isInShard = false @@ -86,6 +87,23 @@ export default async function manageConsensusRoutines( } if (!isInShard) { + // INFO: If is a greenlight request, return 200 + if (payload.method == "greenlight") { + response.result = 200 + response.response = "Greenlight received too late, ignoring" + return response + } + + if (payload.method == "setValidatorPhase") { + response.result = 200 + response.response = + "Set validator phase received too late, ignoring" + response.extra = { + greenlight: true, + } + return response + } + response.result = 400 response.response = "We are not in the shard(" + @@ -93,14 +111,13 @@ export default async function manageConsensusRoutines( "), cannot proceed with the routine" log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") + log.error("Payload: " + JSON.stringify(payload, null, 2)) log.error( "We are not in the shard(" + getSharedState.exposedUrl + "), cannot proceed with the routine", ) - log.error( - "current validator seed: " + commonValidatorSeed, - ) + log.error("current validator seed: " + commonValidatorSeed) log.error( "calculated shard: " + JSON.stringify( @@ -109,7 +126,7 @@ export default async function manageConsensusRoutines( 2, ), ) - const sharedStateLastShard = shard.map(m => m.identity) + const sharedStateLastShard = shard.map(m => m.connection.string) log.error( "shared state last shard: " + @@ -117,6 +134,16 @@ export default async function manageConsensusRoutines( ) log.error("last block number: " + getSharedState.lastBlockNumber) log.error("πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’πŸš’") + + // INFO: Check if seed is from past consensus round + // const lastBlockMinus1 = await Chain.getBlockByNumber(getSharedState.lastBlockNumber - 1) + // const { commonValidatorSeed: pastCommonValidatorSeed } = await getCommonValidatorSeed(lastBlockMinus1) + // if (pastCommonValidatorSeed == commonValidatorSeed) { + // log.error("Seed is from past consensus round") + // response.result = 400 + // response.response = "Seed is from past consensus round" + // return response + // } return response } From 48c948f0f75af5e22ed149ef98a4bbf76acd7013 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Fri, 3 Oct 2025 14:55:22 +0300 Subject: [PATCH 12/16] fix: consensus timeouts due to secretary routine abort error on secretary node + use fs promises instead of writeFileSync + include older transactions in getMempool when given blocknumber + optimize nested for loops in broadcastBlockHash.ts and peerGossip.ts + disable abort of SET_WAIT_STATUS + add yieldToEventLoop in mainLoop + move mainLoopSleepTime to sharedState + update waiter class to prehold waiter key with return value --- src/index.ts | 2 +- src/libs/blockchain/gcr/gcr.ts | 4 +- src/libs/blockchain/mempool_v2.ts | 5 +- src/libs/consensus/v2/PoRBFT.ts | 4 +- .../v2/routines/broadcastBlockHash.ts | 11 ++- src/libs/consensus/v2/routines/getShard.ts | 1 - .../consensus/v2/types/secretaryManager.ts | 90 +++++++++++++------ src/libs/identity/identity.ts | 2 +- src/libs/identity/tools/twitter.ts | 4 +- src/libs/network/manageConsensusRoutines.ts | 35 +++++--- src/libs/peer/Peer.ts | 2 +- src/libs/peer/routines/peerGossip.ts | 60 ++++++------- src/libs/utils/keyMaker.ts | 6 +- src/model/datasource.ts | 1 - src/utilities/backupAndRestore.ts | 2 +- src/utilities/cli_libraries/wallet.ts | 8 +- src/utilities/commandLine.ts | 2 +- src/utilities/logger.ts | 4 +- src/utilities/mainLoop.ts | 17 +++- src/utilities/selfPeer.ts | 2 +- src/utilities/sharedState.ts | 3 +- src/utilities/waiter.ts | 19 ++-- 22 files changed, 173 insertions(+), 111 deletions(-) diff --git a/src/index.ts b/src/index.ts index fc16680f7..88226e292 100644 --- a/src/index.ts +++ b/src/index.ts @@ -242,7 +242,7 @@ async function preMainLoop() { getSharedState.connectionString = ourselves log.info("Our connection string is: " + ourselves) // And saves the public key file - fs.writeFileSync( + await fs.promises.writeFile( "publickey_" + getSharedState.signingAlgorithm + "_" + publicKeyHex, publicKeyHex + "\n", ) diff --git a/src/libs/blockchain/gcr/gcr.ts b/src/libs/blockchain/gcr/gcr.ts index 746476e8c..cb0d0b8fd 100644 --- a/src/libs/blockchain/gcr/gcr.ts +++ b/src/libs/blockchain/gcr/gcr.ts @@ -87,7 +87,7 @@ export class OperationsRegistry { } // INFO Adding an operation to the registry - add(operation: Operation) { + async add(operation: Operation) { this.operations.push({ operation: operation, status: "pending", @@ -97,7 +97,7 @@ export class OperationsRegistry { }, timestamp: Date.now(), }) - fs.writeFileSync(this.path, JSON.stringify(this.operations)) + await fs.promises.writeFile(this.path, JSON.stringify(this.operations)) } // INFO Getting the full list of operations currently in the registry diff --git a/src/libs/blockchain/mempool_v2.ts b/src/libs/blockchain/mempool_v2.ts index e41f7582b..4e78f0e91 100644 --- a/src/libs/blockchain/mempool_v2.ts +++ b/src/libs/blockchain/mempool_v2.ts @@ -1,4 +1,4 @@ -import { FindManyOptions, In, QueryFailedError, Repository } from "typeorm" +import { FindManyOptions, In, LessThanOrEqual, QueryFailedError, Repository } from "typeorm" import Datasource from "@/model/datasource" import TxUtils from "./transaction" @@ -17,6 +17,7 @@ export default class Mempool { /** * Returns the mempool. If `blockNumber` is not provided, returns all transactions. + * When `blockNumber` is transaction past from a previous block number are included. * * @param blockNumber - The block number to filter by */ @@ -31,7 +32,7 @@ export default class Mempool { if (blockNumber) { options.where = { - blockNumber: blockNumber, + blockNumber: LessThanOrEqual(blockNumber), } } diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index 4e6349db3..6f9ff84cd 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -253,8 +253,8 @@ export async function consensusRoutine(): Promise { // await Mempool.joinTemporaryMempool() // ? Is await ok here? } - log.debug("πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹") - log.debug("[consensusRoutine] CONSENSUS ROUTINE ENDED πŸ”₯πŸ”₯πŸ”₯") + log.only("πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹") + log.only("[consensusRoutine] CONSENSUS ROUTINE ENDED πŸ”₯πŸ”₯πŸ”₯") } // SECTION: Consensus functions! diff --git a/src/libs/consensus/v2/routines/broadcastBlockHash.ts b/src/libs/consensus/v2/routines/broadcastBlockHash.ts index d00f59746..006a39d8e 100644 --- a/src/libs/consensus/v2/routines/broadcastBlockHash.ts +++ b/src/libs/consensus/v2/routines/broadcastBlockHash.ts @@ -56,9 +56,9 @@ export async function broadcastBlockHash( const incomingSignatures: { [key: string]: string } = response.extra["signatures"] - for (const [identity, signature] of Object.entries( + const signatureVerificationPromises = Object.entries( incomingSignatures, - )) { + ).map(async ([identity, signature]) => { const isValid = await ucrypto.verify({ algorithm: getSharedState.signingAlgorithm, message: new TextEncoder().encode(block.hash), @@ -71,7 +71,7 @@ export async function broadcastBlockHash( log.debug( `Signature ${signature} from ${identity} added to the candidate block`, ) - continue + return { identity, signature, isValid: true } } log.error( @@ -82,7 +82,10 @@ export async function broadcastBlockHash( log.error( "Signature verification failed. Signature not added.", ) - } + return { identity, signature, isValid: false } + }) + + await Promise.all(signatureVerificationPromises) pro++ } else { log.error( diff --git a/src/libs/consensus/v2/routines/getShard.ts b/src/libs/consensus/v2/routines/getShard.ts index e79793ef0..bd12709e5 100644 --- a/src/libs/consensus/v2/routines/getShard.ts +++ b/src/libs/consensus/v2/routines/getShard.ts @@ -100,7 +100,6 @@ export default async function getShard( ), ) } - log.only("availablePeers: " + JSON.stringify(availablePeers.map(peer => peer.connection.string), null, 2)) // REVIEW: check if this is the right way to do it // NOTE Choosing the secretary by randomly ordering the list: the first one is the secretary for (let i = 0; i < maxShardSize && availablePeers.length > 0; i++) { diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index 9485074bb..3f02802ad 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -57,7 +57,9 @@ export default class SecretaryManager { log.only( "Shard members: " + - JSON.stringify(this.shard.members.map(m => m.connection.string)), + JSON.stringify( + this.shard.members.map(m => m.connection.string), + ), ) if ( @@ -71,7 +73,10 @@ export default class SecretaryManager { log.only("INITIALIZED SHARD:") log.only( - "SHARD: " + JSON.stringify(this.shard.members.map(m => m.connection.string)), + "SHARD: " + + JSON.stringify( + this.shard.members.map(m => m.connection.string), + ), ) log.only("SECRETARY: " + this.secretary.identity) @@ -85,7 +90,9 @@ export default class SecretaryManager { // INFO: Start the secretary routine if (this.checkIfWeAreSecretary()) { - log.only("⬜️ We are the secretary ⬜️. starting the secretary routine") + log.only( + "⬜️ We are the secretary ⬜️. starting the secretary routine", + ) this.secretaryRoutine().then(() => { log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") }) @@ -620,13 +627,13 @@ export default class SecretaryManager { return false } - log.debug("Received green light for phase: " + validatorPhase) - log.debug("---- DIAGNOSTICS ----") - log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) - log.debug("Our blockRef: " + this.shard.blockRef) - log.debug("Secretary timestamp: " + secretaryBlockTimestamp) - log.debug("Secretary: " + this.secretary.identity) - log.debug("---- END DIAGNOSTICS ----") + log.only("Received green light for phase: " + validatorPhase) + log.only("---- DIAGNOSTICS ----") + log.only("Our phase: " + this.ourValidatorPhase.currentPhase) + log.only("Our blockRef: " + this.shard.blockRef) + log.only("Secretary timestamp: " + secretaryBlockTimestamp) + log.only("Secretary: " + this.secretary.identity) + log.only("---- END DIAGNOSTICS ----") if (secretaryBlockTimestamp < this.blockTimestamp) { log.debug( @@ -644,13 +651,36 @@ export default class SecretaryManager { this.blockTimestamp = secretaryBlockTimestamp } - const waiterKey = Waiter.keys.GREEN_LIGHT + validatorPhase + const waiterKey = + Waiter.keys.GREEN_LIGHT + this.shard.blockRef + validatorPhase + log.only("Waiter key: " + waiterKey) + + if (Waiter.isWaiting(waiterKey)) { + Waiter.resolve(waiterKey, secretaryBlockTimestamp) + this.ourValidatorPhase.waitStatus = false + return true + } - if (this.ourValidatorPhase.currentPhase + 1 == validatorPhase) { - log.debug(`[SECRETARY ROUTINE] Pre-holding the key: ${waiterKey}`) - Waiter.preHold(waiterKey) + if (this.ourValidatorPhase.currentPhase <= validatorPhase) { + log.only(`[SECRETARY ROUTINE] Pre-holding the key: ${waiterKey}`) + Waiter.preHold(waiterKey, secretaryBlockTimestamp) + return true } + if (this.ourValidatorPhase.currentPhase > validatorPhase){ + // INFO: Older greenlight received, ignoring it + return true + } + + log.only("We don't know what to do with this green light") + log.only("Validator phase: " + validatorPhase) + log.only("Our phase: " + this.ourValidatorPhase.currentPhase) + log.only("Secretary block timestamp: " + secretaryBlockTimestamp) + log.only("Block timestamp: " + this.blockTimestamp) + process.exit(1) + + return false + // log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) // if (validatorPhase > this.ourValidatorPhase.currentPhase) { // // INFO: This node has already timed out @@ -698,13 +728,20 @@ export default class SecretaryManager { // // await this.simulateSecretaryGoingOffline() // } + log.only("Sending our validator phase to the secretary") + log.only("Our phase: " + this.ourValidatorPhase.currentPhase) + log.only("Shard block ref: " + this.shard.blockRef) + const waiterKey = - Waiter.keys.GREEN_LIGHT + this.ourValidatorPhase.currentPhase - const greenlight: Promise = Waiter.wait( + Waiter.keys.GREEN_LIGHT + this.shard.blockRef + this.ourValidatorPhase.currentPhase + const greenlight: Promise = Waiter.wait( waiterKey, this._greenlight_timeout, ) + log.only("Greenlight waiter created") + log.only("Waiter key: " + waiterKey) + const sendStatus = async () => { const request: RPCRequest = { method: "consensus_routine", @@ -846,25 +883,26 @@ export default class SecretaryManager { } public async endConsensusRoutine() { + log.only("Ending the consensus routine") SecretaryManager.instance.runSecretaryRoutine = false SecretaryManager.instance = null - + // TODO: Abort all waiters - + // INFO: Resolve all hanging waiters if (Waiter.isWaiting(Waiter.keys.GREEN_LIGHT)) { - log.debug( - "GREEN_LIGHT waiter found WHEN ENDING THE CONSENSUS ..., resolving it", + log.only( + "GREEN_LIGHT waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", ) Waiter.abort(Waiter.keys.GREEN_LIGHT) } - if (Waiter.isWaiting(Waiter.keys.SET_WAIT_STATUS)) { - log.debug( - "SET_WAIT_STATUS waiter found WHEN ENDING THE CONSENSUS ..., resolving it", - ) - Waiter.abort(Waiter.keys.SET_WAIT_STATUS) - } + // if (Waiter.isWaiting(Waiter.keys.SET_WAIT_STATUS)) { + // log.only( + // "SET_WAIT_STATUS waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", + // ) + // Waiter.abort(Waiter.keys.SET_WAIT_STATUS) + // } } // SECTION Methods called by the shard members diff --git a/src/libs/identity/identity.ts b/src/libs/identity/identity.ts index f36932929..7da39e622 100644 --- a/src/libs/identity/identity.ts +++ b/src/libs/identity/identity.ts @@ -144,7 +144,7 @@ export default class Identity { // INFO: If the identity file does not exist, create a new one const mnemonic = demos.newMnemonic() this.masterSeed = await this.mnemonicToSeed(mnemonic) - fs.writeFileSync(getSharedState.identityFile, mnemonic, { + await fs.promises.writeFile(getSharedState.identityFile, mnemonic, { encoding: "utf8", }) } diff --git a/src/libs/identity/tools/twitter.ts b/src/libs/identity/tools/twitter.ts index 3a1882230..c2ab4ea2f 100644 --- a/src/libs/identity/tools/twitter.ts +++ b/src/libs/identity/tools/twitter.ts @@ -524,7 +524,7 @@ export class Twitter { ) if (res.status === 200) { - fs.writeFileSync( + await fs.promises.writeFile( `data/twitter/${userId}.json`, JSON.stringify(res.data, null, 2), ) @@ -543,7 +543,7 @@ export class Twitter { ) if (res.status === 200) { - fs.writeFileSync( + await fs.promises.writeFile( `data/twitter/${userId}_followers.json`, JSON.stringify(res.data, null, 2), ) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 52c6f2f09..d7432264a 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -38,6 +38,12 @@ export default async function manageConsensusRoutines( sender: string, payload: ConsensusMethod, ): Promise { + log.only("πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘ RECEIVED CONSENSUS CALL πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘") + + const peer = PeerManager.getInstance().getPeer(sender) + log.only("Sender: " + peer.connection.string) + log.only("Payload: " + JSON.stringify(payload, null, 2)) + log.only("-----------------------------") let response = _.cloneDeep(emptyResponse) /* REVIEW @@ -54,6 +60,8 @@ export default async function manageConsensusRoutines( const isConsensusRunning = isConsensusAlreadyRunning() const inConsensus = isConsensusTime || isConsensusRunning + log.only("inConsensus: " + inConsensus) + if (!inConsensus) { response.result = 400 response.response = @@ -86,22 +94,27 @@ export default async function manageConsensusRoutines( } } - if (!isInShard) { + log.only("isInShard: " + isInShard) + + inShardCheck: if (!isInShard) { // INFO: If is a greenlight request, return 200 if (payload.method == "greenlight") { - response.result = 200 - response.response = "Greenlight received too late, ignoring" - return response + // response.result = 200 + // response.response = "Greenlight received too late, ignoring" + + // return response + break inShardCheck } if (payload.method == "setValidatorPhase") { - response.result = 200 - response.response = - "Set validator phase received too late, ignoring" - response.extra = { - greenlight: true, - } - return response + // response.result = 200 + // response.response = + // "Set validator phase received too late, ignoring" + // response.extra = { + // greenlight: true, + // } + // return response + break inShardCheck } response.result = 400 diff --git a/src/libs/peer/Peer.ts b/src/libs/peer/Peer.ts index 4c9c31c52..16c3288c0 100644 --- a/src/libs/peer/Peer.ts +++ b/src/libs/peer/Peer.ts @@ -162,7 +162,7 @@ export default class Peer { ? `${request.method}.${request.params[0].method}` : request.method log.error( - "[PEER] [LONG CALL] Max retries reached for method: " + + "[PEER] [LONG CALL] [" + this.connection.string + "] Max retries reached for method: " + methodString + " - " + response, diff --git a/src/libs/peer/routines/peerGossip.ts b/src/libs/peer/routines/peerGossip.ts index da979df68..c9dd60570 100644 --- a/src/libs/peer/routines/peerGossip.ts +++ b/src/libs/peer/routines/peerGossip.ts @@ -129,50 +129,46 @@ async function peersGossipProcess( log.custom("peerGossip", "Peerlists merged", false) } -/** - * Merges multiple peer lists into a single, ordered, unique peer list. - * @param {Peer[][]} peerlists - Array of peer lists to merge. - * @returns {Promise} - Returns true when merge is complete. - */ async function mergePeerlists(peerlists: Peer[][]): Promise { - // let mergedPeerlist: Peer[] = [] const peerMap = new Map() - for (const peerlist of peerlists) { - for (const peer of peerlist) { - if (!peer) { - log.warning("[peerGossip] Peer is undefined, skipping") - continue - } + // Flatten and filter all peers at once, then process in batches + const allPeers = peerlists.flat().filter(peer => { + if (!peer) { + log.warning("[peerGossip] Peer is undefined, skipping") + return false + } + return true + }) - if (peerMap.has(peer.identity)) { - const existingPeer = peerMap.get(peer.identity) - // INFO: Update the peer + // Process peers in batches to yield to event loop + const batchSize = 10 + for (let i = 0; i < allPeers.length; i += batchSize) { + const batch = allPeers.slice(i, i + batchSize) + + for (const peer of batch) { + const existingPeer = peerMap.get(peer.identity) + + if (existingPeer) { + // Update existing peer if this one has higher block number if (peer.sync.block > existingPeer.sync.block) { existingPeer.sync.block = peer.sync.block existingPeer.sync.block_hash = peer.sync.block_hash existingPeer.sync.status = peer.sync.status } - continue + } else { + // Add new peer + peerMap.set(peer.identity, peer) } + } - peerMap.set(peer.identity, peer) - - // if (!mergedPeerlist.includes(peer)) { - // mergedPeerlist.push(peer) - // } + // Yield to event loop every batch + if (i + batchSize < allPeers.length) { + await new Promise(resolve => setImmediate(resolve)) } } const mergedPeerlist = Array.from(peerMap.values()) - - // for (let peer of mergedPeerlist) { - // if (!peer.sync.block_hash) { - // log.debug("found invalid sync status") - // process.exit(0) - // } - // } - mergedPeerlist.sort((a, b) => a.identity.localeCompare(b.identity)) PeerManager.getInstance().setPeers(mergedPeerlist) return true @@ -193,11 +189,7 @@ function selectPeersForGossip(peers: Peer[]): Peer[] { return peers } - log.custom( - "peerGossip", - `Selecting ${maxGossipPeers} random peers`, - false, - ) + log.custom("peerGossip", `Selecting ${maxGossipPeers} random peers`, false) return shuffleArray(peers).slice(0, maxGossipPeers) } diff --git a/src/libs/utils/keyMaker.ts b/src/libs/utils/keyMaker.ts index c7e7f374f..499472d39 100644 --- a/src/libs/utils/keyMaker.ts +++ b/src/libs/utils/keyMaker.ts @@ -26,7 +26,7 @@ async function main() { const forceNew = process.argv.includes("-f") if (forceNew && fs.existsSync(".demos_identity")) { - fs.unlinkSync(".demos_identity") + await fs.promises.unlink(".demos_identity") console.log("Existing .demos_identity file deleted.") } @@ -38,8 +38,8 @@ async function main() { console.log("Private Key:", privateKey) console.log("====\n\n") // Save to file - fs.writeFileSync("public.key", publicKey) - fs.writeFileSync(".demos_identity", "0x" + privateKey) + await fs.promises.writeFile("public.key", publicKey) + await fs.promises.writeFile(".demos_identity", "0x" + privateKey) // Logging console.log("Identity saved (or kept) to .demos_identity and public.key") } diff --git a/src/model/datasource.ts b/src/model/datasource.ts index 933196eac..fd1b8d5f2 100644 --- a/src/model/datasource.ts +++ b/src/model/datasource.ts @@ -43,7 +43,6 @@ export const dataSource = new DataSource({ GlobalChangeRegistry, GCRTracker, GCRMain, - GCRTracker, ], synchronize: true, logging: false, diff --git a/src/utilities/backupAndRestore.ts b/src/utilities/backupAndRestore.ts index d4f1bded1..8aa672d81 100644 --- a/src/utilities/backupAndRestore.ts +++ b/src/utilities/backupAndRestore.ts @@ -86,7 +86,7 @@ async function dumpUserData(): Promise { ) // Write the data to a JSON file - fs.writeFileSync( + await fs.promises.writeFile( outputPath, JSON.stringify(outputData, null, 2), "utf8", diff --git a/src/utilities/cli_libraries/wallet.ts b/src/utilities/cli_libraries/wallet.ts index c94facb43..88625364b 100644 --- a/src/utilities/cli_libraries/wallet.ts +++ b/src/utilities/cli_libraries/wallet.ts @@ -32,7 +32,7 @@ export default class Wallet { this.identity = forge.pki.ed25519.generateKeyPair() } - dispatch(dividedInput: any) { + async dispatch(dividedInput: any) { // We need the modes (2 to 3 arguments) if (dividedInput.length < 2 || dividedInput.length > 3) { console.log("Please specify a command") @@ -81,7 +81,7 @@ export default class Wallet { } // Writing to file try { - this.save(filename) + await this.save(filename) console.log("Wallet saved successfully") } catch (e) { console.log(e["message"]) @@ -120,8 +120,8 @@ export default class Wallet { }) } - save(filename: string) { - fs.writeFileSync(filename, this.identity.privateKey.toString("hex")) + async save(filename: string) { + await fs.promises.writeFile(filename, this.identity.privateKey.toString("hex")) } read(filename: string) { diff --git a/src/utilities/commandLine.ts b/src/utilities/commandLine.ts index 658032654..714e9e538 100644 --- a/src/utilities/commandLine.ts +++ b/src/utilities/commandLine.ts @@ -38,7 +38,7 @@ export default async function commandLine(): Promise { switch (input.toLowerCase()) { // INFO Wallet case is to work with wallets case "wallet": - Wallet.getInstance().dispatch(dividedInput) + await Wallet.getInstance().dispatch(dividedInput) break // TODO Write commands case "crypto": diff --git a/src/utilities/logger.ts b/src/utilities/logger.ts index d2533afc7..d9b2ab9b1 100644 --- a/src/utilities/logger.ts +++ b/src/utilities/logger.ts @@ -86,7 +86,7 @@ export default class Logger { ) } - static custom( + static async custom( logfile: string, message: string, logToTerminal = true, @@ -105,7 +105,7 @@ export default class Logger { fs.rmSync(this.LOG_CUSTOM_PREFIX + logfile + ".log", { force: true, }) - fs.writeFileSync(this.LOG_CUSTOM_PREFIX + logfile + ".log", "") + await fs.promises.writeFile(this.LOG_CUSTOM_PREFIX + logfile + ".log", "") } this.writeAsync(this.LOG_CUSTOM_PREFIX + logfile + ".log", logEntry) } diff --git a/src/utilities/mainLoop.ts b/src/utilities/mainLoop.ts index 33784b4a4..020f97eae 100644 --- a/src/utilities/mainLoop.ts +++ b/src/utilities/mainLoop.ts @@ -17,6 +17,11 @@ async function sleep(time: number) { return new Promise(resolve => setTimeout(resolve, time)) } +// Helper function to yield control back to the event loop +function yieldToEventLoop(): Promise { + return new Promise(resolve => setImmediate(resolve)) +} + export default async function mainLoop() { log.info("[MAIN LOOP] βœ… Started") // return await consensusRoutine() @@ -26,7 +31,7 @@ export default async function mainLoop() { } async function mainLoopCycle() { - await sleep(500) // Sleep for 500 ms + await sleep(getSharedState.mainLoopSleepTime) log.info( "\n============================================================\n", true, @@ -45,6 +50,7 @@ async function mainLoopCycle() { // Diagnostic logging log.info("[MAIN LOOP] Logging current diagnostics", false) logCurrentDiagnostics() + await yieldToEventLoop() // ANCHOR Execute the peer routine before the consensus loop /* NOTE The peerRoutine also checks getOnlinePeers, so it works by waiting for @@ -52,8 +58,13 @@ async function mainLoopCycle() { running the consensus routine. */ // let currentlyOnlinePeers: Peer[] = await peerRoutine() await checkOfflinePeers() + await yieldToEventLoop() + await peerGossip() + await yieldToEventLoop() + await fastSync([], "mainloop") // REVIEW Test here + await yieldToEventLoop() // we now have a list of online peers that can be used for consensus // ANCHOR Syncing the blockchain after the peer routine @@ -64,6 +75,7 @@ async function mainLoopCycle() { // ANCHOR Check if we have to forge the block now const isConsensusTimeReached = await consensusTime.checkConsensusTime() + await yieldToEventLoop() log.info("[MAINLOOP]: about to check if its time for consensus") if (!isConsensusTimeReached) { @@ -98,6 +110,7 @@ async function mainLoopCycle() { let timer = 0 while (getSharedState.peerRoutineRunning > 0) { await sleep(100) + await yieldToEventLoop() timer += 1 if (timer > 10) { log.error( @@ -107,8 +120,10 @@ async function mainLoopCycle() { break } } + await yieldToEventLoop() // ANCHOR Calling the consensus routine if is time for it await consensusRoutine() + await yieldToEventLoop() } else if (!getSharedState.syncStatus) { // ? This is a bit redundant, isn't it? log.warning( diff --git a/src/utilities/selfPeer.ts b/src/utilities/selfPeer.ts index 20b32ea70..e67747c5c 100644 --- a/src/utilities/selfPeer.ts +++ b/src/utilities/selfPeer.ts @@ -12,5 +12,5 @@ export default async function selfPeer() { ` const basicPeerScriptFile = "demos_peerlist.json" fs.rmSync(basicPeerScriptFile) - fs.writeFileSync(basicPeerScriptFile, basicPeerScript) + await fs.promises.writeFile(basicPeerScriptFile, basicPeerScript) } diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index 6559da214..17ccb71fd 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -30,7 +30,8 @@ export default class SharedState { lastShardSeed = "" referenceBlockRoom = 1 shardSize = parseInt(process.env.SHARD_SIZE) || 4 - + mainLoopSleepTime = parseInt(process.env.MAIN_LOOP_SLEEP_TIME) || 1000 // 1 second + // NOTE See calibrateTime.ts for this value timestampCorrection = 0 diff --git a/src/utilities/waiter.ts b/src/utilities/waiter.ts index de9518272..36588e5d3 100644 --- a/src/utilities/waiter.ts +++ b/src/utilities/waiter.ts @@ -23,7 +23,7 @@ type WaitEntry = { } export class Waiter { - static preHeld: Map = new Map() + static preHeld: Map = new Map() static waitList: Map = new Map() static keys = { GREEN_LIGHT: "greenLight", @@ -39,18 +39,17 @@ export class Waiter { * @param timeout - The timeout for the event * @returns The data of the resolved event */ - static async wait( - id: string, - timeout = 10000, - ): Promise { + static async wait(id: string, timeout = 10000): Promise { if (Waiter.waitList.has(id)) { throw new Error(`[WAITER] Already waiting for id: ${id}`) } if (Waiter.preHeld.has(id)) { - log.debug(`[WAITER] Found pre-held key: ${id}`) + log.only(`[WAITER] Found pre-held key: ${id} with value: ${Waiter.preHeld.get(id)}`) + const resolveValue = Waiter.preHeld.get(id) Waiter.preHeld.delete(id) - return null + log.only(`[WAITER] Resolved pre-held key: ${id} with data: ${resolveValue}`) + return resolveValue } const promise = new Promise((resolve, reject) => { @@ -102,8 +101,9 @@ export class Waiter { return data || null } - static preHold(id: string) { - Waiter.preHeld.set(id, null) + static preHold(id: string, data: any = null) { + log.only(`[WAITER] Pre-holding the key: ${id} with data: ${data}`) + Waiter.preHeld.set(id, data) } /** @@ -112,6 +112,7 @@ export class Waiter { * @param id - The id of the event to abort */ static abort(id: string) { + log.only(`[WAITER] Aborting the key: ${id}`) const entry = Waiter.waitList.get(id) if (!entry) { log.warning(`[WAITER] No wait entry found for ${id}`) From 1c950d020a3b240663eae0e062fe0071a55ac876 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Fri, 3 Oct 2025 15:05:55 +0300 Subject: [PATCH 13/16] try: set block time to 10 seconds --- src/libs/consensus/routines/consensusTime.ts | 2 +- src/utilities/sharedState.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libs/consensus/routines/consensusTime.ts b/src/libs/consensus/routines/consensusTime.ts index 4757c72ae..f7932bcfe 100644 --- a/src/libs/consensus/routines/consensusTime.ts +++ b/src/libs/consensus/routines/consensusTime.ts @@ -32,7 +32,7 @@ export async function checkConsensusTime( const consensusIntervalTime = getSharedState.getConsensusTime() log.debug("[CONSENSUS TIME] lastTimestamp: " + lastTimestamp, true) log.debug("[CONSENSUS TIME] currentTimestamp: " + currentTimestamp, true) - log.debug("[CONSENSUS TIME] delta: " + delta, true) + log.only("[CONSENSUS TIME] delta: " + delta, true) log.debug( "[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, true, diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index 17ccb71fd..fda7493f9 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -22,7 +22,7 @@ export default class SharedState { version_name = "Entangled Polymer" signingAlgorithm = "ed25519" as SigningAlgorithm - block_time = 20 // TODO Get it from the genesis (or see Consensus module) + block_time = 10 // TODO Get it from the genesis (or see Consensus module) currentTimestamp = 0 currentUTCTime = 0 From 4e5c5fab4c93d7aa6eb5a6649d8974c700a4ef44 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Sat, 4 Oct 2025 12:54:32 +0300 Subject: [PATCH 14/16] try: create a new SecretaryManage instance class for each consensus round - require secretaryManager.getInstance to use a blockRef number + handle instance deletion + transmit block ref on greenlight + handle non existent instances + update Waiter.wait to allow waiting for key in multiple places --- src/libs/blockchain/mempool_v2.ts | 20 +-- src/libs/consensus/routines/consensusTime.ts | 2 +- src/libs/consensus/v2/PoRBFT.ts | 86 +++++++----- .../consensus/v2/types/secretaryManager.ts | 129 +++++++++++++++--- src/libs/network/manageConsensusRoutines.ts | 42 ++++-- src/utilities/waiter.ts | 23 +++- 6 files changed, 219 insertions(+), 83 deletions(-) diff --git a/src/libs/blockchain/mempool_v2.ts b/src/libs/blockchain/mempool_v2.ts index 4e78f0e91..3c194569e 100644 --- a/src/libs/blockchain/mempool_v2.ts +++ b/src/libs/blockchain/mempool_v2.ts @@ -1,4 +1,10 @@ -import { FindManyOptions, In, LessThanOrEqual, QueryFailedError, Repository } from "typeorm" +import { + FindManyOptions, + In, + LessThanOrEqual, + QueryFailedError, + Repository, +} from "typeorm" import Datasource from "@/model/datasource" import TxUtils from "./transaction" @@ -7,6 +13,7 @@ import { MempoolTx } from "@/model/entities/Mempool" import { Transaction } from "@kynesyslabs/demosdk/types" import SecretaryManager from "../consensus/v2/types/secretaryManager" import Chain from "./chain" +import { getSharedState } from "@/utilities/sharedState" export default class Mempool { public static repo: Repository = null @@ -84,11 +91,10 @@ export default class Mempool { } let blockNumber: number - const manager = SecretaryManager.getInstance() // INFO: If we're in consensus, move tx to next block - if (manager.shard?.blockRef) { - blockNumber = manager.shard.blockRef + 1 + if (getSharedState.inConsensusLoop) { + blockNumber = SecretaryManager.lastBlockRef + 1 } if (!blockNumber) { @@ -153,15 +159,13 @@ export default class Mempool { } } - const manager = SecretaryManager.getInstance() - const blockNumber = manager.shard?.blockRef - - if (!blockNumber) { + if (!getSharedState.inConsensusLoop) { return { success: false, mempool: [], } } + const blockNumber = SecretaryManager.lastBlockRef const existingHashes = await this.getMempoolHashMap(blockNumber) const incomingSet = {} diff --git a/src/libs/consensus/routines/consensusTime.ts b/src/libs/consensus/routines/consensusTime.ts index f7932bcfe..189f6f075 100644 --- a/src/libs/consensus/routines/consensusTime.ts +++ b/src/libs/consensus/routines/consensusTime.ts @@ -32,7 +32,7 @@ export async function checkConsensusTime( const consensusIntervalTime = getSharedState.getConsensusTime() log.debug("[CONSENSUS TIME] lastTimestamp: " + lastTimestamp, true) log.debug("[CONSENSUS TIME] currentTimestamp: " + currentTimestamp, true) - log.only("[CONSENSUS TIME] delta: " + delta, true) + log.only("[CONSENSUS TIME] delta: " + delta) log.debug( "[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, true, diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index 6f9ff84cd..394035662 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -24,6 +24,7 @@ import { } from "src/exceptions" import HandleGCR from "src/libs/blockchain/gcr/handleGCR" import { GCREdit } from "@kynesyslabs/demosdk/types" +import { Waiter } from "@/utilities/waiter" /* INFO # Semaphore system @@ -62,8 +63,11 @@ export async function consensusRoutine(): Promise { ) return } - log.debug("πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯") - const manager = SecretaryManager.getInstance() + log.only("πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯") + const manager = SecretaryManager.getInstance( + getSharedState.lastBlockNumber + 1, + true, + ) // Defining the variables needed for rolling back the GCREdits let successfulTxs: string[] = [] @@ -255,6 +259,8 @@ export async function consensusRoutine(): Promise { log.only("πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹") log.only("[consensusRoutine] CONSENSUS ROUTINE ENDED πŸ”₯πŸ”₯πŸ”₯") + log.only("Waiters: " + JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2)) + log.only("Preheld keys: " + Array.from(Waiter.preHeld.keys()).length) } // SECTION: Consensus functions! @@ -408,34 +414,34 @@ async function applyGCREditsFromMergedMempool( return [successfulTxs, failedTxs] } -/** - * Apply the GCR operations to the state - * - * @param mempool - The mempool - * @returns The successful and failed GCR operations - */ -async function applyGCRForNewBlock( - mempool: Transaction[], -): Promise<[string[], string[]]> { - const successfulTxs: string[] = [] - const failedTxs: string[] = [] - for (const tx of mempool) { - const operation = await txToGCROperation(tx) - const success = await applyGCROperation(operation) - if (success) { - successfulTxs.push(tx.hash) - } else { - failedTxs.push(tx.hash) - } - } - - log.info(`[consensusRoutine] Successful GCR operations: ${successfulTxs}`) - log.info(`[consensusRoutine] Failed GCR operations: ${failedTxs}`) - // await updateValidatorStatus("appliedGCR", true, false, true) - // Using the secretary to update the local statuses - await updateValidatorPhase(4) - return [successfulTxs, failedTxs] -} +// /** +// * Apply the GCR operations to the state +// * +// * @param mempool - The mempool +// * @returns The successful and failed GCR operations +// */ +// async function applyGCRForNewBlock( +// mempool: Transaction[], +// ): Promise<[string[], string[]]> { +// const successfulTxs: string[] = [] +// const failedTxs: string[] = [] +// for (const tx of mempool) { +// const operation = await txToGCROperation(tx) +// const success = await applyGCROperation(operation) +// if (success) { +// successfulTxs.push(tx.hash) +// } else { +// failedTxs.push(tx.hash) +// } +// } + +// log.info(`[consensusRoutine] Successful GCR operations: ${successfulTxs}`) +// log.info(`[consensusRoutine] Failed GCR operations: ${failedTxs}`) +// // await updateValidatorStatus("appliedGCR", true, false, true) +// // Using the secretary to update the local statuses +// await updateValidatorPhase(4) +// return [successfulTxs, failedTxs] +// } /** * Merge the peerlist between the shard and the local node @@ -540,8 +546,13 @@ async function finalizeBlock(block: Block, pro: number): Promise { console.log(lastBlock) } -function preventForgingEnded() { - const manager = SecretaryManager.getInstance() +function preventForgingEnded(blockRef: number) { + const manager = SecretaryManager.getInstance(blockRef) + log.only("preventForgingEnded blockRef: " + blockRef) + + if (!manager) { + throw new ForgingEndedError("Secretary manager not found") + } if ( getSharedState.lastBlockNumber === manager.shard.blockRef && @@ -560,16 +571,21 @@ function preventForgingEnded() { * @returns The block timestamp returned by the secretary */ async function updateValidatorPhase(phase: number): Promise { - const manager = SecretaryManager.getInstance() + let blockRef = getSharedState.lastBlockNumber + 1 + if (phase == 7) { + blockRef = SecretaryManager.lastBlockRef + } + + const manager = SecretaryManager.getInstance(blockRef) await manager.setOurValidatorPhase(phase, true) - preventForgingEnded() + preventForgingEnded(blockRef) // INFO: If it's the first phase, the secretary might not have started the consensus routine yet, // Increase retry steps to 10 to wait for the secretary to start const retries = phase === 1 ? 10 : 3 const res = await manager.sendOurValidatorPhaseToSecretary(retries) - preventForgingEnded() + preventForgingEnded(blockRef) log.debug( `[updateValidatorStatus πŸŽ‰] Validator phase ${phase} resolved with value: ${JSON.stringify( res, diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index 3f02802ad..a94ff4a80 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -13,9 +13,18 @@ import getCommonValidatorSeed from "../routines/getCommonValidatorSeed" // ANCHOR SecretaryManager export default class SecretaryManager { - private _greenlight_timeout = 30000 // 15 seconds - private _set_validator_phase_timeout = 15000 // 10 seconds - private static instance: SecretaryManager + private _greenlight_timeout = 30_000 // 15 seconds + private _set_validator_phase_timeout = 15_000 // 10 seconds + + // A map of block numbers to SecretaryManager instances + private static instances: Map = new Map() + static get lastBlockRef() { + return ( + Array.from(SecretaryManager.instances.keys()).sort( + (a, b) => b - a, + )[0] || 0 + ) + } // Internal variables public shard: Shard @@ -93,8 +102,8 @@ export default class SecretaryManager { log.only( "⬜️ We are the secretary ⬜️. starting the secretary routine", ) - this.secretaryRoutine().then(() => { - log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") + this.secretaryRoutine().finally(async () => { + log.debug("Secretary routine finished confetti confetti πŸŽŠπŸŽ‰") }) } @@ -553,7 +562,11 @@ export default class SecretaryManager { params: [ { method: "greenlight", - params: [this.blockTimestamp, phase], + params: [ + this.shard.blockRef, + this.blockTimestamp, + phase, + ], }, ], } @@ -663,11 +676,16 @@ export default class SecretaryManager { if (this.ourValidatorPhase.currentPhase <= validatorPhase) { log.only(`[SECRETARY ROUTINE] Pre-holding the key: ${waiterKey}`) + log.only("Is Waiting for key: " + Waiter.isWaiting(waiterKey)) + log.only( + "Waitlist keys: " + + JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2), + ) Waiter.preHold(waiterKey, secretaryBlockTimestamp) return true } - if (this.ourValidatorPhase.currentPhase > validatorPhase){ + if (this.ourValidatorPhase.currentPhase > validatorPhase) { // INFO: Older greenlight received, ignoring it return true } @@ -733,7 +751,9 @@ export default class SecretaryManager { log.only("Shard block ref: " + this.shard.blockRef) const waiterKey = - Waiter.keys.GREEN_LIGHT + this.shard.blockRef + this.ourValidatorPhase.currentPhase + Waiter.keys.GREEN_LIGHT + + this.shard.blockRef + + this.ourValidatorPhase.currentPhase const greenlight: Promise = Waiter.wait( waiterKey, this._greenlight_timeout, @@ -884,18 +904,58 @@ export default class SecretaryManager { public async endConsensusRoutine() { log.only("Ending the consensus routine") - SecretaryManager.instance.runSecretaryRoutine = false - SecretaryManager.instance = null - + const manager = SecretaryManager.instances.get(this.shard.blockRef) + + if (manager) { + manager.runSecretaryRoutine = false + } + const filter = (key: string) => + key.includes("greenLight" + this.shard.blockRef) + + const waiterKeys = Array.from(Waiter.waitList.keys()).filter(filter) + const waiters = waiterKeys.map(key => Waiter.wait(key)) + + log.only( + "πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’ WAITING FOR HANGING GREENLIGHTS πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’", + ) + log.only("Waiter keys: " + JSON.stringify(waiterKeys, null, 2)) + try { + await Promise.all(waiters) + } catch (error) { + console.error(error) + process.exit(1) + log.error("Error waiting for hanging greenlights: " + error) + } + + // INFO: Delete pre-held keys for ended consensus round + Waiter.preHeld + .keys() + .filter(filter) + .forEach(key => Waiter.preHeld.delete(key)) + + log.only( + "😎😎😎😎😎😎😎😎😎😎 HANGING GREENLIGHTS RESOLVED 😎😎😎😎😎😎😎😎😎😎", + ) + log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") + + if (SecretaryManager.getInstance(this.shard.blockRef) === this) { + log.only("deleting the instance") + SecretaryManager.instances.delete(this.shard.blockRef) + } else { + log.error("this instance is not doing that thing") + process.exit(1) + } + + // SecretaryManager.instance = null // TODO: Abort all waiters - + // INFO: Resolve all hanging waiters - if (Waiter.isWaiting(Waiter.keys.GREEN_LIGHT)) { - log.only( - "GREEN_LIGHT waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", - ) - Waiter.abort(Waiter.keys.GREEN_LIGHT) - } + // if (Waiter.isWaiting(Waiter.keys.GREEN_LIGHT)) { + // log.only( + // "GREEN_LIGHT waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", + // ) + // Waiter.abort(Waiter.keys.GREEN_LIGHT) + // } // if (Waiter.isWaiting(Waiter.keys.SET_WAIT_STATUS)) { // log.only( @@ -915,18 +975,43 @@ export default class SecretaryManager { async setOurValidatorPhase(phase: number, status: boolean) { log.debug("[setOurValidatorPhase] Setting our phase to: " + phase) // INFO: Update the current phase and the status of the phase + log.only("setting our phase to: " + phase) this.ourValidatorPhase.currentPhase = phase this.ourValidatorPhase.phases[phase][1] = status this.ourValidatorPhase.waitStatus = true } // ANCHOR Singleton logic - public static getInstance(): SecretaryManager { - if (!SecretaryManager.instance) { - SecretaryManager.instance = new SecretaryManager() + public static getInstance( + blockRef?: number, + initialize = false, + ): SecretaryManager { + log.only("getting instance for block ref: " + blockRef) + // INFO: If blockRef is not provided, use the last block number + 1 + // ie. assume we're using this instance for latest block + if (!blockRef) { + blockRef = getSharedState.lastBlockNumber + 1 + } + + log.only("block ref: " + blockRef) + log.only( + "instance keys: " + + JSON.stringify( + Array.from(SecretaryManager.instances.keys()), + null, + 2, + ), + ) + + if (!SecretaryManager.instances.get(blockRef)) { + if (initialize) { + SecretaryManager.instances.set(blockRef, new SecretaryManager()) + } else { + return null + } } - return SecretaryManager.instance + return SecretaryManager.instances.get(blockRef) } /** diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index d7432264a..4a0d03c7a 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -39,7 +39,7 @@ export default async function manageConsensusRoutines( payload: ConsensusMethod, ): Promise { log.only("πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘ RECEIVED CONSENSUS CALL πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘") - + const peer = PeerManager.getInstance().getPeer(sender) log.only("Sender: " + peer.connection.string) log.only("Payload: " + JSON.stringify(payload, null, 2)) @@ -160,6 +160,8 @@ export default async function manageConsensusRoutines( return response } + log.only("processing payload: " + JSON.stringify(payload, null, 2)) + // NOTE Each method has its own logic to be implemented switch (payload.method) { /* @@ -229,7 +231,19 @@ export default async function manageConsensusRoutines( case "setValidatorPhase": { try { const [phase, seed, blockRef] = payload.params - const manager = SecretaryManager.getInstance() + const manager = SecretaryManager.getInstance(blockRef) + + //INFO: If the manager class for that block is not found, assume peer is behind on the consensus + // return a greenlight to unblock peer + if (!manager) { + response.result = 200 + response.response = "Secretary manager not found" + response.extra = { + greenlight: true, + } + + return response + } // INFO: Seed check if ( @@ -332,20 +346,26 @@ export default async function manageConsensusRoutines( case "greenlight": { // TODO: Check if the sender is the secretary (without verifying the signature // as we have already done that) in validateHeaders - const [timestamp, validatorPhase] = payload.params as [ - number, - number, + const [blockRef, timestamp, validatorPhase] = payload.params as [ + number, // blockRef + number, // timestamp + number, // validatorPhase ] - log.info( - "payload.params: " + JSON.stringify(payload.params, null, 2), - ) - const manager = SecretaryManager.getInstance() - log.debug("Our secretary identity: " + manager.secretary.identity) - log.debug("shard: " + manager.shard.members.map(m => m.identity)) + const manager = SecretaryManager.getInstance(blockRef) + + // INFO: If the manager class for that block is not found, assume peer is behind on the consensus + // return a 200 to unblock peer + if (!manager) { + log.only("returning a fake 200") + response.result = 200 + response.response = "Secretary manager not found" + return response + } // INFO: Check if the sender is the secretary if (sender !== manager.secretary.identity) { + log.only("returning a 401") response.result = 401 response.response = "Greenlight not accepted" response.extra = "Secretary identity mismatch" diff --git a/src/utilities/waiter.ts b/src/utilities/waiter.ts index 36588e5d3..5c8550ceb 100644 --- a/src/utilities/waiter.ts +++ b/src/utilities/waiter.ts @@ -41,14 +41,20 @@ export class Waiter { */ static async wait(id: string, timeout = 10000): Promise { if (Waiter.waitList.has(id)) { - throw new Error(`[WAITER] Already waiting for id: ${id}`) + return Waiter.waitList.get(id).promise } if (Waiter.preHeld.has(id)) { - log.only(`[WAITER] Found pre-held key: ${id} with value: ${Waiter.preHeld.get(id)}`) + log.only( + `[WAITER] Found pre-held key: ${id} with value: ${Waiter.preHeld.get( + id, + )}`, + ) const resolveValue = Waiter.preHeld.get(id) Waiter.preHeld.delete(id) - log.only(`[WAITER] Resolved pre-held key: ${id} with data: ${resolveValue}`) + log.only( + `[WAITER] Resolved pre-held key: ${id} with data: ${resolveValue}`, + ) return resolveValue } @@ -72,7 +78,7 @@ export class Waiter { promise: null, }) - log.debug(`[WAITER] Created wait entry for ${id}`) + log.only(`[WAITER] πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’ Created wait entry for ${id}`) }) Waiter.waitList.get(id).promise = promise @@ -88,7 +94,7 @@ export class Waiter { static resolve(id: string, data: T = null): T { const entry = Waiter.waitList.get(id) if (!entry) { - log.warning(`[WAITER] No wait entry found for ${id}`) + log.error(`[WAITER] No wait entry found for ${id}`) return null } @@ -96,12 +102,17 @@ export class Waiter { Waiter.preHeld.delete(id) Waiter.waitList.delete(id) entry.resolve(data) - log.debug(`[WAITER] Resolved wait entry for ${id}`) + log.only(`[WAITER] Resolved wait entry for ${id}`) return data || null } static preHold(id: string, data: any = null) { + if (Waiter.waitList.has(id)) { + log.error(`[WAITER] Cannot pre-hold key: ${id} because it's already waiting`) + throw new Error(`[WAITER] Already waiting for id: ${id}`) + } + log.only(`[WAITER] Pre-holding the key: ${id} with data: ${data}`) Waiter.preHeld.set(id, data) } From 5aa3483dd81eafb27789a793be0e09fe1ff7cb1d Mon Sep 17 00:00:00 2001 From: cwilvx Date: Sun, 5 Oct 2025 12:26:25 +0300 Subject: [PATCH 15/16] pass blockRef to secretary manager in consensus routine --- src/libs/consensus/v2/PoRBFT.ts | 115 +++++++++++--------- src/libs/network/manageConsensusRoutines.ts | 19 +++- 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index 394035662..e311cf107 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -64,10 +64,8 @@ export async function consensusRoutine(): Promise { return } log.only("πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯") - const manager = SecretaryManager.getInstance( - getSharedState.lastBlockNumber + 1, - true, - ) + const blockRef = getSharedState.lastBlockNumber + 1 + const manager = SecretaryManager.getInstance(blockRef, true) // Defining the variables needed for rolling back the GCREdits let successfulTxs: string[] = [] @@ -82,7 +80,7 @@ export async function consensusRoutine(): Promise { // INFO: We won't use the shard returned by initializeShard // as it can change through the consensus routine // INFO: CONSENSUS ACTION 1: Initialize the shard - await initializeShard() + await initializeShard(blockRef) log.debug("Forgin block: " + manager.shard.blockRef) log.info("[consensusRoutine] We are in the shard, creating the block") log.info( @@ -95,7 +93,7 @@ export async function consensusRoutine(): Promise { ) // INFO: Broadcast our validation phase to the secretary - await updateValidatorPhase(1) + await updateValidatorPhase(1, blockRef) // synchronize and average the time // NOTE: Instead of averaging the time, we'll use the secretary timestamp @@ -212,7 +210,7 @@ export async function consensusRoutine(): Promise { // INFO: CONSENSUS ACTION 7: End the consensus routine log.only("Sending validator phase 7 to the secretary 🏁") - await updateValidatorPhase(7) + await updateValidatorPhase(7, blockRef) log.only("Validator phase 7 sent to the secretary βœ…") } catch (error) { if (error instanceof NotInShardError) { @@ -259,7 +257,10 @@ export async function consensusRoutine(): Promise { log.only("πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹") log.only("[consensusRoutine] CONSENSUS ROUTINE ENDED πŸ”₯πŸ”₯πŸ”₯") - log.only("Waiters: " + JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2)) + log.only( + "Waiters: " + + JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2), + ) log.only("Preheld keys: " + Array.from(Waiter.preHeld.keys()).length) } @@ -300,36 +301,36 @@ async function initializeConsensusState(): Promise { * * @returns The shard members */ -async function initializeShard(): Promise { +async function initializeShard(blockRef: number): Promise { const { commonValidatorSeed, lastBlockNumber } = await getCommonValidatorSeed() // return await getShard(commonValidatorSeed) - const manager = SecretaryManager.getInstance() + const manager = SecretaryManager.getInstance(blockRef) return await manager.initializeShard(commonValidatorSeed, lastBlockNumber) } // SECTION Routines -/** - * Synchronize and average the time between the shard and the local node - * - * @param shard - The shard members - */ -async function synchronizeAndAverageTime(shard: Peer[]): Promise { - await fastSync(shard, "synchronizeAndAverageTime") - let averageTimestamp = await averageTimestamps(shard) - // Strip the decimal part - if (!Number.isInteger(averageTimestamp)) { - log.warning( - "[synchronizeAndAverageTime] Average timestamp is not an integer", - ) - averageTimestamp = Math.round(averageTimestamp) - } - getSharedState.lastConsensusTime = averageTimestamp - // Using the secretary to update the local statuses - await updateValidatorPhase(2) -} +// /** +// * Synchronize and average the time between the shard and the local node +// * +// * @param shard - The shard members +// */ +// async function synchronizeAndAverageTime(shard: Peer[]): Promise { +// await fastSync(shard, "synchronizeAndAverageTime") +// let averageTimestamp = await averageTimestamps(shard) +// // Strip the decimal part +// if (!Number.isInteger(averageTimestamp)) { +// log.warning( +// "[synchronizeAndAverageTime] Average timestamp is not an integer", +// ) +// averageTimestamp = Math.round(averageTimestamp) +// } +// getSharedState.lastConsensusTime = averageTimestamp +// // Using the secretary to update the local statuses +// await updateValidatorPhase(2) +// } /** * Merge and order the mempools between the shard and the local node @@ -339,18 +340,18 @@ async function synchronizeAndAverageTime(shard: Peer[]): Promise { */ async function mergeAndOrderMempools( shard: Peer[], - blockNumber: number, + blockRef: number, ): Promise<(Transaction & { reference_block: number })[]> { - const ourMempool = await Mempool.getMempool(blockNumber) + const ourMempool = await Mempool.getMempool(blockRef) console.log("[consensusRoutine] Our mempool:") console.log(ourMempool) log.info("[consensusRoutine] Our mempool has been retrieved") // NOTE: Transactions here should be ordered by timestamp await mergeMempools(ourMempool, shard) - await updateValidatorPhase(3) + await updateValidatorPhase(3, blockRef) - return await Mempool.getMempool(blockNumber) + return await Mempool.getMempool(blockRef) } /** @@ -443,19 +444,19 @@ async function applyGCREditsFromMergedMempool( // return [successfulTxs, failedTxs] // } -/** - * Merge the peerlist between the shard and the local node - * - * @param shard - The shard members - * @returns The merged peerlist - */ -async function mergePeerlistAndWait(shard: Peer[]): Promise { - const mergedPeerList = await mergePeerlist(shard) - //await updateValidatorStatus("mergedPeerlist", true, false, true) - // Using the secretary to update the local statuses - await updateValidatorPhase(4) - return mergedPeerList -} +// /** +// * Merge the peerlist between the shard and the local node +// * +// * @param shard - The shard members +// * @returns The merged peerlist +// */ +// async function mergePeerlistAndWait(shard: Peer[]): Promise { +// const mergedPeerList = await mergePeerlist(shard) +// //await updateValidatorStatus("mergedPeerlist", true, false, true) +// // Using the secretary to update the local statuses +// await updateValidatorPhase(4) +// return mergedPeerList +// } /** * Forge the block from the ordered transactions @@ -484,7 +485,7 @@ async function forgeBlock( // await updateValidatorStatus("forgedBlock", true, false, true) // Using the secretary to update the local statuses - await updateValidatorPhase(5) + await updateValidatorPhase(5, block.number) return block } @@ -505,7 +506,7 @@ async function voteOnBlock( const [pro, con] = await broadcastBlockHash(block, shard) // await updateValidatorStatus("votedForBlock", true, false, true) // Using the secretary to update the local statuses - await updateValidatorPhase(6) + await updateValidatorPhase(6, block.number) log.info( `[consensusRoutine] Block hash broadcasted to the shard: ${block.hash}`, @@ -570,13 +571,21 @@ function preventForgingEnded(blockRef: number) { * @param phase - Our cleared phase number * @returns The block timestamp returned by the secretary */ -async function updateValidatorPhase(phase: number): Promise { - let blockRef = getSharedState.lastBlockNumber + 1 - if (phase == 7) { - blockRef = SecretaryManager.lastBlockRef +async function updateValidatorPhase( + phase: number, + blockRef: number, +): Promise { + const manager = SecretaryManager.getInstance(blockRef) + + if (!manager) { + // INFO: This should never happen + log.only("Secretary manager not found") + log.only("Block ref: " + blockRef) + log.only("Last block number: " + (await Chain.getLastBlockNumber())) + log.only("Phase: " + phase) + process.exit(1) } - const manager = SecretaryManager.getInstance(blockRef) await manager.setOurValidatorPhase(phase, true) preventForgingEnded(blockRef) diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 4a0d03c7a..0dc441655 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -389,14 +389,31 @@ export default async function manageConsensusRoutines( // NOTE: Ideally, we should never need to use these methods case "getValidatorPhase": { const manager = SecretaryManager.getInstance() + + if (!manager) { + response.result = 400 + response.response = [null] + response.extra = { error: "Consensus not in progress" } + return response + } + response.result = 200 response.response = [manager.ourValidatorPhase.currentPhase] break } case "getBlockTimestamp": { + const manager = SecretaryManager.getInstance() + + if (!manager) { + response.result = 400 + response.response = [null] + response.extra = { error: "Consensus not in progress" } + return response + } + response.result = 200 - response.response = [SecretaryManager.getInstance().blockTimestamp] + response.response = [manager.blockTimestamp] break } } From 2776e9d9662495191630e7666f2a485a50255f22 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Fri, 10 Oct 2025 11:14:54 +0300 Subject: [PATCH 16/16] cleanup --- src/libs/blockchain/routines/Sync.ts | 8 +- src/libs/consensus/routines/consensusTime.ts | 2 +- src/libs/consensus/v2/PoRBFT.ts | 28 +---- src/libs/consensus/v2/routines/getShard.ts | 101 ++--------------- .../consensus/v2/types/secretaryManager.ts | 107 ++++++------------ src/libs/identity/tools/twitter.ts | 2 +- src/libs/network/manageConsensusRoutines.ts | 18 ++- src/utilities/mainLoop.ts | 4 +- src/utilities/waiter.ts | 12 +- 9 files changed, 71 insertions(+), 211 deletions(-) diff --git a/src/libs/blockchain/routines/Sync.ts b/src/libs/blockchain/routines/Sync.ts index 361e98086..789750806 100644 --- a/src/libs/blockchain/routines/Sync.ts +++ b/src/libs/blockchain/routines/Sync.ts @@ -250,7 +250,7 @@ async function verifyLastBlockIntegrity( } async function downloadBlock(peer: Peer, blockToAsk: number) { - log.only("Downloading block: " + blockToAsk) + log.debug("Downloading block: " + blockToAsk) const blockRequest: RPCRequest = { method: "nodeCall", params: [ @@ -265,7 +265,7 @@ async function downloadBlock(peer: Peer, blockToAsk: number) { const blockResponse = await peer.longCall(blockRequest, false, 250, 3, [ 404, ]) - log.only("Block response: " + blockResponse.result) + log.debug("Block response: " + blockResponse.result) // INFO: Handle max retries reached if (blockResponse.result === 400) { @@ -292,8 +292,8 @@ async function downloadBlock(peer: Peer, blockToAsk: number) { log.info("[downloadBlock] Block received: " + block.hash) await Chain.insertBlock(block, [], null, false) - log.only("Block inserted successfully") - log.only("Last block number: " + getSharedState.lastBlockNumber + " Last block hash: " + getSharedState.lastBlockHash) + log.debug("Block inserted successfully") + log.debug("Last block number: " + getSharedState.lastBlockNumber + " Last block hash: " + getSharedState.lastBlockHash) log.info( "[fastSync] Block inserted successfully at the head of the chain!", ) diff --git a/src/libs/consensus/routines/consensusTime.ts b/src/libs/consensus/routines/consensusTime.ts index 189f6f075..a8555c828 100644 --- a/src/libs/consensus/routines/consensusTime.ts +++ b/src/libs/consensus/routines/consensusTime.ts @@ -32,7 +32,7 @@ export async function checkConsensusTime( const consensusIntervalTime = getSharedState.getConsensusTime() log.debug("[CONSENSUS TIME] lastTimestamp: " + lastTimestamp, true) log.debug("[CONSENSUS TIME] currentTimestamp: " + currentTimestamp, true) - log.only("[CONSENSUS TIME] delta: " + delta) + log.debug("[CONSENSUS TIME] delta: " + delta) log.debug( "[CONSENSUS TIME] consensusIntervalTime: " + consensusIntervalTime, true, diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index e311cf107..d76565324 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -63,7 +63,7 @@ export async function consensusRoutine(): Promise { ) return } - log.only("πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯") + log.debug("πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯πŸ”₯") const blockRef = getSharedState.lastBlockNumber + 1 const manager = SecretaryManager.getInstance(blockRef, true) @@ -181,9 +181,7 @@ export async function consensusRoutine(): Promise { } // INFO: CONSENSUS ACTION 5: Forge the block - log.only("Forging block 🏁") const block = await forgeBlock(tempMempool, peerlist) // NOTE The GCR hash is calculated here and added to the block - log.only("Block forged βœ…") // REVIEW Set last consensus time to the current block timestamp getSharedState.lastConsensusTime = block.content.timestamp @@ -209,9 +207,7 @@ export async function consensusRoutine(): Promise { } // INFO: CONSENSUS ACTION 7: End the consensus routine - log.only("Sending validator phase 7 to the secretary 🏁") await updateValidatorPhase(7, blockRef) - log.only("Validator phase 7 sent to the secretary βœ…") } catch (error) { if (error instanceof NotInShardError) { log.info( @@ -226,6 +222,7 @@ export async function consensusRoutine(): Promise { ) return } + if (error instanceof BlockInvalidError) { log.info( "[consensusRoutine] Block is invalid. Rolling back the GCREdits", @@ -248,20 +245,9 @@ export async function consensusRoutine(): Promise { console.error(error) process.exit(1) } finally { - manager.endConsensusRoutine() - // Cleanup the consensus state cleanupConsensusState() - // Joining the temporary mempool to the main one - // await Mempool.joinTemporaryMempool() // ? Is await ok here? + manager.endConsensusRoutine() } - - log.only("πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹πŸ‘‹") - log.only("[consensusRoutine] CONSENSUS ROUTINE ENDED πŸ”₯πŸ”₯πŸ”₯") - log.only( - "Waiters: " + - JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2), - ) - log.only("Preheld keys: " + Array.from(Waiter.preHeld.keys()).length) } // SECTION: Consensus functions! @@ -549,7 +535,6 @@ async function finalizeBlock(block: Block, pro: number): Promise { function preventForgingEnded(blockRef: number) { const manager = SecretaryManager.getInstance(blockRef) - log.only("preventForgingEnded blockRef: " + blockRef) if (!manager) { throw new ForgingEndedError("Secretary manager not found") @@ -578,12 +563,7 @@ async function updateValidatorPhase( const manager = SecretaryManager.getInstance(blockRef) if (!manager) { - // INFO: This should never happen - log.only("Secretary manager not found") - log.only("Block ref: " + blockRef) - log.only("Last block number: " + (await Chain.getLastBlockNumber())) - log.only("Phase: " + phase) - process.exit(1) + throw new ForgingEndedError("Secretary Manager instance for this block has been deleted") } await manager.setOurValidatorPhase(phase, true) diff --git a/src/libs/consensus/v2/routines/getShard.ts b/src/libs/consensus/v2/routines/getShard.ts index bd12709e5..d2d49118c 100644 --- a/src/libs/consensus/v2/routines/getShard.ts +++ b/src/libs/consensus/v2/routines/getShard.ts @@ -5,72 +5,10 @@ import { getSharedState } from "src/utilities/sharedState" import log from "src/utilities/logger" import Chain from "src/libs/blockchain/chain" -export default async function getShard( - seed: string, - debug = false, -): Promise { +export default async function getShard(seed: string): Promise { // ! we need to get the peers from the last 3 blocks too const allPeers = await PeerManager.getInstance().getOnlinePeers() - if (debug) { - log.only( - "peers: " + - JSON.stringify( - allPeers.map(peer => ({ - url: peer.connection.string, - status: peer.sync.status, - })), - null, - 2, - ), - ) - } const peers = allPeers.filter(peer => peer.sync.status) - if (debug) { - log.only( - "peers: " + - JSON.stringify( - peers.map(peer => ({ - url: peer.connection.string, - status: peer.sync.status, - })), - null, - 2, - ), - ) - } - - // const peerIdentites = peers.map(peer => peer.identity) - - // const lastBlock = await Chain.getLastBlock() - - // log.debug( - // "typeof lastBlock.validation_data: " + typeof lastBlock.validation_data, - // ) - // log.debug(`Last block: ${lastBlock.validation_data}`) - - // let signatures: { [key: string]: string } = {} - - // if (lastBlock.validation_data !== "genesis") { - // signatures = JSON.parse(lastBlock.validation_data)["signatures"] - // } - - // // INFO: Include the validators from the last block - // // REVIEW: Do we include all peers from the last N blocks or only the validators? - // for (const identity of Object.keys(signatures)) { - // if (peerIdentites.includes(identity)) { - // continue - // } - - // const peer = PeerManager.getInstance().getPeer(identity) - // log.debug( - // `Peer result for ${identity}: ${JSON.stringify(peer, null, 2)}`, - // ) - - // if (peer) { - // log.debug(`Peer ${identity} not in the shard, adding it`) - // peers.push(peer) - // } - // } // Select up to 10 peers from the list using the seed as a source of randomness let maxShardSize = getSharedState.shardSize @@ -87,39 +25,16 @@ export default async function getShard( // REVIEW: sort available peers by .identity (which is a hex string) // before choosing the peers for a uniform sample across nodes availablePeers.sort((a, b) => a.identity.localeCompare(b.identity)) - if (debug) { - log.only( - "availablePeers: " + - JSON.stringify( - availablePeers.map(peer => ({ - url: peer.connection.string, - status: peer.sync.status, - })), - null, - 2, - ), - ) - } // REVIEW: check if this is the right way to do it // NOTE Choosing the secretary by randomly ordering the list: the first one is the secretary for (let i = 0; i < maxShardSize && availablePeers.length > 0; i++) { - const index = Math.floor(deterministicRandomness() * availablePeers.length) + const index = Math.floor( + deterministicRandomness() * availablePeers.length, + ) shard.push(availablePeers[index]) availablePeers.splice(index, 1) } - if (debug) { - log.only( - "shard: " + - JSON.stringify( - shard.map(peer => ({ - url: peer.connection.string, - status: peer.sync.status, - })), - null, - 2, - ), - ) - } + // Setting the last shard // getSharedState.lastShard = shard.map(peer => peer.identity) if (shard.length < 3) { @@ -130,7 +45,11 @@ export default async function getShard( log.info(`Last shard: ${shard.map(peer => peer.identity)}`) log.custom( "last_shard", - JSON.stringify(shard.map(peer => peer.identity), null, 2), + JSON.stringify( + shard.map(peer => peer.identity), + null, + 2, + ), false, true, ) diff --git a/src/libs/consensus/v2/types/secretaryManager.ts b/src/libs/consensus/v2/types/secretaryManager.ts index a94ff4a80..96b3d54f1 100644 --- a/src/libs/consensus/v2/types/secretaryManager.ts +++ b/src/libs/consensus/v2/types/secretaryManager.ts @@ -54,23 +54,12 @@ export default class SecretaryManager { secretaryKey: "", blockRef: lastBlockNumber + 1, } - log.only("🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒🟒") - log.only("Initializing shard") - log.only("CVSA: " + cVSA) - log.only("Last block number: " + lastBlockNumber) // Reusing the method to create the members this.shard.members = await getShard(cVSA) // this.ourKey = getSharedState.identity.ed25519.publicKey.toString("hex") this.ourKey = getSharedState.publicKeyHex - log.only( - "Shard members: " + - JSON.stringify( - this.shard.members.map(m => m.connection.string), - ), - ) - if ( !this.shard.members.map(peer => peer.identity).includes(this.ourKey) ) { @@ -80,14 +69,14 @@ export default class SecretaryManager { // Assigning the secretary and its key this.shard.secretaryKey = this.secretary.identity - log.only("INITIALIZED SHARD:") - log.only( + log.debug("INITIALIZED SHARD:") + log.debug( "SHARD: " + JSON.stringify( this.shard.members.map(m => m.connection.string), ), ) - log.only("SECRETARY: " + this.secretary.identity) + log.debug("SECRETARY: " + this.secretary.identity) // INFO: If some nodes crash, kill the node for debugging! // if (this.shard.members.length < 3 && this.shard.blockRef > 24000) { @@ -99,7 +88,7 @@ export default class SecretaryManager { // INFO: Start the secretary routine if (this.checkIfWeAreSecretary()) { - log.only( + log.debug( "⬜️ We are the secretary ⬜️. starting the secretary routine", ) this.secretaryRoutine().finally(async () => { @@ -640,13 +629,13 @@ export default class SecretaryManager { return false } - log.only("Received green light for phase: " + validatorPhase) - log.only("---- DIAGNOSTICS ----") - log.only("Our phase: " + this.ourValidatorPhase.currentPhase) - log.only("Our blockRef: " + this.shard.blockRef) - log.only("Secretary timestamp: " + secretaryBlockTimestamp) - log.only("Secretary: " + this.secretary.identity) - log.only("---- END DIAGNOSTICS ----") + log.debug("Received green light for phase: " + validatorPhase) + log.debug("---- DIAGNOSTICS ----") + log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) + log.debug("Our blockRef: " + this.shard.blockRef) + log.debug("Secretary timestamp: " + secretaryBlockTimestamp) + log.debug("Secretary: " + this.secretary.identity) + log.debug("---- END DIAGNOSTICS ----") if (secretaryBlockTimestamp < this.blockTimestamp) { log.debug( @@ -666,7 +655,7 @@ export default class SecretaryManager { const waiterKey = Waiter.keys.GREEN_LIGHT + this.shard.blockRef + validatorPhase - log.only("Waiter key: " + waiterKey) + log.debug("Waiter key: " + waiterKey) if (Waiter.isWaiting(waiterKey)) { Waiter.resolve(waiterKey, secretaryBlockTimestamp) @@ -675,9 +664,9 @@ export default class SecretaryManager { } if (this.ourValidatorPhase.currentPhase <= validatorPhase) { - log.only(`[SECRETARY ROUTINE] Pre-holding the key: ${waiterKey}`) - log.only("Is Waiting for key: " + Waiter.isWaiting(waiterKey)) - log.only( + log.debug(`[SECRETARY ROUTINE] Pre-holding the key: ${waiterKey}`) + log.debug("Is Waiting for key: " + Waiter.isWaiting(waiterKey)) + log.debug( "Waitlist keys: " + JSON.stringify(Array.from(Waiter.waitList.keys()), null, 2), ) @@ -690,26 +679,12 @@ export default class SecretaryManager { return true } - log.only("We don't know what to do with this green light") - log.only("Validator phase: " + validatorPhase) - log.only("Our phase: " + this.ourValidatorPhase.currentPhase) - log.only("Secretary block timestamp: " + secretaryBlockTimestamp) - log.only("Block timestamp: " + this.blockTimestamp) + log.debug("We don't know what to do with this green light") + log.debug("Validator phase: " + validatorPhase) + log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) + log.debug("Secretary block timestamp: " + secretaryBlockTimestamp) + log.debug("Block timestamp: " + this.blockTimestamp) process.exit(1) - - return false - - // log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) - // if (validatorPhase > this.ourValidatorPhase.currentPhase) { - // // INFO: This node has already timed out - // log.debug("We are not in the same phase, stopping the node ...") - // process.exit(1) - // } - - // INFO: Resolve the waiter with the timestamp - Waiter.resolve(waiterKey, secretaryBlockTimestamp) - this.ourValidatorPhase.waitStatus = false - return true } /** @@ -746,9 +721,9 @@ export default class SecretaryManager { // // await this.simulateSecretaryGoingOffline() // } - log.only("Sending our validator phase to the secretary") - log.only("Our phase: " + this.ourValidatorPhase.currentPhase) - log.only("Shard block ref: " + this.shard.blockRef) + log.debug("Sending our validator phase to the secretary") + log.debug("Our phase: " + this.ourValidatorPhase.currentPhase) + log.debug("Shard block ref: " + this.shard.blockRef) const waiterKey = Waiter.keys.GREEN_LIGHT + @@ -759,8 +734,8 @@ export default class SecretaryManager { this._greenlight_timeout, ) - log.only("Greenlight waiter created") - log.only("Waiter key: " + waiterKey) + log.debug("Greenlight waiter created") + log.debug("Waiter key: " + waiterKey) const sendStatus = async () => { const request: RPCRequest = { @@ -826,8 +801,8 @@ export default class SecretaryManager { } if (res.result == 401) { - log.only("received a 401") - log.only(JSON.stringify(res, null, 2)) + log.debug("received a 401") + log.debug(JSON.stringify(res, null, 2)) process.exit(1) } @@ -903,7 +878,7 @@ export default class SecretaryManager { } public async endConsensusRoutine() { - log.only("Ending the consensus routine") + log.debug("Ending the consensus routine") const manager = SecretaryManager.instances.get(this.shard.blockRef) if (manager) { @@ -915,10 +890,10 @@ export default class SecretaryManager { const waiterKeys = Array.from(Waiter.waitList.keys()).filter(filter) const waiters = waiterKeys.map(key => Waiter.wait(key)) - log.only( + log.debug( "πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’ WAITING FOR HANGING GREENLIGHTS πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’πŸ’", ) - log.only("Waiter keys: " + JSON.stringify(waiterKeys, null, 2)) + log.debug("Waiter keys: " + JSON.stringify(waiterKeys, null, 2)) try { await Promise.all(waiters) } catch (error) { @@ -933,13 +908,13 @@ export default class SecretaryManager { .filter(filter) .forEach(key => Waiter.preHeld.delete(key)) - log.only( + log.debug( "😎😎😎😎😎😎😎😎😎😎 HANGING GREENLIGHTS RESOLVED 😎😎😎😎😎😎😎😎😎😎", ) - log.only("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") + log.debug("[SECRETARY ROUTINE] Secretary routine finished πŸŽ‰") if (SecretaryManager.getInstance(this.shard.blockRef) === this) { - log.only("deleting the instance") + log.debug("deleting the instance") SecretaryManager.instances.delete(this.shard.blockRef) } else { log.error("this instance is not doing that thing") @@ -951,14 +926,14 @@ export default class SecretaryManager { // INFO: Resolve all hanging waiters // if (Waiter.isWaiting(Waiter.keys.GREEN_LIGHT)) { - // log.only( + // log.debug( // "GREEN_LIGHT waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", // ) // Waiter.abort(Waiter.keys.GREEN_LIGHT) // } // if (Waiter.isWaiting(Waiter.keys.SET_WAIT_STATUS)) { - // log.only( + // log.debug( // "SET_WAIT_STATUS waiter found WHEN ENDING THE CONSENSUS ..., KILLING IT it", // ) // Waiter.abort(Waiter.keys.SET_WAIT_STATUS) @@ -975,7 +950,6 @@ export default class SecretaryManager { async setOurValidatorPhase(phase: number, status: boolean) { log.debug("[setOurValidatorPhase] Setting our phase to: " + phase) // INFO: Update the current phase and the status of the phase - log.only("setting our phase to: " + phase) this.ourValidatorPhase.currentPhase = phase this.ourValidatorPhase.phases[phase][1] = status this.ourValidatorPhase.waitStatus = true @@ -986,23 +960,12 @@ export default class SecretaryManager { blockRef?: number, initialize = false, ): SecretaryManager { - log.only("getting instance for block ref: " + blockRef) // INFO: If blockRef is not provided, use the last block number + 1 // ie. assume we're using this instance for latest block if (!blockRef) { blockRef = getSharedState.lastBlockNumber + 1 } - log.only("block ref: " + blockRef) - log.only( - "instance keys: " + - JSON.stringify( - Array.from(SecretaryManager.instances.keys()), - null, - 2, - ), - ) - if (!SecretaryManager.instances.get(blockRef)) { if (initialize) { SecretaryManager.instances.set(blockRef, new SecretaryManager()) diff --git a/src/libs/identity/tools/twitter.ts b/src/libs/identity/tools/twitter.ts index c2ab4ea2f..4c80ec662 100644 --- a/src/libs/identity/tools/twitter.ts +++ b/src/libs/identity/tools/twitter.ts @@ -473,7 +473,7 @@ export class Twitter { async makeRequest(url: string, delay = 0): Promise> { if (delay > 0) { await new Promise(resolve => setTimeout(resolve, delay)) - log.only(`β˜ΊοΈπŸ˜”πŸ‘€ Delayed request to ${url} for ${delay}ms`) + log.debug(`β˜ΊοΈπŸ˜”πŸ‘€ Delayed request to ${url} for ${delay}ms`) } return await axios.get(url, { diff --git a/src/libs/network/manageConsensusRoutines.ts b/src/libs/network/manageConsensusRoutines.ts index 0dc441655..6a94a5ad0 100644 --- a/src/libs/network/manageConsensusRoutines.ts +++ b/src/libs/network/manageConsensusRoutines.ts @@ -38,12 +38,12 @@ export default async function manageConsensusRoutines( sender: string, payload: ConsensusMethod, ): Promise { - log.only("πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘ RECEIVED CONSENSUS CALL πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘") + log.debug("πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘ RECEIVED CONSENSUS CALL πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘πŸ‘") const peer = PeerManager.getInstance().getPeer(sender) - log.only("Sender: " + peer.connection.string) - log.only("Payload: " + JSON.stringify(payload, null, 2)) - log.only("-----------------------------") + log.debug("Sender: " + peer.connection.string) + log.debug("Payload: " + JSON.stringify(payload, null, 2)) + log.debug("-----------------------------") let response = _.cloneDeep(emptyResponse) /* REVIEW @@ -60,7 +60,7 @@ export default async function manageConsensusRoutines( const isConsensusRunning = isConsensusAlreadyRunning() const inConsensus = isConsensusTime || isConsensusRunning - log.only("inConsensus: " + inConsensus) + log.debug("inConsensus: " + inConsensus) if (!inConsensus) { response.result = 400 @@ -94,7 +94,7 @@ export default async function manageConsensusRoutines( } } - log.only("isInShard: " + isInShard) + log.debug("isInShard: " + isInShard) inShardCheck: if (!isInShard) { // INFO: If is a greenlight request, return 200 @@ -160,8 +160,6 @@ export default async function manageConsensusRoutines( return response } - log.only("processing payload: " + JSON.stringify(payload, null, 2)) - // NOTE Each method has its own logic to be implemented switch (payload.method) { /* @@ -357,7 +355,7 @@ export default async function manageConsensusRoutines( // INFO: If the manager class for that block is not found, assume peer is behind on the consensus // return a 200 to unblock peer if (!manager) { - log.only("returning a fake 200") + log.debug("returning a fake 200") response.result = 200 response.response = "Secretary manager not found" return response @@ -365,7 +363,7 @@ export default async function manageConsensusRoutines( // INFO: Check if the sender is the secretary if (sender !== manager.secretary.identity) { - log.only("returning a 401") + log.debug("returning a 401") response.result = 401 response.response = "Greenlight not accepted" response.extra = "Secretary identity mismatch" diff --git a/src/utilities/mainLoop.ts b/src/utilities/mainLoop.ts index 020f97eae..6e87daae7 100644 --- a/src/utilities/mainLoop.ts +++ b/src/utilities/mainLoop.ts @@ -79,7 +79,7 @@ async function mainLoopCycle() { log.info("[MAINLOOP]: about to check if its time for consensus") if (!isConsensusTimeReached) { - log.only("[MAINLOOP]: is not consensus time") + log.debug("[MAINLOOP]: is not consensus time") //await sendNodeOnlineTx() } @@ -105,7 +105,7 @@ async function mainLoopCycle() { ) { // Set the startingConsensus flag to true to avoid conflicts with starting loops getSharedState.startingConsensus = true - log.only("[MAIN LOOP] Consensus time reached and sync status is true") + log.debug("[MAIN LOOP] Consensus time reached and sync status is true") // Wait for the peer routine to finish if it is still running let timer = 0 while (getSharedState.peerRoutineRunning > 0) { diff --git a/src/utilities/waiter.ts b/src/utilities/waiter.ts index 5c8550ceb..b3f495296 100644 --- a/src/utilities/waiter.ts +++ b/src/utilities/waiter.ts @@ -45,14 +45,14 @@ export class Waiter { } if (Waiter.preHeld.has(id)) { - log.only( + log.debug( `[WAITER] Found pre-held key: ${id} with value: ${Waiter.preHeld.get( id, )}`, ) const resolveValue = Waiter.preHeld.get(id) Waiter.preHeld.delete(id) - log.only( + log.debug( `[WAITER] Resolved pre-held key: ${id} with data: ${resolveValue}`, ) return resolveValue @@ -78,7 +78,7 @@ export class Waiter { promise: null, }) - log.only(`[WAITER] πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’ Created wait entry for ${id}`) + log.debug(`[WAITER] πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’πŸ˜’ Created wait entry for ${id}`) }) Waiter.waitList.get(id).promise = promise @@ -102,7 +102,7 @@ export class Waiter { Waiter.preHeld.delete(id) Waiter.waitList.delete(id) entry.resolve(data) - log.only(`[WAITER] Resolved wait entry for ${id}`) + log.debug(`[WAITER] Resolved wait entry for ${id}`) return data || null } @@ -113,7 +113,7 @@ export class Waiter { throw new Error(`[WAITER] Already waiting for id: ${id}`) } - log.only(`[WAITER] Pre-holding the key: ${id} with data: ${data}`) + log.debug(`[WAITER] Pre-holding the key: ${id} with data: ${data}`) Waiter.preHeld.set(id, data) } @@ -123,7 +123,7 @@ export class Waiter { * @param id - The id of the event to abort */ static abort(id: string) { - log.only(`[WAITER] Aborting the key: ${id}`) + log.debug(`[WAITER] Aborting the key: ${id}`) const entry = Waiter.waitList.get(id) if (!entry) { log.warning(`[WAITER] No wait entry found for ${id}`)