Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions packages/dapi/lib/externalApis/tenderdash/BlockchainListener.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ class BlockchainListener extends EventEmitter {
*/
constructor(tenderdashWsClient) {
super();

this.wsClient = tenderdashWsClient;

this.processLogger = logger.child({
process: 'BlockchainListener',
});
}

/**
Expand All @@ -30,14 +35,7 @@ class BlockchainListener extends EventEmitter {
* Subscribe to blocks and transaction results
*/
start() {
const processLogger = logger.child({
process: 'BlockchainListener',
});

processLogger.info('Subscribed to state transition results');

// Emit transaction results
this.wsClient.subscribe(TX_QUERY);
this.wsClient.on(TX_QUERY, (message) => {
const [hashString] = (message.events || []).map((event) => {
const hashAttribute = event.attributes.find((attribute) => attribute.key === 'hash');
Expand All @@ -53,15 +51,31 @@ class BlockchainListener extends EventEmitter {
return;
}

processLogger.trace(`received transaction result for ${hashString}`);
this.processLogger.trace(`Received transaction result for ${hashString}`);

this.emit(BlockchainListener.getTransactionEventName(hashString), message);
});

// TODO: It's not using
// Emit blocks and contained transactions
// this.wsClient.subscribe(NEW_BLOCK_QUERY);
// this.wsClient.on(NEW_BLOCK_QUERY, (message) => this.emit(EVENTS.NEW_BLOCK, message));
this.wsClient.on(NEW_BLOCK_QUERY, (message) => {
this.processLogger.trace('Received new platform block');

this.emit(EVENTS.NEW_BLOCK, message);
});

this.wsClient.on('connect', () => {
this.#subscribe();
});

if (this.wsClient.isConnected) {
this.#subscribe();
}
}

#subscribe() {
this.wsClient.subscribe(TX_QUERY);
this.wsClient.subscribe(NEW_BLOCK_QUERY);
this.processLogger.debug('Subscribed to platform blockchain events');
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
} = require('@dashevo/dapi-grpc');

const BlockchainListener = require('../../../externalApis/tenderdash/BlockchainListener');
const logger = require('../../../logger');

/**
* @param {BlockchainListener} blockchainListener
Expand All @@ -17,12 +18,23 @@ const BlockchainListener = require('../../../externalApis/tenderdash/BlockchainL
* @return {getStatusHandler}
*/
function getStatusHandlerFactory(blockchainListener, driveClient, tenderdashRpcClient) {
// Clean cache when new platform block committed
let cachedResponse = null;
let cleanCacheTimeout = null;

blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, () => {
function cleanCache() {
cachedResponse = null;
});

// cancel scheduled cache cleanup
if (cleanCacheTimeout !== null) {
clearTimeout(cleanCacheTimeout);
cleanCacheTimeout = null;
}

logger.trace({ endpoint: 'getStatus' }, 'cleanup cache');
}

// Clean cache when new platform block committed
blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, cleanCache);

// DAPI Software version
const packageJsonPath = path.resolve(__dirname, '..', '..', '..', '..', 'package.json');
Expand Down Expand Up @@ -210,6 +222,15 @@ function getStatusHandlerFactory(blockchainListener, driveClient, tenderdashRpcC
cachedResponse = new GetStatusResponse();
cachedResponse.setV0(v0);

// Cancel any existing scheduled cache cleanup
if (cleanCacheTimeout !== null) {
clearTimeout(cleanCacheTimeout);
cleanCacheTimeout = null;
}

// Clean cache in 3 minutes
cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);

Comment on lines +231 to +233
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential for multiple timeouts—ensure a single scheduled cache cleanup

Each time getStatusHandler generates a new cachedResponse, a new cleanCacheTimeout is set without clearing any existing timeout. This could lead to multiple timeouts running concurrently, causing redundant executions of cleanCache.

To prevent multiple scheduled timeouts, clear any existing timeout before setting a new one:

+        // Cancel any existing scheduled cache cleanup
+        if (cleanCacheTimeout !== null) {
+          clearTimeout(cleanCacheTimeout);
+          cleanCacheTimeout = null;
+        }
         // Clean cache in 3 minutes
         cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Clean cache in 3 minutes
cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);
// Cancel any existing scheduled cache cleanup
if (cleanCacheTimeout !== null) {
clearTimeout(cleanCacheTimeout);
cleanCacheTimeout = null;
}
// Clean cache in 3 minutes
cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);

