feat(schema): Migrate HoodieRecord methods to use HoodieSchema instead of Avro.Schema #17772
base: master
Conversation
Force-pushed from 5c8cf22 to 691a8ae.
Force-pushed from 691a8ae to 8cc8fa2.
Resolved review threads (outdated):
- hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
- hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
- hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
- hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java (two threads)
- .../hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
- ...ent/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
Force-pushed from 88e0a16 to b892713.
Resolved review threads (outdated):
- hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
- hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
- hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java (eight threads)
Sorry, went through the changes again and found more things that can be improved. I believe this should be all. Let's wait for @the-other-tim-brown's review too after you've made the changes I suggested. Thank you so much for the contribution, we really appreciate it!
Resolved review threads (outdated):
- .../hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
- hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
- hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
- hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
- hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
Force-pushed from 57bec67 to d40ef89.
Resolved review threads:
- hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java (outdated)
- hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
- hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java (outdated)
Force-pushed from 5887e38 to 4608f20.
Resolved review threads (outdated):
- ...spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java (two threads)
- ...ient/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java (five threads)
- ...ent/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java (three threads)
voonhous left a comment:
LGTM
```java
    return OrderingValues.getDefault();
  } else {
    return OrderingValues.create(orderingFields, field -> {
      if (recordSchema.getField(field) == null) {
```
HoodieSchema#getField never returns null; it returns Option.empty() if the field is not found. So all invocations of HoodieSchema#getField should be revisited and revised so that they have parity with the previous behavior (i.e., use recordSchema.getField(field).isEmpty() or recordSchema.getField(field).isPresent() instead of a null check).
Four places to check in HoodieFlinkRecord:
- doGetOrderingValue(HoodieSchema, Properties, String[]): `if (recordSchema.getField(field) == null) {`
- getOrderingValueAsJava(HoodieSchema, Properties, String[]): `if (recordSchema.getField(field) == null) {`
- getRecordKey(HoodieSchema, Option<BaseKeyGenerator>): `ValidationUtils.checkArgument(recordSchema.getField(RECORD_KEY_METADATA_FIELD) != null,`
- updateMetaField(HoodieSchema, int, String): `boolean withOperation = recordSchema.getField(OPERATION_METADATA_FIELD) != null;`
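The parity fix the reviewer is asking for can be sketched as follows. This is an illustration only: `SchemaLookup` is a hypothetical stand-in for `HoodieSchema`, and `java.util.Optional` stands in for Hudi's `Option`, whose `isPresent`/`isEmpty` checks behave analogously.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for HoodieSchema: getField returns an empty Optional
// when the field is absent, never null (mirroring HoodieSchema#getField).
class SchemaLookup {
    private final Map<String, String> fields;

    SchemaLookup(Map<String, String> fields) {
        this.fields = fields;
    }

    Optional<String> getField(String name) {
        return Optional.ofNullable(fields.get(name));
    }
}

public class GetFieldParity {
    public static void main(String[] args) {
        SchemaLookup recordSchema = new SchemaLookup(Map.of("_hoodie_record_key", "string"));

        // Before (Avro Schema):  recordSchema.getField(field) == null
        // After (HoodieSchema):  Option-based checks preserve the old semantics.
        boolean missing = recordSchema.getField("ts").isEmpty();
        boolean present = recordSchema.getField("_hoodie_record_key").isPresent();

        System.out.println(missing + " " + present); // prints "true true"
    }
}
```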
Also could you add unit tests to cover this method?
yihua left a comment:
Thanks for your patience on the review
```diff
   @Override
-  public Comparable<?> getOrderingValueAsJava(Schema recordSchema, Properties props, String[] orderingFields) {
+  public Comparable<?> getOrderingValueAsJava(HoodieSchema recordSchema, Properties props, String[] orderingFields) {
```
Same for `if (recordSchema.getField(field) == null) {` — avoid the null check here too.
```diff
 public String getRecordKey(HoodieSchema recordSchema, String keyFieldName) {
   if (key == null) {
-    String recordKey = Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
+    String recordKey = Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema()).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
```
Is RowDataAvroQueryContexts going to be migrated away from Avro schema separately?
The reason I ask is that there's still back and forth conversion to Avro Schema for RowDataAvroQueryContexts to use.
We'll take this on in #17739
```diff
 int seqId = 1;
 for (HoodieRecord record : records) {
-  GenericRecord avroRecord = (GenericRecord) record.rewriteRecordWithNewSchema(schema, CollectionUtils.emptyProps(), schema).getData();
+  GenericRecord avroRecord = (GenericRecord) record.rewriteRecordWithNewSchema(HoodieSchema.fromAvroSchema(schema),
```
nit: The Avro schema can be converted beforehand once, instead of conversion per record.
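The hoisting the reviewer suggests can be sketched as below. The types are hypothetical stand-ins (not the real Hudi classes); `fromAvroSchema` counts its invocations so the avoided per-record conversion cost is visible.

```java
import java.util.List;

// Hypothetical stand-ins for the types in the diff above, for illustration only.
class AvroSchemaStub {}

class HoodieSchemaStub {
    static int conversions = 0;

    static HoodieSchemaStub fromAvroSchema(AvroSchemaStub schema) {
        conversions++; // count conversions to show the cost being avoided
        return new HoodieSchemaStub();
    }
}

public class HoistSchemaConversion {
    // Returns the number of conversions performed while processing the records.
    static int rewriteAll(List<String> records, AvroSchemaStub schema) {
        int before = HoodieSchemaStub.conversions;
        // Convert once, before the loop, instead of once per record.
        HoodieSchemaStub hoodieSchema = HoodieSchemaStub.fromAvroSchema(schema);
        for (String record : records) {
            // each iteration reuses hoodieSchema (no per-record conversion)
        }
        return HoodieSchemaStub.conversions - before;
    }

    public static void main(String[] args) {
        int conversions = rewriteAll(List.of("r1", "r2", "r3"), new AvroSchemaStub());
        System.out.println(conversions); // prints "1", not 3
    }
}
```

The same pattern applies wherever `HoodieSchema.fromAvroSchema(...)` currently sits inside a record loop.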
```diff
 int seqId = 1;
 for (HoodieRecord record : records) {
-  GenericRecord avroRecord = (GenericRecord) record.toIndexedRecord(schema, CollectionUtils.emptyProps()).get().getData();
+  GenericRecord avroRecord = (GenericRecord) record.toIndexedRecord(HoodieSchema.fromAvroSchema(schema), CollectionUtils.emptyProps()).get().getData();
```
nit: similar
```diff
 while (recordItr.hasNext()) {
   HoodieRecord record = recordItr.next();
-  String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+  String recordKey = record.getRecordKey(HoodieSchema.fromAvroSchema(readerSchema), keyFieldName);
```
HoodieSchema should be generated outside the while loop
```diff
 void testConvertColumnValueForLogicalTypeWithNullValue() {
-  Schema dateSchema = Schema.create(Schema.Type.INT);
-  LogicalTypes.date().addToSchema(dateSchema);
+  HoodieSchema dateSchema = HoodieSchema.create(HoodieSchemaType.INT);
```
Should this be `HoodieSchema.createDate()`? The original Avro code attaches the date logical type, which a plain INT schema drops.
Describe the issue this Pull Request addresses
This PR migrates the HoodieRecord API from org.apache.avro.Schema to org.apache.hudi.common.schema.HoodieSchema for all record-related methods. This change reduces coupling with Avro-specific implementations.
Closes #17689
Summary and Changelog
Users can now use HoodieSchema consistently across all HoodieRecord methods. This provides a unified schema abstraction layer.
Impact
All HoodieRecord methods that previously accepted org.apache.avro.Schema now accept org.apache.hudi.common.schema.HoodieSchema.
Risk Level
Low
Documentation Update
N/A
Contributor's checklist