Skip to content

Conversation

@zk-drizzle
Copy link
Contributor

RIP-82 Implement Timer message, transaction message, and index based on RocksDB

Fixes #9786

Brief Description

Implement Timer message, transaction message, and index based on RocksDB

How Did You Test This Change?

drizzle.zk and others added 6 commits October 27, 2025 11:27
Change-Id: Ia4b0e2a21aa5e12570a80713432fd48ddfb210e6
Change-Id: I65a43a612f09280cb61d2a23b3324d198c6a71de
Change-Id: Idf8080c7b17c25e14fe34ef6bad1e1150dab58d6
Change-Id: I904f0120a728b4eb87226159f9c0cde3d18ef26a
@RongtongJin RongtongJin changed the title RIP-82 Implement Timer message, transaction message, and index based on RocksDB [RIP-82] Implement Timer message, transaction message, and index based on RocksDB Oct 29, 2025
fuyou001
fuyou001 previously approved these changes Oct 29, 2025
@fuyou001 fuyou001 self-requested a review October 29, 2025 05:19
drizzle.zk added 2 commits October 29, 2025 13:54
Change-Id: I712b9cde9a18c730fd020ea76f05560e594a9edd
@codecov-commenter
Copy link

codecov-commenter commented Oct 29, 2025

Codecov Report

❌ Patch coverage is 12.91667% with 2090 lines in your changes missing coverage. Please review.
✅ Project coverage is 47.44%. Comparing base (bea086f) to head (f1cc7ba).
⚠️ Report is 21 commits behind head on develop.

Files with missing lines Patch % Lines
.../rocketmq/store/rocksdb/MessageRocksDBStorage.java 14.42% 356 Missing ⚠️
.../store/timer/rocksdb/TimerMessageRocksDBStore.java 0.00% 321 Missing ⚠️
.../apache/rocketmq/store/timer/rocksdb/Timeline.java 0.00% 253 Missing ⚠️
...mq/store/transaction/TransMessageRocksDBStore.java 0.00% 193 Missing ⚠️
...ocketmq/store/index/rocksdb/IndexRocksDBStore.java 8.98% 160 Missing and 2 partials ⚠️
...on/rocksdb/TransactionalMessageRocksDBService.java 0.00% 145 Missing ⚠️
...rocketmq/store/transaction/TransRocksDBRecord.java 0.00% 72 Missing ⚠️
...cketmq/store/timer/rocksdb/TimerRocksDBRecord.java 0.00% 64 Missing ⚠️
...org/apache/rocketmq/store/DefaultMessageStore.java 16.00% 58 Missing and 5 partials ⚠️
...ocketmq/broker/processor/AdminBrokerProcessor.java 0.00% 50 Missing ⚠️
... and 26 more
Additional details and impacted files
@@              Coverage Diff              @@
##             develop    #9787      +/-   ##
=============================================
- Coverage      48.41%   47.44%   -0.98%     
- Complexity     12255    12274      +19     
=============================================
  Files           1314     1324      +10     
  Lines          93668    96026    +2358     
  Branches       12011    12373     +362     
=============================================
+ Hits           45347    45557     +210     
- Misses         42733    44833    +2100     
- Partials        5588     5636      +48     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

fuyou001
fuyou001 previously approved these changes Oct 29, 2025
pom.xml Outdated

<properties>
<revision>5.3.4-SNAPSHOT</revision>
<revision>5.3.6-rocksdb-SNAPSHOT</revision>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The version number should not be changed here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