🛠️ Refactor suggestion

Consider using 'setInterval' for consistent cache cleanup

Using setInterval can simplify the cache cleanup mechanism by consistently invalidating the cache every three minutes, regardless of cache usage patterns.

Implement setInterval to schedule regular cache cleanups:

-        // Clean cache in 3 minutes
-        cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);
+        // Schedule cache cleanup every 3 minutes if not already scheduled
+        if (cleanCacheTimeout === null) {
+          cleanCacheTimeout = setInterval(cleanCache, 3 * 60 * 1000);
+        }

Modify the cleanCache function to retain the interval:

     function cleanCache() {
         cachedResponse = null;

-        // cancel scheduled cache cleanup
-        if (cleanCacheTimeout !== null) {
-          clearTimeout(cleanCacheTimeout);
-          cleanCacheTimeout = null;
-        }

         logger.trace({ endpoint: 'getStatus' }, 'cleanup cache');
     }

This approach ensures the cache is invalidated regularly without the need to manage timeouts manually.

Committable suggestion was skipped due to low confidence.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rabbit misunderstood logic. We cleanup cache by block or in 3 mintues after it was generated by a user request. Interval doesn't work for our case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shumkov, thank you for the clarification. I apologize for the misunderstanding. I now understand that the cache is cleaned up by block or in 3 minutes after it was generated by a user request, so using an interval doesn't fit your requirements.

(・・?)


✏️ Learnings added
Learnt from: shumkov
PR: dashpay/platform#2155
File: packages/dapi/lib/grpcServer/handlers/platform/getStatusHandlerFactory.js:231-233
Timestamp: 2024-09-26T14:33:28.944Z
Learning: In this codebase, the cache is cleaned up by block or in 3 minutes after it was generated by a user request. Using 'setInterval' does not work for this case.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

return cachedResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ describe('BlockchainListener', () => {
({ sinon } = this);
wsClientMock = new EventEmitter();
wsClientMock.subscribe = sinon.stub();

blockchainListener = new BlockchainListener(wsClientMock);
blockchainListener.start();

sinon.spy(blockchainListener, 'on');
sinon.spy(blockchainListener, 'off');
Expand Down Expand Up @@ -84,19 +84,23 @@ describe('BlockchainListener', () => {
});

describe('#start', () => {
it('should subscribe to transaction events from WS client', () => {
// TODO: We don't use it for now
// expect(wsClientMock.subscribe).to.be.calledTwice();
expect(wsClientMock.subscribe).to.be.calledOnce();
it('should subscribe to transaction events from WS client if it is connected', () => {
wsClientMock.isConnected = true;

blockchainListener.start();

expect(wsClientMock.subscribe).to.be.calledTwice();
expect(wsClientMock.subscribe.firstCall).to.be.calledWithExactly(
BlockchainListener.TX_QUERY,
);
// expect(wsClientMock.subscribe.secondCall).to.be.calledWithExactly(
// BlockchainListener.NEW_BLOCK_QUERY,
// );
expect(wsClientMock.subscribe.secondCall).to.be.calledWithExactly(
BlockchainListener.NEW_BLOCK_QUERY,
);
});

it.skip('should emit block when new block is arrived', (done) => {
it('should emit block when new block is arrived', (done) => {
blockchainListener.start();

blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, (message) => {
expect(message).to.be.deep.equal(blockMessageMock);

Expand All @@ -107,6 +111,8 @@ describe('BlockchainListener', () => {
});

it('should emit transaction when transaction is arrived', (done) => {
blockchainListener.start();

const topic = BlockchainListener.getTransactionEventName(transactionHash);

blockchainListener.on(topic, (message) => {
Expand Down