-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix(net): fix RejectedExecutionException during shutdown trxHandlePool #6692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.Getter; | ||
|
|
@@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler { | |
|
|
||
| private BlockingQueue<Runnable> queue = new LinkedBlockingQueue(); | ||
|
|
||
| private volatile boolean isClosed = false; | ||
| private int threadNum = Args.getInstance().getValidateSignThreadNum(); | ||
| private final String trxEsName = "trx-msg-handler"; | ||
| private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( | ||
|
|
@@ -58,8 +60,14 @@ public void init() { | |
| } | ||
|
|
||
| public void close() { | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); | ||
| isClosed = true; | ||
| // Stop the scheduler first so no new tasks are drained from smartContractQueue. | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); | ||
| // Then shutdown the worker pool to finish already-submitted tasks. | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); | ||
| // Discard any remaining items and release references. | ||
| smartContractQueue.clear(); | ||
| queue.clear(); | ||
| } | ||
|
|
||
| public boolean isBusy() { | ||
|
|
@@ -68,6 +76,10 @@ public boolean isBusy() { | |
|
|
||
| @Override | ||
| public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { | ||
| if (isClosed) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [SHOULD]Based on your problem description, the exception should occur in the handleSmartContract() function. You don't need to modify the processMessage function much; this check can be removed.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point — the PR description's "Why" section only covers the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a node shuts down, it closes the peer first, so peer packets are no longer processed. Other packets don't have this check logic. Even if this logic is triggered, no exception will be thrown, even if there's no specific check for it.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your feedback! I think there is still a race here; shutdown is not hard-synchronized with the message-handling path. |
||
| logger.warn("TransactionsMsgHandler is closed, drop message"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [QUESTION] Both
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point — I wasn't confident about the right level here. I checked the rest of the codebase and java-tron has two conventions that both have a claim on these lines:
L80/L94 sit on the boundary. The existing WARN followed the local "Drop" convention in this file. But your actionability argument is fair — during shutdown ops can't act on these, whereas the other Drop WARNs are actual runtime anomalies worth investigating. Happy to flip both to INFO to match Convention B (closing-window expected event) if you agree, or keep WARN to stay consistent with the other Drop lines in this file. DEBUG feels too quiet since a one-time record of "messages were dropped during shutdown" is still useful for post-mortem. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed — I considered that change L80, L94, and L109 to |
||
| return; | ||
| } | ||
| TransactionsMessage transactionsMessage = (TransactionsMessage) msg; | ||
| check(peer, transactionsMessage); | ||
| for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { | ||
|
|
@@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep | |
| int trxHandlePoolQueueSize = 0; | ||
| int dropSmartContractCount = 0; | ||
| for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { | ||
| if (isClosed) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [SHOULD]Duplicate checks can be deleted. |
||
| logger.warn("TransactionsMsgHandler is closed during processing, stop submit"); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [NIT] Some transactions are processed, but some are dropped. Do we need to log the unprocessed tx num?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. I think it's fine to leave as-is — the two The existing |
||
| break; | ||
| } | ||
| int type = trx.getRawData().getContract(0).getType().getNumber(); | ||
| if (type == ContractType.TriggerSmartContract_VALUE | ||
| || type == ContractType.CreateSmartContract_VALUE) { | ||
|
|
@@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep | |
| dropSmartContractCount++; | ||
| } | ||
| } else { | ||
| ExecutorServiceManager.submit( | ||
| trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); | ||
| try { | ||
| ExecutorServiceManager.submit( | ||
| trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); | ||
| } catch (RejectedExecutionException e) { | ||
| logger.warn("Submit task to {} failed", trxEsName); | ||
| break; | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [SHOULD] This exception of
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the review. The two
Adding an REE catch here would document a case that cannot happen given the shutdown ordering, and invite future readers to ask "when can this fire?". So, I'd like to keep |
||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NIT] These two lines
clear()may be unnecessary. AftershutdownAndAwaitTermination(trxHandlePool)returns, all tasks have completed, but the queue is the backing queue of the thread pool.These two comments may be also unnecessary, it's so simple & clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review — one nuance though:
smartContractQueueis not a backing queue. It's an independentLinkedBlockingQueue<TrxEvent>—processMessageoffers into it, andsmartContractExecutortakes from it only whenqueue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE=100. So under backpressure (whentrxHandlePool's queue is saturated), the scheduler stops drainingsmartContractQueueand it can hold up toMAX_TRX_SIZE=50_000pendingTrxEvents at shutdown. Explicit cleanup of state that may contain data matches the convention elsewhere in java-tron, so I'd like to keepsmartContractQueue.clear().queueis the backing queue as you said —queue.clear()is technically redundant aftershutdownAndAwaitTermination. I added it purely for symmetry withsmartContractQueue.clear(). If you think the symmetry isn't worth the redundancy, happy to dropqueue.clear()alone.The latter two (
// Then shutdown the worker pool ...and// Discard any remaining items ...) will remove.