Fix incorrect measurement schema during compaction #17297
Draft
Pull request overview
This PR addresses incorrect/unstable measurement schema (notably data type) selection during compaction by making compaction choose the “latest” schema (from schema engine / table cache) when source TsFiles contain conflicting schemas, and by expanding/refactoring compaction tests to cover these scenarios.
Changes:
- Update compaction executors/iterators to detect data type mismatches across files and reconcile schemas using the latest schema (tree model) or table cache (table model).
- Add a test-only schema fetcher injection path and new/refactored compaction tests (including parameterized coverage across performer types).
- Fix table schema copy behavior in TsTable (copy the tag column index map rather than a removed/incorrect map).
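The reconciliation rule described above can be sketched outside IoTDB as follows. This is a minimal, hypothetical model: the `Type` enum and the `reconcile`/`latestSchemaType` names stand in for IoTDB's `TSDataType` and the schema-engine/table-cache lookup, and are not the actual API. The idea: if all source files agree on a series' type, keep it; if they conflict, defer to the latest schema.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Minimal sketch (not IoTDB's API): decide the target data type for one series
// during compaction when source TsFiles may disagree on its type.
public class SchemaReconcileSketch {
  public enum Type { INT32, INT64, FLOAT, DOUBLE, TEXT }

  public static Type reconcile(List<Type> perFileTypes, Supplier<Type> latestSchemaType) {
    Type seen = null;
    boolean conflict = false;
    for (Type t : perFileTypes) {
      if (t == null) continue;            // file has no chunks for this series
      if (seen == null) {
        seen = t;
      } else if (seen != t) {
        conflict = true;                  // source files disagree on the type
        break;
      }
    }
    if (!conflict) return seen;           // consistent: keep the observed type
    Type latest = latestSchemaType.get(); // conflict: ask the schema engine
    return latest != null ? latest : seen;
  }
}
```

With no conflict the observed type wins and the (possibly expensive) schema lookup is never made, mirroring the PR's fast path of returning the input list unchanged when all files agree.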
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java | Fix copy-constructor to copy tag column index map; remove unused/incorrect map. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFakeSchemaFetcherImpl.java | Add schema fetcher test helper exposing schema tree. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java | Use centralized performer selection helper. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchTest.java | Move into alterDataType package; parameterize by performer; reuse shared helpers. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchAlterableDataTypeTest.java | Refactor to shared base; reuse file-generation helpers. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTest.java | New parameterized alter-data-type compaction test for tree model. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTableTest.java | New parameterized alter-data-type compaction test for table model using DataNodeTableCache. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/AbstractCompactionAlterDataTypeTest.java | New shared base wiring a test schema fetcher + reusable TsFile generators. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeAlterTest.java | Remove old non-parameterized test (replaced by new alterDataType package tests). |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java | Add shared getPerformer(String) helper for tests. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakeSchemaFetcherImpl.java | Make schema tree and generator overridable (protected) for test specialization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java | Choose schema collection strategy for table vs tree model; reconcile schema for tree model using latest schema when needed. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java | Add “can alter” validation before rewriting chunk types. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java | Thread database name through to aligned executor for table model schema lookup. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java | Add database accessor; add schema reconciliation via latest schema (tree) and table cache (table); propagate corrected final types. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java | Add test-only schema fetcher injection and helper to fetch latest measurement schemas for tree model. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionSeriesContext.java | Track whether series needs type update and allow setting final type explicitly. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java | Pass database into aligned executor; reconcile non-aligned chunk types using latest schema (tree). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java | Simplify non-aligned compaction setup to use updated iterator context map. |
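The "can alter" validation added to FastNonAlignedSeriesCompactionExecutor gates chunk rewriting on whether the stored type can safely become the target type. The widening rules below (INT32 → INT64/FLOAT/DOUBLE, INT64 → DOUBLE, FLOAT → DOUBLE) are an assumption for illustration; IoTDB's actual promotion matrix lives in `MetadataUtils.canAlter` and may differ.

```java
import java.util.EnumSet;

// Sketch of a "can this chunk's type be rewritten to the target type?" check.
// Rules here are assumed lossless numeric widenings, not IoTDB's real matrix.
public class TypeAlterSketch {
  public enum Type { INT32, INT64, FLOAT, DOUBLE, TEXT }

  public static boolean canAlter(Type from, Type to) {
    if (from == to) return true;          // no rewrite needed
    switch (from) {
      case INT32: return EnumSet.of(Type.INT64, Type.FLOAT, Type.DOUBLE).contains(to);
      case INT64: return to == Type.DOUBLE;
      case FLOAT: return to == Type.DOUBLE;
      default:    return false;           // e.g. TEXT cannot be widened
    }
  }
}
```

Files whose chunks fail this check are skipped rather than rewritten, which matches the "skip compact files with wrong data type" behavior in the diff below.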
```java
  }

  @After
  public void tearDown() throws IOException, StorageEngineException {
```
Comment on lines +173 to +174

```java
TsTableColumnSchema schemaInTsTable =
    tsTable.getColumnSchema(chunkMetadata.getMeasurementUid());
```
```java
if (measurementSchema == null && !timeseriesMetadata.getChunkMetadataList().isEmpty()) {
  measurementSchema =
      reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList());
}
```
Comment on lines 322 to +372

```java
// find correct data type
TSDataType correctDataType = null;
boolean hasDifferentDataTypes = false;
Iterator<Pair<TsFileSequenceReader, List<ChunkMetadata>>> descIterator =
    readerAndChunkMetadataList.descendingIterator();
while (descIterator.hasNext()) {
  Pair<TsFileSequenceReader, List<ChunkMetadata>> pair = descIterator.next();
  List<ChunkMetadata> chunkMetadataList = pair.right;
  if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
    continue;
  }
  TSDataType dataTypeInCurrentFile = null;
  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
    if (chunkMetadata != null) {
      dataTypeInCurrentFile = chunkMetadata.getDataType();
      break;
    }
  }
  if (correctDataType == null) {
    correctDataType = dataTypeInCurrentFile;
  } else if (correctDataType != dataTypeInCurrentFile) {
    hasDifferentDataTypes = true;
    break;
  }
}
if (!hasDifferentDataTypes) {
  return readerAndChunkMetadataList;
}

IMeasurementSchema schema =
    CompactionUtils.getLatestMeasurementSchemasForTreeModel(
            deviceID, Collections.singletonList(measurement))
        .get(0);
if (schema != null) {
  correctDataType = schema.getType();
}

LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new LinkedList<>();
// check data type consistency and skip compacting files with wrong data type
for (Pair<TsFileSequenceReader, List<ChunkMetadata>> tsFileSequenceReaderListPair :
    readerAndChunkMetadataList) {
  boolean dataTypeConsistent = true;
  for (ChunkMetadata chunkMetadata : tsFileSequenceReaderListPair.getRight()) {
    if (chunkMetadata == null) {
      continue;
    }
    if (chunkMetadata.getDataType() == correctDataType) {
      break;
    }
    if (!MetadataUtils.canAlter(chunkMetadata.getDataType(), correctDataType)) {
      dataTypeConsistent = false;
      break;
    }
    chunkMetadata.setNewType(correctDataType);
```
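The test-only schema fetcher injection mentioned for CompactionUtils can be modeled as a static hook that production code ignores and tests override. All names below (`SchemaFetcher`, `setSchemaFetcherForTest`, `latestTypeOf`) are hypothetical stand-ins, not IoTDB's actual signatures.

```java
// Hypothetical sketch of a test-only injection point in the spirit of the
// CompactionUtils change: resolve the "latest" schema through a hook that
// tests may replace with a fake.
public class CompactionHookSketch {
  public interface SchemaFetcher {
    String latestTypeOf(String seriesPath);
  }

  // Null in production; tests install a fake to control the "latest" schema.
  private static SchemaFetcher schemaFetcherForTest = null;

  public static void setSchemaFetcherForTest(SchemaFetcher fetcher) {
    schemaFetcherForTest = fetcher;
  }

  public static String latestTypeOf(String seriesPath, SchemaFetcher productionFetcher) {
    SchemaFetcher f = schemaFetcherForTest != null ? schemaFetcherForTest : productionFetcher;
    return f.latestTypeOf(seriesPath);
  }
}
```

Keeping the override in a single static field lets the shared test base install one fake fetcher in setUp and clear it in tearDown, without threading a fetcher parameter through every compaction executor.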
Description