-
Notifications
You must be signed in to change notification settings - Fork 1
fix(jsonrpc): enforce log filter cap and improve match efficiency #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
a7dfbae
4bd4453
0a28a38
aeac172
406fe7e
203588f
252ec8a
5ab9eb1
6323ea8
9897ea8
27cdfa4
61704aa
26c89bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.ForkJoinPool; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.regex.Pattern; | ||
| import lombok.Getter; | ||
|
|
@@ -115,6 +116,7 @@ public enum RequestSource { | |
| private static final String FILTER_NOT_FOUND = "filter not found"; | ||
| public static final int EXPIRE_SECONDS = 5 * 60; | ||
| private static final int maxBlockFilterNum = Args.getInstance().getJsonRpcMaxBlockFilterNum(); | ||
| private static final int maxLogFilterNum = Args.getInstance().getJsonRpcMaxLogFilterNum(); | ||
| private static final Cache<LogFilterElement, LogFilterElement> logElementCache = | ||
| CacheBuilder.newBuilder() | ||
| .maximumSize(300_000L) // 300s * tps(1000) * 1 log/tx ≈ 300_000 | ||
|
|
@@ -169,6 +171,8 @@ public enum RequestSource { | |
| private static final String NO_BLOCK_HEADER_BY_HASH = "header for hash not found"; | ||
|
|
||
| private static final String ERROR_SELECTOR = "08c379a0"; // Function selector for Error(string) | ||
| private static final int FILTER_PARALLEL_THRESHOLD = 10000; | ||
| private static final ForkJoinPool LOGS_FILTER_POOL = new ForkJoinPool(2); | ||
| /** | ||
| * thread pool of query section bloom store | ||
| */ | ||
|
|
@@ -222,53 +226,68 @@ public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { | |
| * append LogsFilterCapsule's LogFilterElement list to each filter if matched | ||
| */ | ||
| public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) { | ||
| Iterator<Entry<String, LogFilterAndResult>> it; | ||
| long t1 = System.currentTimeMillis(); | ||
| Map<String, LogFilterAndResult> eventFilterMap; | ||
|
|
||
| if (logsFilterCapsule.isSolidified()) { | ||
| it = getEventFilter2ResultSolidity().entrySet().iterator(); | ||
| eventFilterMap = getEventFilter2ResultSolidity(); | ||
| } else { | ||
| it = getEventFilter2ResultFull().entrySet().iterator(); | ||
| eventFilterMap = getEventFilter2ResultFull(); | ||
| } | ||
|
|
||
| while (it.hasNext()) { | ||
| Entry<String, LogFilterAndResult> entry = it.next(); | ||
| if (entry.getValue().isExpire()) { | ||
| it.remove(); | ||
| continue; | ||
| } | ||
| if (eventFilterMap.size() <= FILTER_PARALLEL_THRESHOLD) { | ||
| eventFilterMap.entrySet().forEach( | ||
| entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule)); | ||
| } else { | ||
| LOGS_FILTER_POOL.submit(() -> eventFilterMap.entrySet().parallelStream() | ||
| .forEach(entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule)) | ||
| ).join(); | ||
| } | ||
| long t2 = System.currentTimeMillis(); | ||
| logger.debug("handleLogsFilter {} cost {}, filter size {}", | ||
|
317787106 marked this conversation as resolved.
|
||
| logsFilterCapsule.isSolidified() ? "Solidity" : "Full", t2 - t1, eventFilterMap.size()); | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| private static void processLogFilterEntry( | ||
| Map.Entry<String, LogFilterAndResult> entry, | ||
| Map<String, LogFilterAndResult> eventFilterMap, | ||
| LogsFilterCapsule logsFilterCapsule) { | ||
| LogFilterAndResult logFilterAndResult = entry.getValue(); | ||
| if (logFilterAndResult.isExpire()) { | ||
| eventFilterMap.remove(entry.getKey()); | ||
| return; | ||
| } | ||
|
|
||
| LogFilterAndResult logFilterAndResult = entry.getValue(); | ||
| long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock(); | ||
| long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock(); | ||
| if (!(fromBlock <= logsFilterCapsule.getBlockNumber() | ||
| && logsFilterCapsule.getBlockNumber() <= toBlock)) { | ||
| continue; | ||
| } | ||
| long blockNumber = logsFilterCapsule.getBlockNumber(); | ||
| long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock(); | ||
| long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock(); | ||
| if (!(fromBlock <= blockNumber && blockNumber <= toBlock)) { | ||
| return; | ||
| } | ||
|
|
||
| if (logsFilterCapsule.getBloom() != null | ||
| && !logFilterAndResult.getLogFilterWrapper().getLogFilter() | ||
| .matchBloom(logsFilterCapsule.getBloom())) { | ||
| continue; | ||
| } | ||
| if (logsFilterCapsule.getBloom() != null && !logFilterAndResult.getLogFilterWrapper() | ||
| .getLogFilter().matchBloom(logsFilterCapsule.getBloom())) { | ||
| return; | ||
| } | ||
|
|
||
| LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter(); | ||
| List<LogFilterElement> elements = | ||
| LogMatch.matchBlock(logFilter, logsFilterCapsule.getBlockNumber(), | ||
| logsFilterCapsule.getBlockHash(), logsFilterCapsule.getTxInfoList(), | ||
| logsFilterCapsule.isRemoved()); | ||
| LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter(); | ||
| List<LogFilterElement> elements = | ||
| LogMatch.matchBlock(logFilter, blockNumber, logsFilterCapsule.getBlockHash(), | ||
| logsFilterCapsule.getTxInfoList(), logsFilterCapsule.isRemoved()); | ||
|
|
||
| for (LogFilterElement element : elements) { | ||
| LogFilterElement cachedElement; | ||
| try { | ||
| // compare with hashcode() first, then with equals(). If not exist, put it. | ||
| cachedElement = logElementCache.get(element, () -> element); | ||
| } catch (ExecutionException e) { | ||
| logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen | ||
| cachedElement = element; | ||
| } | ||
| logFilterAndResult.getResult().add(cachedElement); | ||
| List<LogFilterElement> localResults = new ArrayList<>(elements.size()); | ||
| for (LogFilterElement element : elements) { | ||
| LogFilterElement cachedElement; | ||
| try { | ||
| // compare with hashcode() first, then with equals(). If not exist, put it. | ||
| cachedElement = logElementCache.get(element, () -> element); | ||
| } catch (ExecutionException e) { | ||
| logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen | ||
| cachedElement = element; | ||
| } | ||
| localResults.add(cachedElement); | ||
| } | ||
| logFilterAndResult.getResult().addAll(localResults); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1406,7 +1425,7 @@ public CompilationResult ethSubmitHashrate(String hashrate, String id) | |
|
|
||
| @Override | ||
| public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, | ||
| JsonRpcMethodNotFoundException { | ||
| JsonRpcMethodNotFoundException, JsonRpcExceedLimitException { | ||
| disableInPBFT("eth_newFilter"); | ||
|
|
||
| // not supports finalized as block parameter | ||
|
|
@@ -1421,7 +1440,10 @@ public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException, | |
| } else { | ||
| eventFilter2Result = eventFilter2ResultSolidity; | ||
| } | ||
|
|
||
| if (eventFilter2Result.size() >= maxLogFilterNum) { | ||
| throw new JsonRpcExceedLimitException( | ||
| "exceed max log filters: " + maxLogFilterNum + ", try again later"); | ||
| } | ||
|
Comment on lines
+1443
to
+1446
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TOCTOU race: size check is not atomic with insert.
Consider using 🛡️ Example hardening- if (eventFilter2Result.size() >= maxLogFilterNum) {
- throw new JsonRpcExceedLimitException(
- "exceed max log filters: " + maxLogFilterNum + ", try again later");
- }
long currentMaxFullNum = wallet.getNowBlock().getBlockHeader().getRawData().getNumber();
LogFilterAndResult logFilterAndResult = new LogFilterAndResult(fr, currentMaxFullNum, wallet);
String filterID = generateFilterId();
- eventFilter2Result.put(filterID, logFilterAndResult);
+ if (eventFilter2Result.size() >= maxLogFilterNum) {
+ throw new JsonRpcExceedLimitException(
+ "exceed max log filters: " + maxLogFilterNum + ", try again later");
+ }
+ eventFilter2Result.put(filterID, logFilterAndResult);
+ if (eventFilter2Result.size() > maxLogFilterNum) {
+ eventFilter2Result.remove(filterID);
+ throw new JsonRpcExceedLimitException(
+ "exceed max log filters: " + maxLogFilterNum + ", try again later");
+ }
return ByteArray.toJsonHex(filterID);The same concern applies to 🤖 Prompt for AI Agents |
||
| long currentMaxFullNum = wallet.getNowBlock().getBlockHeader().getRawData().getNumber(); | ||
| LogFilterAndResult logFilterAndResult = new LogFilterAndResult(fr, currentMaxFullNum, wallet); | ||
| String filterID = generateFilterId(); | ||
|
|
@@ -1573,6 +1595,7 @@ public static Object[] getFilterResult(String filterId, Map<String, BlockFilterA | |
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(LOGS_FILTER_POOL, "logs-filter-pool"); | ||
| logElementCache.invalidateAll(); | ||
| blockHashCache.invalidateAll(); | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(sectionExecutor, esName); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: 317787106/java-tron
Length of output: 17055
🏁 Script executed:
Repository: 317787106/java-tron
Length of output: 45
🏁 Script executed:
Repository: 317787106/java-tron
Length of output: 2332
🏁 Script executed:
Repository: 317787106/java-tron
Length of output: 789
Add validation for
maxLogFilterNumto reject non-positive values.Line 270 accepts zero or negative values, which would break all
eth_newFiltercalls due to the check atTronJsonRpcImpl.java:1443(if (eventFilter2Result.size() >= maxLogFilterNum)). Validate that the parameter is positive when loading config to fail fast.Proposed validation
if (config.hasPath(ConfigKey.NODE_JSONRPC_MAX_LOG_FILTER_NUM)) { - PARAMETER.jsonRpcMaxLogFilterNum = - config.getInt(ConfigKey.NODE_JSONRPC_MAX_LOG_FILTER_NUM); + int maxLogFilterNum = config.getInt(ConfigKey.NODE_JSONRPC_MAX_LOG_FILTER_NUM); + if (maxLogFilterNum <= 0) { + throw new IllegalArgumentException( + ConfigKey.NODE_JSONRPC_MAX_LOG_FILTER_NUM + " must be greater than 0"); + } + PARAMETER.jsonRpcMaxLogFilterNum = maxLogFilterNum; }🤖 Prompt for AI Agents