db.flush(flushOptions, timerCFHandle);
log.info("MessageRocksDBStorage flush timer wal success");
} catch (Exception e) {
logError.error("MessageRocksDBStorage flush timer wal failed, error: {}", e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly printing the exception is preferable, as printing e.getMessage() may lose important stack trace information, hindering troubleshooting.

} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("switch timer engine error");
LOGGER.info("switchTimerEngine error");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @zk-drizzle . I'm just curious why all the logs here are INFO level, regardless of whether the restart succeeds or fails.

Shouldn't WARN or ERROR level logs be used to provide some warning information to the user when the restart fails?

while (true) {
try {
List<TransRocksDBRecord> trs = messageRocksDBStorage.scanRecordsForTrans(TRANS_COLUMN_FAMILY, MAX_BATCH_SIZE_FROM_ROCKSDB, lastKey);
if (null == trs || CollectionUtils.isEmpty(trs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null == trs check is redundant.

Suggested change
if (null == trs || CollectionUtils.isEmpty(trs)) {
if (CollectionUtils.isEmpty(trs)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right ~

}

public void shutdown() {
if (this.state != RUNNING || this.state == SHUTDOWN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @zk-drizzle . Is the check this.state == SHUTDOWN redundant? When state equals SHUTDOWN, the condition this.state != RUNNING is already satisfied.

Copy link
Contributor

@majialoong majialoong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @zk-drizzle PR, this is an exciting feature. I've left some comments.

}

public void shutdown() {
if (this.state != RUNNING || this.state == SHUTDOWN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}

public long getCommitOffsetInRocksDB() {
if (null == messageRocksDBStorage || !storeConfig.isTransRocksDBEnable()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be isTimerRocksDBEnable?


private void checkTransRecordsStatus(List<TransRocksDBRecord> trs) {
if (CollectionUtils.isEmpty(trs)) {
log.error("TransactionalMessageRocksDBService checkTransRecordsStatus, trs is empty");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkTransRecordsStatus method is currently only called within the checkTransStatus method, and since the checkTransStatus method already checks for empty values, this method checks again for redundancy. Furthermore, the log level here is ERROR, while checkTransStatus uses INFO, creating inconsistencies.

this.storeConfig.setTimerStopEnqueue(true);
if (this.state == RUNNING && !this.storeConfig.isTimerRocksDBStopScan()) {
log.info("restart TimerMessageRocksDBStore has been running");
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, if the enqueue operation was stopped using setTimerStopEnqueue(true) above, will there be any issues if we simply return here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem

long commitOffsetRocksDB = messageRocksDBStorage.getCheckpointForTimer(TIMER_COLUMN_FAMILY, MessageRocksDBStorage.SYS_TOPIC_SCAN_OFFSET_CHECK_POINT);
long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB);
this.readOffset.set(maxCommitOffset);
log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe is has been recovered to running

this.readOffset.set(maxCommitOffset);
log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get());
} else {
this.load();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If load fails here, should we continue with the subsequent start?

Change-Id: Iafe10c007e9b5057212e2dcfb21671f970b1d344
fuyou001
fuyou001 previously approved these changes Nov 5, 2025
RongtongJin
RongtongJin previously approved these changes Nov 6, 2025
opt.setRequired(false);
options.addOption(opt);

opt = new Option("c", "clusterName", true, "update which cluster");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since either brokerAddr or clusterName must be specified, it would be more appropriate to use OptionGroup here and set Required to true. @zk-drizzle @fuyou001 @RongtongJin WDYT ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary to require the cluster parameter, because when specifying a single broker explicitly, there's no need to set it.

Copy link
Contributor

@majialoong majialoong Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RongtongJin Thank you for your explanation. Yes, your description is correct; the code I marked might be misleading.

When specifying a broker, you don't need to set the clusterName.

However, you must choose between setting either the broker or the clusterName; this isn't mandatory in the current code.The current code doesn't enforce this; it only reminds the user to enter the e parameter.

image

Furthermore, other commands will use OptionGroup to remind the user that either the broker or the clusterName must be configured.

OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
optionGroup.addOption(opt);
opt = new Option("c", "clusterName", true, "create topic to which cluster");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If OptionGroup is used, the following message will be displayed:
image

It will remind the user that one of the broker and clusterName configurations must be set.

This may be more user-friendly and consistent with other commands.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RongtongJin Please take a look when you have time. I think this is just an optimization and doesn't affect actual use. I would be happy to make adjustments in a future PR if possible. Thanks !

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RongtongJin Please take a look when you have time. I think this is just an optimization and doesn't affect actual use. I would be happy to make adjustments in a future PR if possible. Thanks !

Got it. This is indeed a better approach. Feel free to submit a PR for this change later. Thanks!

ShannonDing
ShannonDing previously approved these changes Nov 6, 2025
Change-Id: I40c1c7f54852ee033e0caa22573436ee505a27b8
@zk-drizzle zk-drizzle dismissed stale reviews from ShannonDing, RongtongJin, and fuyou001 via f1cc7ba November 6, 2025 11:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] RIP‐82 Implement Timer message, transaction message, and index based on RocksDB

7 participants