Subscription: refactor payload APIs to use ResultSet and tsfile readers #17225
VGalaxies wants to merge 7 commits into apache:master from …
…ile APIs Reuse tsfile ResultSet in subscription payloads, rename message accessors to getRecords()/getTsFile(), and return ITsFileReader from tsfile handler. Also rename format constants to SubscriptionRecordHandler/SubscriptionTsFileHandler with legacy TsFileHandler compatibility. BREAKING CHANGE: SubscriptionMessage.getSessionDataSetsHandler() and getTsFileHandler() are removed in favor of getRecords() and getTsFile(). SubscriptionSessionDataSet/SubscriptionSessionDataSetsHandler are removed and format default/value names are updated.
…aders
- migrate subscription message usage to getRecords/getTsFile across tests and examples
- add record helpers (tablet iterator, row iteration) for ResultSet-based payloads
- unify tsfile access through openReader() with tree/table model-specific implementations

BREAKING CHANGE: subscription payload consumers should use getRecords()/getTsFile() and the new message type names (RECORD_HANDLER/TS_FILE); legacy handler-style payload APIs are no longer used.
Pull request overview
This PR refactors the IoTDB subscription client payload APIs, replacing SubscriptionSessionDataSet / SubscriptionSessionDataSetsHandler with a ResultSet-based interface, renaming getSessionDataSetsHandler() to getRecords() and getTsFileHandler() to getTsFile(), and updating openReader() to use the v4 TsFileTreeReaderBuilder / TsFileReaderBuilder APIs. The message type enum values are renamed from SESSION_DATA_SETS_HANDLER/TS_FILE_HANDLER to RECORD_HANDLER/TS_FILE.
Changes:
- Introduce SubscriptionRecordHandler as the new payload adapter, exposing List<ResultSet> instead of the custom SubscriptionSessionDataSetsHandler, and remove the old SubscriptionSessionDataSet / SubscriptionSessionDataSetsHandler classes.
- Refactor SubscriptionTsFileHandler.openReader() to use generic return type dispatching between TsFileTreeReaderBuilder (tree-model) and TsFileReaderBuilder (table-model).
- Update all integration tests, examples, CLI tools, and topic constants to use the new API naming, with backward-compatible deprecated constants for old format values.
Reviewed changes
Copilot reviewed 68 out of 68 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| SubscriptionTsFileHandler.java | Replace TsFileReader/TsFileSequenceReader with generic openReader() dispatching to v4 builders |
| SubscriptionRecordHandler.java | New payload adapter implementing Iterable<ResultSet> with inner SubscriptionRecord class |
| SubscriptionSessionDataSetsHandler.java | Deleted (replaced by SubscriptionRecordHandler) |
| SubscriptionSessionDataSet.java | Deleted (replaced by SubscriptionRecord) |
| SubscriptionMessage.java | Rename accessors to getRecords(), getTsFile(), add getRecordTabletIterator() |
| SubscriptionMessageType.java | Rename enum values to RECORD_HANDLER and TS_FILE |
| TopicConstant.java | Rename format constants, add deprecated aliases |
| TopicConfig.java | Add isTsFileFormat() with legacy backward compat |
| AbstractSubscriptionConsumer.java | Update references to new API names |
| CLI & example files | Update to new API names |
| ~30 integration test files | Update to new API, adding downcasts from ResultSet to SubscriptionRecord |
    @Deprecated
    public static final String FORMAT_SESSION_DATA_SETS_HANDLER_VALUE = FORMAT_RECORD_HANDLER_VALUE;

    @Deprecated public static final String FORMAT_TS_FILE_HANDLER_VALUE = FORMAT_TS_FILE_VALUE;
The @Deprecated constants have had their values silently changed. FORMAT_TS_FILE_HANDLER_VALUE was previously "TsFileHandler" and is now "SubscriptionTsFileHandler". Similarly, FORMAT_SESSION_DATA_SETS_HANDLER_VALUE was "SessionDataSetsHandler" and is now "SubscriptionRecordHandler".
Any external code that was using these deprecated constants as string values (e.g., stored in topic configurations or compared against persisted strings) will break silently. The isTsFileFormat() method in TopicConfig correctly handles the legacy "TsFileHandler" string, but there's no equivalent backward compat for the old "SessionDataSetsHandler" string in the non-TsFile path. If existing topics were created with format=SessionDataSetsHandler, the behavior may change.
Consider preserving the original string values in the deprecated constants, or adding a legacy check for "SessionDataSetsHandler" similar to LEGACY_FORMAT_TS_FILE_HANDLER_VALUE.
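The legacy check suggested above could be sketched roughly as follows. This is a minimal, self-contained illustration rather than the PR's code: the class name TopicFormatCompat, the constant LEGACY_FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, and the isRecordFormat() / normalize() helpers are hypothetical, and exact (case-sensitive) string matching is an assumption. Only the four string values and the name LEGACY_FORMAT_TS_FILE_HANDLER_VALUE come from the discussion.

```java
final class TopicFormatCompat {
  // New canonical values, as quoted in the review comment.
  static final String FORMAT_RECORD_HANDLER_VALUE = "SubscriptionRecordHandler";
  static final String FORMAT_TS_FILE_VALUE = "SubscriptionTsFileHandler";

  // Legacy string that existing topic configurations may still contain.
  static final String LEGACY_FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
  // Hypothetical counterpart for the record path; not defined in the PR as reviewed.
  static final String LEGACY_FORMAT_SESSION_DATA_SETS_HANDLER_VALUE = "SessionDataSetsHandler";

  private TopicFormatCompat() {}

  /** Accepts both the new and the legacy tsfile format strings. */
  static boolean isTsFileFormat(String format) {
    return FORMAT_TS_FILE_VALUE.equals(format)
        || LEGACY_FORMAT_TS_FILE_HANDLER_VALUE.equals(format);
  }

  /** Accepts both the new and the legacy record format strings. */
  static boolean isRecordFormat(String format) {
    return FORMAT_RECORD_HANDLER_VALUE.equals(format)
        || LEGACY_FORMAT_SESSION_DATA_SETS_HANDLER_VALUE.equals(format);
  }

  /** Maps any accepted spelling to its canonical value and rejects everything else. */
  static String normalize(String format) {
    if (isTsFileFormat(format)) {
      return FORMAT_TS_FILE_VALUE;
    }
    if (isRecordFormat(format)) {
      return FORMAT_RECORD_HANDLER_VALUE;
    }
    throw new IllegalArgumentException("Unknown topic format: " + format);
  }
}
```

With a check of this shape, a topic persisted with format=SessionDataSetsHandler would still resolve to the record path, whichever string the deprecated constants end up holding.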
      // System.out.println(FORMAT.format(new Date()) + " onReceived=" + onReceived.get());
    - final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
    + final SubscriptionTsFileHandler tsFileHandler = message.getTsFile();
      try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
These three call sites use openReader() without an explicit cast (e.g. TsFileReader tsFileReader = tsFileHandler.openReader()), relying on the generic type inference of the new <T extends AutoCloseable> T openReader() method. While this compiles, if the builder returns a type other than TsFileReader (e.g., ITsFileTreeReader), it will fail with a ClassCastException at runtime. This should be updated to use an explicit cast consistent with the other integration tests, or better yet, use the actual return type of the builder.
    - try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
    + try (final TsFileReader tsFileReader = (TsFileReader) tsFileHandler.openReader()) {
    @SuppressWarnings("unchecked")
    public <T extends AutoCloseable> T openReader() throws IOException {
      return (T)
          (Objects.isNull(databaseName)
              ? new TsFileTreeReaderBuilder().file(new File(absolutePath)).build()
              : new TsFileReaderBuilder().file(new File(absolutePath)).build());
    }
The generic method signature <T extends AutoCloseable> T openReader() is type-unsafe. It relies on unchecked cast inference and the caller can assign the result to any AutoCloseable subtype without a compiler warning — but a ClassCastException will occur at runtime if the inferred type doesn't match the actual return type. For example, in AbstractSubscriptionTreeRegressionIT (lines 346, 413, 464), openReader() is called as TsFileReader tsFileReader = tsFileHandler.openReader() (no explicit cast), which will silently infer T = TsFileReader. If the builder returns a different type (e.g. ITsFileTreeReader), this will fail at runtime.
A safer approach is to either:
- Return AutoCloseable (or a common interface like ITsFileTreeReader / ITsFileReader) and let callers cast explicitly, making the risk visible.
- Provide two separate methods (e.g. openTreeReader() / openTableReader()) with concrete return types.
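To make the failure mode concrete, here is a small self-contained sketch. It uses stand-in types only: TreeReader, TableReader, the Fake* classes, and Handler are hypothetical and do not reproduce the tsfile or IoTDB classes. It shows a mismatched assignment compiling cleanly against the generic openReader() and failing only at runtime, next to the two-method alternative from the second option.

```java
final class OpenReaderSketch {
  // Stand-ins for the tree-model and table-model reader interfaces.
  interface TreeReader extends AutoCloseable {
    @Override
    void close();
  }

  interface TableReader extends AutoCloseable {
    @Override
    void close();
  }

  static final class FakeTreeReader implements TreeReader {
    @Override
    public void close() {}
  }

  static final class FakeTableReader implements TableReader {
    @Override
    public void close() {}
  }

  static final class Handler {
    // null means tree model, mirroring the Objects.isNull(databaseName) check.
    private final String databaseName;

    Handler(String databaseName) {
      this.databaseName = databaseName;
    }

    // Shape under review: the caller's target type picks T, and the cast is unchecked.
    @SuppressWarnings("unchecked")
    <T extends AutoCloseable> T openReader() {
      AutoCloseable reader = databaseName == null ? new FakeTreeReader() : new FakeTableReader();
      return (T) reader;
    }

    // Alternative: concrete return types, with an early and explicit failure on misuse.
    TreeReader openTreeReader() {
      if (databaseName != null) {
        throw new IllegalStateException("table-model payload; use openTableReader()");
      }
      return new FakeTreeReader();
    }

    TableReader openTableReader() {
      if (databaseName == null) {
        throw new IllegalStateException("tree-model payload; use openTreeReader()");
      }
      return new FakeTableReader();
    }
  }

  // Returns true if the mismatched assignment fails with ClassCastException at runtime.
  static boolean mismatchFailsAtRuntime() {
    Handler treeHandler = new Handler(null);
    try {
      TableReader wrong = treeHandler.openReader(); // compiles without any warning
      wrong.close();
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("mismatch fails at runtime: " + mismatchFailsAtRuntime());
  }
}
```

With the split methods, a wrong reader type becomes a compile error at the call site, and calling the wrong method for the payload's model fails with a descriptive message instead of a cast error.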
    public List<ResultSet> getRecords() {
      if (handler instanceof SubscriptionRecordHandler) {
        return ((SubscriptionRecordHandler) handler).getRecords();
      }
      throw new SubscriptionIncompatibleHandlerException(
          String.format("%s do not support getRecords().", handler.getClass().getSimpleName()));
    }
The new public API exposes List<ResultSet> from getRecords(), but almost every consumer site immediately downcasts the ResultSet to SubscriptionRecordHandler.SubscriptionRecord to call .getTablet(), .hasNext(), .nextRecord(), .getColumnNames(), etc. This defeats the purpose of the refactoring — the API promises a standard ResultSet abstraction, but it isn't actually usable without the downcast.
Consider either:
- Exposing List<SubscriptionRecordHandler.SubscriptionRecord> directly from getRecords() (since that's what all callers need).
- Adding convenience methods on SubscriptionMessage for the most common operations (like the existing getRecordTabletIterator()) so callers don't need to know about SubscriptionRecord at all.
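The first option can be illustrated with a short sketch. It uses stand-in types rather than the real tsfile ResultSet or IoTDB classes: the nested ResultSet interface, SubscriptionRecord, Message, and countRows() below are hypothetical and heavily simplified. The point is only that returning the concrete element type removes the downcast, while generic code can still consume the same list through a bounded wildcard.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RecordAccessSketch {
  // Stand-in for the generic result-set abstraction; only the part needed here.
  interface ResultSet {
    boolean next();
  }

  // Stand-in for the subscription-specific result set with extra accessors.
  static final class SubscriptionRecord implements ResultSet {
    private final List<String> columnNames;
    private int remainingRows;

    SubscriptionRecord(List<String> columnNames, int rowCount) {
      this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
      this.remainingRows = rowCount;
    }

    @Override
    public boolean next() {
      if (remainingRows == 0) {
        return false;
      }
      remainingRows--;
      return true;
    }

    // Subscription-only accessor that the plain ResultSet stand-in does not offer.
    List<String> getColumnNames() {
      return columnNames;
    }
  }

  static final class Message {
    private final List<SubscriptionRecord> records;

    Message(List<SubscriptionRecord> records) {
      this.records = Collections.unmodifiableList(new ArrayList<>(records));
    }

    // Option 1: expose the concrete element type, so callers need no downcast.
    List<SubscriptionRecord> getRecords() {
      return records;
    }
  }

  // Generic code can still take the same list as "some kind of ResultSet".
  static int countRows(List<? extends ResultSet> resultSets) {
    int rows = 0;
    for (ResultSet resultSet : resultSets) {
      while (resultSet.next()) {
        rows++;
      }
    }
    return rows;
  }
}
```

A caller can then write message.getRecords().get(0).getColumnNames() directly, and still pass message.getRecords() to countRows() without a conversion.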
      return records.iterator();
    }

    public static class SubscriptionRecord extends AbstractResultSet {
SubscriptionRecord -> SubscriptionResultSet
    System.out.println(dataSet.getColumnTypes());
    while (dataSet.hasNext()) {
      System.out.println(dataSet.next());
    for (final ResultSet dataSet : message.getRecords()) {
getRecords -> getResultSets
    try (final ITsFileTreeReader reader = message.getTsFile().openReader()) {
      reader.getAllDeviceIds().forEach(System.out::println);
    }
openReader -> openTreeReader, and also add openTableReader?
    while (((org.apache.iotdb.session.subscription.payload
            .SubscriptionRecordHandler.SubscriptionRecord)
Do not use the fully qualified class name here; please check all other occurrences as well.
    try (final TsFileReader tsFileReader =
        (TsFileReader) message.getTsFile().openReader()) {
TsFileReader is an internal implementation class; it is better not to use it here.
Replace SubscriptionTsFileHandler.openReader() with openTreeReader() and openTableReader(). Update tree-model examples and integration tests to use ITsFileTreeReader and a v4 query adapter. BREAKING CHANGE: tree consumers must call openTreeReader(), and table consumers must call openTableReader().
Rename SubscriptionMessage.getRecords() to getResultSets() and SubscriptionRecordHandler.SubscriptionRecord to SubscriptionResultSet. Update examples and integration tests to use the new record payload names. BREAKING CHANGE: callers must replace getRecords() with getResultSets() and SubscriptionRecord with SubscriptionResultSet.
Summary
This PR refactors the subscription client payload APIs to expose record-oriented and tsfile-oriented interfaces directly, instead of the previous handler-centric API surface.
The main goal is to make the client API align better with the public tsfile interfaces and remove the custom dataset wrapper that was only used by subscription consumers.
Main interface changes
- Replace SubscriptionSessionDataSet / SubscriptionSessionDataSetsHandler with List<org.apache.tsfile.read.query.dataset.ResultSet> exposed from SubscriptionMessage#getRecords().
- Rename SubscriptionMessage#getSessionDataSetsHandler() to SubscriptionMessage#getRecords().
- Rename SubscriptionMessage#getTsFileHandler() to SubscriptionMessage#getTsFile().
- Rename the message types to RECORD_HANDLER and TS_FILE.
- … SubscriptionTsFileHandler#openReader() entry point while avoiding hard-coding consumer-facing logic around concrete reader construction.

Reader changes
- Tree model: openReader() now builds the reader with TsFileTreeReaderBuilder.
- Table model: openReader() now builds the reader with TsFileReaderBuilder.
- new TsFileTreeReaderBuilder().file(file).build()
- new TsFileReaderBuilder().file(file).build()

Payload and implementation updates
- Introduce SubscriptionRecordHandler as the payload adapter for record-format messages.
- Remove the SubscriptionSessionDataSet and SubscriptionSessionDataSetsHandler classes.
- … ResultSet interface instead of exposing a custom subscription-only dataset abstraction.

Compatibility and migration notes
- … message.getRecords()
- … message.getTsFile()
- … SubscriptionSessionDataSet should now use the tsfile ResultSet API directly.

Verification
- mvn -T 8 spotless:apply -P with-integration-tests
- mvn -T 8 clean package -P with-integration-tests -DskipUTs -pl integration-test,distribution -DfailIfNoTests=false -am -U

This PR was primarily authored with Codex using gpt-5.4 xhigh and then hand-reviewed by me. I AM responsible for every change made in this PR. I aimed to keep it aligned with our goals, though I may have missed minor issues. Please flag anything that feels off; I'll fix it quickly.