Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 4
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Improvements to the performance of BigqueryIO when using withPropagateSuccessfulStorageApiWrites(true) method (Java) ([#31840](https://github.com/apache/beam/pull/31840)).
* [Managed Iceberg] Added support for writing to partitioned tables ([#32102](https://github.com/apache/beam/pull/32102)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;

import java.io.IOException;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -34,23 +31,37 @@
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RecordWriter {

private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final DataWriter<Record> icebergDataWriter;

private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;

RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
throws IOException {
this(
catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename);
catalog.loadTable(destination.getTableIdentifier()),
destination.getFileFormat(),
filename,
partitionKey);
}

RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey)
throws IOException {
this.table = table;
this.absoluteFilename = table.location() + "/" + filename;
this.fileFormat = fileFormat;
if (table.spec().isUnpartitioned()) {
absoluteFilename = table.locationProvider().newDataLocation(filename);
} else {
absoluteFilename =
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename);
}
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);

switch (fileFormat) {
Expand All @@ -60,6 +71,7 @@ class RecordWriter {
.createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -69,6 +81,7 @@ class RecordWriter {
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -77,34 +90,38 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
absoluteFilename);
}

public void write(Row row) {
Record record = beamRowToIcebergRecord(table.schema(), row);
public void write(Record record) {
icebergDataWriter.write(record);
}

public void close() throws IOException {
icebergDataWriter.close();
}

public Table getTable() {
return table;
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}

/**
 * Reports the current length of the underlying Iceberg data writer.
 *
 * @return the value of {@code icebergDataWriter.length()} at the time of the call
 */
public long bytesWritten() {
  final long writtenLength = icebergDataWriter.length();
  return writtenLength;
}

public ManifestFile getManifestFile() throws IOException {
String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
OutputFile outputFile = table.io().newOutputFile(manifestFilename);
ManifestWriter<DataFile> manifestWriter;
try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
openWriter.add(icebergDataWriter.toDataFile());
manifestWriter = openWriter;
}

return manifestWriter.toManifestFile();
public DataFile getDataFile() {
return icebergDataWriter.toDataFile();
}
}
Loading