From b46175116bec44cc5e099bb21e5159581e25cbb8 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Thu, 9 Jan 2025 18:09:14 +0000
Subject: [PATCH 1/2] [Managed Iceberg] Fix partition value race condition (#33549)

* fix and update tests
* don't mention df yet
* add PR link
* whitespace
---
 .../IO_Iceberg_Integration_Tests.json         |  4 +-
 CHANGES.md                                    | 42 +++++++++++++++++++
 .../sdk/io/iceberg/RecordWriterManager.java   | 20 +++++----
 .../beam/sdk/io/iceberg/IcebergIOIT.java      | 28 ++++++++++++-
 4 files changed, 83 insertions(+), 11 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 2160d3c68005..b73af5e61a43 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 5
+  "comment": "Modify this file in a trivial way to cause this test suite to run.",
+  "modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index b8adeeb64362..410b6388bbdb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,38 @@
 
 ## I/Os
 
+* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)).
+
+## Breaking Changes
+
+* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).
+* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)), but forced Debezium IO to use protobuf 3 ([#33541](https://github.com/apache/beam/issues/33541)) because Debezium clients are not protobuf 4 compatible. This may cause conflicts when using clients which are only compatible with protobuf 4.
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
+
+## Security Fixes
+* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Known Issues
+
+* ([#X](https://github.com/apache/beam/issues/X)).
+
+# [2.62.0] - Unreleased
+
+## I/Os
+
 * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
@@ -92,6 +124,7 @@
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
 * [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
+* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)).
 
 ## Security Fixes
 * Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -138,6 +171,11 @@
 * Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
 * (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)).
 
+## Known Issues
+
+* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
+  * Fixed in 2.62.0.
+
 # [2.60.0] - 2024-10-17
 
 ## Highlights
@@ -192,6 +230,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
 * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
 * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
 * Fixed in 2.61.0.
+* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
+  * Fixed in 2.62.0.
 
 # [2.59.0] - 2024-09-11
 
@@ -240,6 +280,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
 * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
 * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
 * Fixed in 2.61.0.
+* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
+  * Fixed in 2.62.0.
 
 # [2.58.1] - 2024-08-15
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 4c21a0175ab0..63186f26fb5a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -96,7 +96,9 @@ class DestinationState {
     private final IcebergDestination icebergDestination;
     private final PartitionSpec spec;
     private final org.apache.iceberg.Schema schema;
-    private final PartitionKey partitionKey;
+    // used to determine the partition to which a record belongs
+    // must not be directly used to create a writer
+    private final PartitionKey routingPartitionKey;
     private final Table table;
     private final String stateToken = UUID.randomUUID().toString();
     final Cache<PartitionKey, RecordWriter> writers;
@@ -109,7 +111,7 @@ class DestinationState {
       this.icebergDestination = icebergDestination;
      this.schema = table.schema();
      this.spec = table.spec();
-      this.partitionKey = new PartitionKey(spec, schema);
+      this.routingPartitionKey = new PartitionKey(spec, schema);
      this.table = table;
      for (PartitionField partitionField : spec.fields()) {
        partitionFieldMap.put(partitionField.name(), partitionField);
@@ -154,12 +156,12 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      partitionKey.partition(getPartitionableRecord(record));
+      routingPartitionKey.partition(getPartitionableRecord(record));
 
-      if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
+      if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
         return false;
       }
-      RecordWriter writer = fetchWriterForPartition(partitionKey);
+      RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
       writer.write(record);
       return true;
     }
@@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
       RecordWriter recordWriter = writers.getIfPresent(partitionKey);
 
       if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
+        // each writer must have its own PartitionKey object
+        PartitionKey copy = partitionKey.copy();
         // calling invalidate for a non-existent key is a safe operation
-        writers.invalidate(partitionKey);
-        recordWriter = createWriter(partitionKey);
-        writers.put(partitionKey, recordWriter);
+        writers.invalidate(copy);
+        recordWriter = createWriter(copy);
+        writers.put(copy, recordWriter);
       }
       return recordWriter;
     }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index a060bc16d6c7..39b8899456b5 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -29,10 +29,13 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
@@ -68,6 +71,7 @@
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -78,7 +82,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.PartitionUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -263,6 +270,22 @@ private List<Row> populateTable(Table table) throws IOException {
     return expectedRows;
   }
 
+  private static Map<Integer, ?> constantsMap(
+      FileScanTask task,
+      BiFunction<Type, Object, Object> converter,
+      org.apache.iceberg.Schema schema) {
+    PartitionSpec spec = task.spec();
+    Set<Integer> idColumns = spec.identitySourceIds();
+    org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
+    boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
+
+    if (projectsIdentityPartitionColumns) {
+      return PartitionUtil.constantsMap(task, converter);
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
   private List<Record> readRecords(Table table) {
     Schema tableSchema = table.schema();
     TableScan tableScan = table.newScan().project(tableSchema);
@@ -271,13 +294,16 @@ private List<Record> readRecords(Table table) {
       InputFilesDecryptor descryptor =
           new InputFilesDecryptor(task, table.io(), table.encryption());
       for (FileScanTask fileTask : task.files()) {
+        Map<Integer, ?> idToConstants =
+            constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema);
         InputFile inputFile = descryptor.getInputFile(fileTask);
         CloseableIterable<Record> iterable =
             Parquet.read(inputFile)
                 .split(fileTask.start(), fileTask.length())
                 .project(tableSchema)
                 .createReaderFunc(
-                    fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema))
+                    fileSchema ->
+                        GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants))
                 .filter(fileTask.residual())
                 .build();
 

From 8f8971cb935d1ab2cd3ca83fe03d66247fd51452 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Thu, 9 Jan 2025 13:23:09 -0500
Subject: [PATCH 2/2] cleanup

---
 CHANGES.md | 32 --------------------------------
 1 file changed, 32 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 410b6388bbdb..66a10f69417c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,38 +63,6 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
-
-## New Features / Improvements
-
-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
-* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)).
-
-## Breaking Changes
-
-* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).
-* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)), but forced Debezium IO to use protobuf 3 ([#33541](https://github.com/apache/beam/issues/33541)) because Debezium clients are not protobuf 4 compatible. This may cause conflicts when using clients which are only compatible with protobuf 4.
-
-## Deprecations
-
-* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
-
-## Bugfixes
-
-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
-* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
-
-## Security Fixes
-* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
-
-## Known Issues
-
-* ([#X](https://github.com/apache/beam/issues/X)).
-
-# [2.62.0] - Unreleased
-
-## I/Os
-
 * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
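
The sketch below is not part of either patch; it is a self-contained illustration of the aliasing pitfall PATCH 1 fixes. RecordWriterManager kept a single mutable PartitionKey, re-pointed it at each incoming record's partition, and also used that same object as the cache key for open writers, so every cached writer ended up sharing one key whose state reflected only the most recent record, and DataFile metadata picked up the wrong partition values. The fix keeps the shared key for routing but hands each writer its own PartitionKey.copy(). Here MutableKey is a hypothetical stand-in for Iceberg's PartitionKey (which is likewise mutable, with state-based equals/hashCode and a copy() method); a plain HashMap stands in for the writer cache.

```java
import java.util.HashMap;
import java.util.Map;

public class MutableKeyPitfall {
  // Hypothetical stand-in for Iceberg's PartitionKey: mutable state,
  // state-based equals/hashCode, and a copy() for taking a snapshot.
  static final class MutableKey {
    int partition;

    MutableKey copy() {
      MutableKey snapshot = new MutableKey();
      snapshot.partition = partition;
      return snapshot;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof MutableKey && ((MutableKey) o).partition == partition;
    }

    @Override
    public int hashCode() {
      return partition;
    }

    @Override
    public String toString() {
      return "partition=" + partition;
    }
  }

  public static void main(String[] args) {
    Map<MutableKey, String> writers = new HashMap<>();
    MutableKey routingKey = new MutableKey();

    // Buggy pattern: the same mutable object backs every cache entry.
    routingKey.partition = 1;
    writers.put(routingKey, "writer-A");
    routingKey.partition = 2;
    writers.put(routingKey, "writer-B");
    // Both entries now alias one key that claims partition=2, so writer-A's
    // file would be committed with the wrong partition metadata (and its
    // entry is no longer reachable under its original hash bucket).
    System.out.println(writers); // e.g. {partition=2=writer-A, partition=2=writer-B}

    // Fixed pattern, mirroring the patch: each entry gets its own copy.
    writers.clear();
    routingKey.partition = 1;
    writers.put(routingKey.copy(), "writer-A");
    routingKey.partition = 2;
    writers.put(routingKey.copy(), "writer-B");
    System.out.println(writers); // {partition=1=writer-A, partition=2=writer-B}
  }
}
```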