HIVE-29287: Iceberg: [V3] Variant Shredding support #6152
Conversation
Same thing as apache/iceberg#14297
TableScan
  alias: tbl_shredded_variant
  filterExpr: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean)
  Statistics: Num rows: 3 Data size: 1020 Basic stats: COMPLETE Column stats: NONE
PPD is not supported here; it will be addressed in a separate JIRA.
I tested variant_type_shredding.q by removing 'variant.shredding.enabled'='true' from the table properties, and the qtest still passes without any failures. So maybe we can add a JUnit test (e.g., TestVariantShredding) that:
That test was added in Iceberg: Expose variantShreddingFunc() in Parquet.DataWriteBuilder. The plan was to fully cover the functionality with an explain plan once PPD support is added.
1b9fa43 to d061a53
aturoczy left a comment
As all of the comments are addressed, can we merge?
*/
public void initialize(Supplier<Record> record) {
  if (sampleRecord == null) {
    sampleRecord = record;
It only needs to initialize when sampleRecord is null? Wouldn't it be easier to just always initialize? Maybe there is a special place for the caller to handle this.
It captures the first record being written and stores it in sampleRecord. The same strategy is applied in Spark to perform variant shredding.
Need to get a green build; it's flaky at the moment.
shell.executeStatement(
    String.format(
        "INSERT INTO %s VALUES " +
        "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
If you change this to "(1, parse_json('null'))," +, the whole feature gets disabled. If you remove the assertion values, this fails:
assertThat(variantType.containsField("typed_value")).isTrue();
We could have used DEFAULT values; however, Hive doesn’t support them for STRUCT or VARIANT types.
Default values for payload of type variant are not supported
In Iceberg V4, the execution engine may be able to pass the shredded writer schema, which would make this easier.
fixed in #6234
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;

// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)
@deniskuzZ We copied a 1 KLOC file just to fast-track this? We didn't have a Hive release coming either.
We have done patching before, for things the Iceberg community didn't agree to, for bug fixes, or to maintain compatibility with our current version.
The Iceberg folks rarely accept our changes; for accepted changes we should have waited for an official release, and this is neither a small file nor something urgent.
Please read the TODO comment carefully.
The proposed three-line change has already been merged into Iceberg but is not yet released.
https://github.com/apache/hive/blame/master/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java#L209-L212
I requested it to be included in version 1.10.1, but it wasn’t accepted since that release is treated as a patch release and only includes bug fixes. It has already been nearly three months since the 1.10.1 release discussion began.
https://lists.apache.org/thread/9tjs060vzs6nnghk2brcw0hv89h4drp0
They will release it at some point; usually if we need something from a third-party lib, we wait for the release or use the snapshot version until then. We have already copied a lot of Iceberg code, so one more file doesn't make it much worse, but maybe one day we should set a precedent for what we can copy and when. Maybe not today or here, but some day.
Let's park a ticket for sure so we don't forget to drop it when we upgrade, or if someone else does the upgrade, they don't skip it.
-- Disable vectorized execution until Variant type is supported
set hive.vectorized.execution.enabled=false;
Removing this doesn't throw any exception, and in some cases it actually gives wrong results. Ideally we should fall back to non-vectorized execution if we have a shredded variant column and vectorization enabled; a rough sketch of such a guard follows the results below.
In your test, if you don't have this, it gives:
Caused by: java.lang.RuntimeException: MALFORMED_VARIANT
at org.apache.hadoop.hive.serde2.variant.VariantUtil.malformedVariant(VariantUtil.java:180)
at org.apache.hadoop.hive.serde2.variant.Variant.convertToByteArray(Variant.java:81)
at org.apache.hadoop.hive.serde2.variant.Variant.from(Variant.java:59)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDFVariantGet.evaluate(GenericUDFVariantGet.java:102)
I tried another case, where it silently gives a wrong result:
CREATE TABLE t (
id INT,
v VARIANT
)
STORED BY ICEBERG
TBLPROPERTIES (
'format-version'='3',
'variant.shredding.enabled'='true'
);
INSERT INTO t VALUES
(1, parse_json('{"a": 1}')),
(2, parse_json('{"b": 2}'));
SELECT
try_variant_get(v, '$.a'),
try_variant_get(v, '$.b')
FROM t
ORDER BY id;
With vectorization off:
1 NULL
NULL 2
With vectorization on:
NULL 2
NULL NULL
Without shredding but vectorization on:
1 NULL
NULL 2
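Roughly, the kind of guard I have in mind, as a sketch only: the helper names mirror the ones introduced in this PR, and the table property is the one used in the qtests, but how the result would plug into Hive's vectorization decision is left open.

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Sketch only: decide whether vectorized reads should be allowed for a table.
final class VectorizedReadGuard {

  // Table property used in this PR's qtests to turn shredding on.
  private static final String SHREDDING_ENABLED = "variant.shredding.enabled";

  static boolean canUseVectorizedRead(Configuration conf, Schema schema, Map<String, String> props) {
    boolean requested = conf.getBoolean("hive.vectorized.execution.enabled", true);
    // Fall back to row-mode execution when shredding is enabled and the table has
    // variant columns, since vectorized reads of shredded variants currently give wrong results.
    if (requested && isVariantShreddingEnabled(props) && hasVariantColumns(schema)) {
      return false;
    }
    return requested;
  }

  private static boolean isVariantShreddingEnabled(Map<String, String> props) {
    return Boolean.parseBoolean(props.getOrDefault(SHREDDING_ENABLED, "false"));
  }

  // Same top-level check used elsewhere in this PR.
  private static boolean hasVariantColumns(Schema schema) {
    return schema.columns().stream()
        .anyMatch(field -> field.type() instanceof Types.VariantType);
  }
}
```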
Please create a ticket for that.
*/
private static boolean hasVariantColumns(Schema schema) {
  return schema.columns().stream()
      .anyMatch(field -> field.type() instanceof Types.VariantType);
What if a field is of type struct and within it there is a Variant data type? A table like:
CREATE TABLE t_struct_variant (
id INT,
s STRUCT<
user_id: INT,
payload: VARIANT
>
)
STORED BY ICEBERG
TBLPROPERTIES (
'format-version'='3',
'variant.shredding.enabled'='true'
);
-- Insert JSON structures
INSERT INTO t_struct_variant VALUES
(
1,
named_struct(
'user_id', 101,
'payload', parse_json('{"name":"Alice","age":30}')
)
),
(
2,
named_struct(
'user_id', 102,
'payload', parse_json('{"name":"Bob"}')
)
),
(
3,
named_struct(
'user_id', 103,
'payload', parse_json('{"active":true,"score":9.5}')
)
);
It did have a variant column, but it got skipped.
It is not supported, and it generally doesn’t make sense from a modeling perspective. VARIANT is already a container for arbitrary JSON / semi-structured data.
It is supported afaik; maps & lists aren't supported. At least that's what this Databricks doc says:
https://docs.databricks.com/aws/en/delta/variant-shredding#limitations
I was searching a bit more when I was trying this:
This is very common:
STRUCT<
metadata: STRUCT<ts, source, version>,
payload: VARIANT
>
- Strongly typed envelope
- Flexible payload
From a data modeling perspective, this makes perfect sense. Maybe double-check once.
> This is very common
I haven't found this in any resources; documentation of variant shredding is pretty limited.
Storing the schema as:
STRUCT<
  metadata: STRUCT<ts, source, version>,
  payload: VARIANT
>
has some downsides:
- Query complexity: every time you want to filter or join by metadata.ts or metadata.source, you have to dig into the nested struct (PPD for structs is not supported by Hive-Iceberg).
- Schema evolution: if you want to add new metadata fields later, nested structs can make evolution tricky without rewriting the table (not sure we even support ALTER TABLE my_table ADD COLUMN metadata.new_field STRING;).
- Analytical usefulness: often, metadata like ts, source, version is used more frequently for filtering, auditing, or incremental ingestion than the payload itself. Keeping it separate makes this more convenient.
I'll check if that could be easily implemented, though I'm not sure how useful it would be.
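For reference, a minimal sketch of how the existing top-level check could be extended to also look inside structs, using the same Iceberg Schema/Types API quoted above; the class layout here is illustrative only, not this PR's code.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

final class VariantSchemaCheck {

  // Top-level entry point: does any column, directly or inside a struct, use VARIANT?
  static boolean hasVariantColumns(Schema schema) {
    return schema.columns().stream()
        .anyMatch(field -> containsVariant(field.type()));
  }

  // Recurse into struct fields so a VARIANT nested in a STRUCT is also detected.
  // Maps and lists are deliberately left out, matching the Databricks limitation mentioned above.
  private static boolean containsVariant(Type type) {
    if (type instanceof Types.VariantType) {
      return true;
    }
    if (type.isStructType()) {
      return type.asStructType().fields().stream()
          .anyMatch(f -> containsVariant(f.type()));
    }
    return false;
  }
}
```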
Thanks. I parked a ticket for it: https://issues.apache.org/jira/browse/HIVE-29373
addressed in #6234
Map<String, String> properties) {

// Preconditions: must have variant columns + property enabled
if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
Not a big thing, but I feel the check should be flipped:
if (!isVariantShreddingEnabled(properties) || !hasVariantColumns(schema))
If variant shredding isn't enabled, we don't need to inspect the schema at all; as written, the first condition of the OR is always evaluated first for no reason, and it iterates over the entire schema.
True, I'll change it in a follow-up PR.
fixed in #6234
if (sampleRecord != null) {
  try {
    Object variantValue = sampleRecord.getField(name);
    if (variantValue instanceof Variant variant) {
I think we should infer the schema from the first non-null variantValue.
see #6152 (comment).
To infer the schema from the first non-null value, we must buffer rows until that value is encountered. This adds performance overhead, including increased memory usage for buffering, potential pipeline stalls, and more complex processing.
I think an explicit schema would be a better approach.
Just curious, does any engine follow this approach of inferring the shredding schema from the first record? I was looking at a Spark commit:
apache/spark@3c3d1a6
It seems explicit only, didn't dig deep though...
Yes, I was checking this: apache/iceberg#14297
Btw, it doesn't seem that Databricks specifies the shredded variant write schema; it does the inference:
https://docs.databricks.com/aws/en/delta/variant-shredding
if (sampleRecord == null) {
  sampleRecord = record;
Thinking about this: we are taking the first record. Can it lead to task-level non-determinism, e.g., one INSERT leading to multiple write tasks, with each task capturing its own first record and schema?
But I think there wasn't a better way; maybe at some later point we allow the user to define the columns to be shredded.
Yes, something like this; an explicit shredded schema would be a better solution.
What changes were proposed in this pull request?
Support for variant shredding, enabling Hive to write shredded variant data into Iceberg tables.
Ideally, this should follow the approach described in the reader/writer API proposal for Iceberg V4, where an execution engine provides the shredded writer schema.
As an interim solution, this PR introduces a writer that infers the shredded schema from the sample record captured before the Parquet writer is initialized.
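A rough illustration of this interim approach follows; it is not the exact code in this PR. The initialize(Supplier<Record>) hook mirrors the signature quoted in the review, inferShreddedType stands in for the real inference logic, and the import paths are assumptions.

```java
import java.util.function.Supplier;

import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.variants.Variant;

// Sketch only: the first record is captured lazily, and the shredded type for a
// variant column is later derived from its value before the Parquet writer is built.
abstract class SampleBasedShredding {

  private Supplier<Record> sampleRecord;

  // Called for every record; only the first one is kept as the sample.
  void initialize(Supplier<Record> record) {
    if (sampleRecord == null) {
      sampleRecord = record;
    }
  }

  // Shredded type for one variant column, or null when no sample is available,
  // in which case the writer simply skips shredding for that column.
  Type shreddedTypeFor(String columnName) {
    if (sampleRecord == null) {
      return null;
    }
    Object value = sampleRecord.get().getField(columnName);
    return (value instanceof Variant variant) ? inferShreddedType(variant) : null;
  }

  // Hypothetical hook standing in for the actual schema-inference logic.
  abstract Type inferShreddedType(Variant variant);
}
```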
Why are the changes needed?
Enables data skipping (predicate pushdown)
Does this PR introduce any user-facing change?
No
How was this patch tested?