2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
10 changes: 10 additions & 0 deletions CHANGES.md
@@ -115,6 +115,7 @@
## Bugfixes

* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)).

## Security Fixes

@@ -157,6 +158,11 @@
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
* (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)).

## Known Issues

* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0.

# [2.60.0] - 2024-10-17

## Highlights
@@ -211,6 +217,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0.

# [2.59.0] - 2024-09-11

@@ -259,6 +267,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0.

# [2.58.1] - 2024-08-15

@@ -96,7 +96,9 @@ class DestinationState {
private final IcebergDestination icebergDestination;
private final PartitionSpec spec;
private final org.apache.iceberg.Schema schema;
private final PartitionKey partitionKey;
// used to determine the partition to which a record belongs
// must not be directly used to create a writer
private final PartitionKey routingPartitionKey;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
final Cache<PartitionKey, RecordWriter> writers;
@@ -109,7 +111,7 @@ class DestinationState {
this.icebergDestination = icebergDestination;
this.schema = table.schema();
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.routingPartitionKey = new PartitionKey(spec, schema);
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
@@ -154,12 +156,12 @@ class DestinationState {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
partitionKey.partition(getPartitionableRecord(record));
routingPartitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
Review comment (Member):
nit: You aren't guaranteed that asMap is cheap. It could be linear time to convert from some other underlying data structure. It is better to call get and check if the result was null (since this is not a LoadingCache, you don't have to worry about accidentally creating too many entries).
(maybe think about it for future changes)

Reply (Contributor, Author):
Thanks for the tip, will do.

return false;
}
RecordWriter writer = fetchWriterForPartition(partitionKey);
RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
writer.write(record);
return true;
}
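The getIfPresent() pattern the reviewer suggests could look roughly like the following. This is a hypothetical sketch, not part of this PR, and it reuses the writers, openWriters, and maxNumWriters fields of DestinationState shown above:

```java
// Hypothetical variant of the membership check in write(), per the review note:
// look the key up directly instead of going through asMap().containsKey().
// Because writers is a plain Cache rather than a LoadingCache, getIfPresent()
// never creates an entry as a side effect; it simply returns null on a miss.
RecordWriter cached = writers.getIfPresent(routingPartitionKey);
if (cached == null && openWriters >= maxNumWriters) {
  return false;
}
```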
@@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) {
RecordWriter recordWriter = writers.getIfPresent(partitionKey);

if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) {
// each writer must have its own PartitionKey object
PartitionKey copy = partitionKey.copy();
// calling invalidate for a non-existent key is a safe operation
writers.invalidate(partitionKey);
recordWriter = createWriter(partitionKey);
writers.put(partitionKey, recordWriter);
writers.invalidate(copy);
recordWriter = createWriter(copy);
writers.put(copy, recordWriter);
}
return recordWriter;
}
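The copy() above is the heart of the fix: a PartitionKey is mutable, and partition() rewrites its values in place, so a writer holding the shared routing instance would later report another partition's values in its DataFile metadata. Below is a small, self-contained sketch of that behavior, using only the public Iceberg API and hypothetical names such as PartitionKeyMutationSketch and writerAKey (this is not code from the PR):

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

// Demonstrates why each RecordWriter needs its own PartitionKey copy:
// PartitionKey.partition() mutates the instance it is called on.
public class PartitionKeyMutationSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "shard", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("shard").build();

    // One key used only for routing, as in DestinationState.
    PartitionKey routingKey = new PartitionKey(spec, schema);

    GenericRecord recordA = GenericRecord.create(schema);
    recordA.setField("id", 1L);
    recordA.setField("shard", "a");
    GenericRecord recordB = GenericRecord.create(schema);
    recordB.setField("id", 2L);
    recordB.setField("shard", "b");

    routingKey.partition(recordA);
    // The fix: hand the writer its own frozen-in-time copy.
    PartitionKey writerAKey = routingKey.copy();

    routingKey.partition(recordB); // mutates routingKey in place

    System.out.println(routingKey.toPath()); // shard=b
    System.out.println(writerAKey.toPath()); // still shard=a; sharing routingKey would have reported shard=b
  }
}
```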
@@ -28,10 +28,13 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -65,6 +68,7 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -74,7 +78,10 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -289,6 +296,22 @@ private List<Row> populateTable(Table table) throws IOException {
return expectedRows;
}

private static Map<Integer, ?> constantsMap(
FileScanTask task,
BiFunction<Type, Object, Object> converter,
org.apache.iceberg.Schema schema) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
} else {
return Collections.emptyMap();
}
}

private List<Record> readRecords(Table table) {
org.apache.iceberg.Schema tableSchema = table.schema();
TableScan tableScan = table.newScan().project(tableSchema);
@@ -297,13 +320,16 @@ private List<Record> readRecords(Table table) {
InputFilesDecryptor descryptor =
new InputFilesDecryptor(task, table.io(), table.encryption());
for (FileScanTask fileTask : task.files()) {
Map<Integer, ?> idToConstants =
constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema);
InputFile inputFile = descryptor.getInputFile(fileTask);
CloseableIterable<Record> iterable =
Parquet.read(inputFile)
.split(fileTask.start(), fileTask.length())
.project(tableSchema)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema))
fileSchema ->
GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants))
.filter(fileTask.residual())
.build();

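As a closing note on verification: the metadata this PR corrects lives on the committed DataFile entries themselves. A brief hypothetical sketch (assuming a Table handle named table that has already been written to; not code from this PR) of how those recorded partition values can be inspected:

```java
// Iterate the data files added by the latest snapshot and print the partition
// value each file claims to belong to. For an identity partition on a single
// column, position 0 of the partition struct holds that value.
for (org.apache.iceberg.DataFile file : table.currentSnapshot().addedDataFiles(table.io())) {
  Object partitionValue = file.partition().get(0, Object.class);
  System.out.println(file.path() + " -> " + partitionValue);
}
```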