2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 1
 }

4 changes: 3 additions & 1 deletion sdks/java/io/iceberg/build.gradle

@@ -55,8 +55,10 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
-  implementation library.java.hadoop_common
   runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
+  implementation library.java.hadoop_common
+  implementation library.java.jackson_core
+  implementation library.java.jackson_databind
 
   testImplementation project(":sdks:java:managed")
   testImplementation library.java.hadoop_client

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -54,7 +53,8 @@
 import org.slf4j.LoggerFactory;
 
 class AppendFilesToTables
-    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
+    extends PTransform<
+        PCollection<FileWriteResult>, PCollection<KV<TableIdentifier, SnapshotInfo>>> {
   private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
   private final IcebergCatalogConfig catalogConfig;
   private final String manifestFilePrefix;
@@ -65,28 +65,31 @@ class AppendFilesToTables
   }
 
   @Override
-  public PCollection<KV<String, SnapshotInfo>> expand(PCollection<FileWriteResult> writtenFiles) {
+  public PCollection<KV<TableIdentifier, SnapshotInfo>> expand(
+      PCollection<FileWriteResult> writtenFiles) {
 
     // Apply any sharded writes and flatten everything for catalog updates
     return writtenFiles
         .apply(
             "Key metadata updates by table",
             WithKeys.of(
-                new SerializableFunction<FileWriteResult, String>() {
+                new SerializableFunction<FileWriteResult, TableIdentifier>() {
                   @Override
-                  public String apply(FileWriteResult input) {
-                    return input.getTableIdentifier().toString();
+                  public TableIdentifier apply(FileWriteResult input) {
+                    return input.getTableIdentifier();
                   }
                 }))
+        .setCoder(KvCoder.of(TableIdentifierCoder.of(), FileWriteResult.CODER))
         .apply("Group metadata updates by table", GroupByKey.create())
         .apply(
             "Append metadata updates to tables",
            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
+        .setCoder(KvCoder.of(TableIdentifierCoder.of(), SnapshotInfo.CODER));
   }
 
   private static class AppendFilesToTablesDoFn
-      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
+      extends DoFn<
+          KV<TableIdentifier, Iterable<FileWriteResult>>, KV<TableIdentifier, SnapshotInfo>> {
     private final Counter snapshotsCreated =
         Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
     private final Distribution committedDataFileByteSize =
@@ -123,17 +126,17 @@ private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWri
 
     @ProcessElement
     public void processElement(
-        @Element KV<String, Iterable<FileWriteResult>> element,
-        OutputReceiver<KV<String, SnapshotInfo>> out,
+        @Element KV<TableIdentifier, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<TableIdentifier, SnapshotInfo>> out,
         BoundedWindow window)
         throws IOException {
-      String tableStringIdentifier = element.getKey();
+      TableIdentifier tableIdentifier = element.getKey();
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
       if (!fileWriteResults.iterator().hasNext()) {
         return;
       }
 
-      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      Table table = getCatalog().loadTable(tableIdentifier);
 
       // vast majority of the time, we will simply append data files.
       // in the rare case we get a batch that contains multiple partition specs, we will group
@@ -146,7 +149,7 @@ public void processElement(
       }
 
       Snapshot snapshot = table.currentSnapshot();
-      LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
+      LOG.info("Created new snapshot for table '{}': {}", tableIdentifier, snapshot);
       snapshotsCreated.inc();
       out.outputWithTimestamp(
           KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
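
Note: TableIdentifierCoder is used here (and in AssignDestinations below) but its definition is not part of this diff. A minimal sketch of what such a coder could look like, assuming it encodes the namespace levels and table name separately; the class body below is a guess for illustration, not the PR's actual implementation:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

class TableIdentifierCoder extends CustomCoder<TableIdentifier> {
  private static final TableIdentifierCoder INSTANCE = new TableIdentifierCoder();
  private static final ListCoder<String> LEVELS_CODER = ListCoder.of(StringUtf8Coder.of());

  static TableIdentifierCoder of() {
    return INSTANCE;
  }

  @Override
  public void encode(TableIdentifier value, OutputStream outStream) throws IOException {
    // Encode namespace levels and table name as separate fields, so names
    // containing '.' survive the round trip (unlike toString()/parse()).
    LEVELS_CODER.encode(Arrays.asList(value.namespace().levels()), outStream);
    StringUtf8Coder.of().encode(value.name(), outStream);
  }

  @Override
  public TableIdentifier decode(InputStream inStream) throws IOException {
    List<String> levels = LEVELS_CODER.decode(inStream);
    String name = StringUtf8Coder.of().decode(inStream);
    return TableIdentifier.of(Namespace.of(levels.toArray(new String[0])), name);
  }

  @Override
  public void verifyDeterministic() {
    // Deterministic by construction, which GroupByKey requires of its key coder.
  }
}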

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java

@@ -19,7 +19,6 @@
 
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -29,6 +28,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.joda.time.Instant;
 
 /**
@@ -37,7 +37,8 @@
  * <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
  * assigned metadata and the data field has the original row.
  */
-class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {
+class AssignDestinations
+    extends PTransform<PCollection<Row>, PCollection<KV<TableIdentifier, Row>>> {
 
   private final DynamicDestinations dynamicDestinations;
 
@@ -46,27 +47,30 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
   }
 
   @Override
-  public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
+  public PCollection<KV<TableIdentifier, Row>> expand(PCollection<Row> input) {
     return input
         .apply(
             ParDo.of(
-                new DoFn<Row, KV<String, Row>>() {
+                new DoFn<Row, KV<TableIdentifier, Row>>() {
                  @ProcessElement
                  public void processElement(
                      @Element Row element,
                      BoundedWindow window,
                      PaneInfo paneInfo,
                      @Timestamp Instant timestamp,
-                      OutputReceiver<KV<String, Row>> out) {
-                    String tableIdentifier =
-                        dynamicDestinations.getTableStringIdentifier(
-                            ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+                      OutputReceiver<KV<TableIdentifier, Row>> out) {
+                    TableIdentifier tableIdentifier =
+                        dynamicDestinations
+                            .getTableIdentifier(
+                                ValueInSingleWindow.of(element, timestamp, window, paneInfo))
+                            .toTableIdentifier();
                    Row data = dynamicDestinations.getData(element);
 
                    out.output(KV.of(tableIdentifier, data));
                  }
                }))
         .setCoder(
-            KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+            KvCoder.of(
+                TableIdentifierCoder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
   }
 }

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java

@@ -29,9 +29,9 @@ public interface DynamicDestinations extends Serializable {
 
   Row getData(Row element);
 
-  IcebergDestination instantiateDestination(String destination);
+  IcebergDestination instantiateDestination(TableIdentifier destination);
 
-  String getTableStringIdentifier(ValueInSingleWindow<Row> element);
+  SerializableTableIdentifier getTableIdentifier(ValueInSingleWindow<Row> element);
 
   static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) {
     return new OneTableDynamicDestinations(tableId, inputSchema);
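
Note: SerializableTableIdentifier is also introduced by this PR but does not appear in this diff. Judging from its usage (a static of(TableIdentifier) factory, a toTableIdentifier() accessor, and being a field of the schema-backed FileWriteResult below), it is a serialization- and schema-friendly stand-in for Iceberg's TableIdentifier. A plausible sketch, purely illustrative:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// Illustrative only: stores namespace levels and name as plain fields that
// Java serialization and Beam's schema inference can handle directly.
class SerializableTableIdentifier implements Serializable {
  private final List<String> namespaceLevels;
  private final String name;

  private SerializableTableIdentifier(List<String> namespaceLevels, String name) {
    this.namespaceLevels = namespaceLevels;
    this.name = name;
  }

  static SerializableTableIdentifier of(TableIdentifier tableId) {
    return new SerializableTableIdentifier(
        Arrays.asList(tableId.namespace().levels()), tableId.name());
  }

  TableIdentifier toTableIdentifier() {
    return TableIdentifier.of(Namespace.of(namespaceLevels.toArray(new String[0])), name);
  }
}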

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java

@@ -20,6 +20,9 @@
 import com.google.auto.value.AutoValue;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.apache.iceberg.DataFile;
@@ -30,18 +33,29 @@
 @AutoValue
 @DefaultSchema(AutoValueSchema.class)
 abstract class FileWriteResult {
+  public static final SchemaCoder<FileWriteResult> CODER;
+
+  static {
+    try {
+      SchemaRegistry registry = SchemaRegistry.createDefault();
+      CODER = registry.getSchemaCoder(FileWriteResult.class);
+
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
   private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
   private transient @MonotonicNonNull DataFile cachedDataFile;
 
-  abstract String getTableIdentifierString();
+  abstract SerializableTableIdentifier getSerializableTableIdentifier();
 
   abstract SerializableDataFile getSerializableDataFile();
 
   @SchemaIgnore
   public TableIdentifier getTableIdentifier() {
     if (cachedTableIdentifier == null) {
-      cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
+      cachedTableIdentifier = getSerializableTableIdentifier().toTableIdentifier();
     }
     return cachedTableIdentifier;
   }
@@ -61,13 +75,13 @@ public static Builder builder() {
   @AutoValue.Builder
   abstract static class Builder {
 
-    abstract Builder setTableIdentifierString(String tableIdString);
+    abstract Builder setSerializableTableIdentifier(SerializableTableIdentifier tableId);
 
     abstract Builder setSerializableDataFile(SerializableDataFile dataFile);
 
     @SchemaIgnore
     public Builder setTableIdentifier(TableIdentifier tableId) {
-      return setTableIdentifierString(tableId.toString());
+      return setSerializableTableIdentifier(SerializableTableIdentifier.of(tableId));
     }
 
     public abstract FileWriteResult build();
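
For context on the new static CODER block: resolving the SchemaCoder once at class-load time turns the checked NoSuchSchemaException into an unchecked failure and gives call sites a ready-made constant (see the setCoder(...) calls in AppendFilesToTables above). The same pattern on a minimal, hypothetical POJO (ExampleResult and its fields are made up for illustration):

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Hypothetical element type, illustrating the FileWriteResult.CODER pattern.
@DefaultSchema(JavaFieldSchema.class)
public class ExampleResult {
  public String tableName;
  public long recordCount;

  // Resolve the schema-derived coder once, when the class is loaded;
  // a missing schema is a programming error, so fail fast and unchecked.
  public static final SchemaCoder<ExampleResult> CODER;

  static {
    try {
      CODER = SchemaRegistry.createDefault().getSchemaCoder(ExampleResult.class);
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException(e);
    }
  }
}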

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java

@@ -31,7 +31,6 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.iceberg.catalog.TableIdentifier;
 
 /**
  * SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
             .getPipeline()
             .apply(
                 IcebergIO.readRows(configuration.getIcebergCatalog())
-                    .from(TableIdentifier.parse(configuration.getTable())));
+                    .from(IcebergUtils.parseTableIdentifier(configuration.getTable())));
 
     return PCollectionRowTuple.of(OUTPUT_TAG, output);
   }

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java

@@ -45,13 +45,17 @@ public enum ScanType {
   public abstract IcebergCatalogConfig getCatalogConfig();
 
   @Pure
-  public abstract String getTableIdentifier();
+  abstract SerializableTableIdentifier getSerializableTableIdentifier();
+
+  @Pure
+  public TableIdentifier getTableIdentifier() {
+    return getSerializableTableIdentifier().toTableIdentifier();
+  }
 
   @Pure
   public Table getTable() {
     if (cachedTable == null) {
-      cachedTable =
-          getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
+      cachedTable = getCatalogConfig().catalog().loadTable(getTableIdentifier());
     }
     return cachedTable;
   }
@@ -123,10 +127,10 @@ public abstract static class Builder {
 
     public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog);
 
-    public abstract Builder setTableIdentifier(String tableIdentifier);
+    abstract Builder setSerializableTableIdentifier(SerializableTableIdentifier tableIdentifier);
 
     public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
-      return this.setTableIdentifier(tableIdentifier.toString());
+      return this.setSerializableTableIdentifier(SerializableTableIdentifier.of(tableIdentifier));
     }
 
     public Builder setTableIdentifier(String... names) {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

@@ -19,6 +19,9 @@
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -36,6 +39,8 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.TableIdentifierParser;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
@@ -47,6 +52,9 @@
 
 /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
 public class IcebergUtils {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private IcebergUtils() {}
 
   private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
+
+  public static TableIdentifier parseTableIdentifier(String table) {
+    try {
+      JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
+      return TableIdentifierParser.fromJson(jsonNode);
+    } catch (JsonProcessingException e) {
+      return TableIdentifier.parse(table);
+    }
+  }
 }
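
A usage sketch for the new parseTableIdentifier helper (table names below are made up): JSON input goes through Iceberg's TableIdentifierParser, and anything that fails JSON parsing falls back to the dot-separated TableIdentifier.parse form.

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.iceberg.catalog.TableIdentifier;

public class ParseTableIdentifierExample {
  public static void main(String[] args) {
    // JSON form: namespace levels are explicit, so a level may itself contain a dot.
    TableIdentifier fromJson =
        IcebergUtils.parseTableIdentifier(
            "{\"namespace\": [\"ns1.a\", \"ns2\"], \"name\": \"tbl\"}");

    // Not valid JSON, so readTree throws and the helper falls back to
    // TableIdentifier.parse, which splits namespace levels on dots.
    TableIdentifier fromString = IcebergUtils.parseTableIdentifier("ns1.ns2.tbl");

    System.out.println(fromJson);   // ns1.a.ns2.tbl
    System.out.println(fromString); // ns1.ns2.tbl
  }
}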

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java

@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.TableIdentifier;
 
 public final class IcebergWriteResult implements POutput {
 
@@ -35,13 +36,13 @@ public final class IcebergWriteResult implements POutput {
 
   private final Pipeline pipeline;
 
-  private final PCollection<KV<String, SnapshotInfo>> snapshots;
+  private final PCollection<KV<TableIdentifier, SnapshotInfo>> snapshots;
 
-  public PCollection<KV<String, SnapshotInfo>> getSnapshots() {
+  public PCollection<KV<TableIdentifier, SnapshotInfo>> getSnapshots() {
     return snapshots;
   }
 
-  IcebergWriteResult(Pipeline pipeline, PCollection<KV<String, SnapshotInfo>> snapshots) {
+  IcebergWriteResult(Pipeline pipeline, PCollection<KV<TableIdentifier, SnapshotInfo>> snapshots) {
     this.pipeline = pipeline;
     this.snapshots = snapshots;
   }