diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 6e39575c1679..caa72bef7536 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -43,27 +43,20 @@ def hive_version = "3.1.3" dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:io:hadoop-common") implementation library.java.slf4j_api + implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" - implementation "org.apache.parquet:parquet-common:$parquet_version" implementation "org.apache.orc:orc-core:$orc_version" implementation "org.apache.iceberg:iceberg-core:$iceberg_version" implementation "org.apache.iceberg:iceberg-api:$iceberg_version" implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" - implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version" - implementation "org.apache.iceberg:iceberg-data:$iceberg_version" + implementation library.java.avro + implementation library.java.hadoop_common - - - provided library.java.avro - provided library.java.hadoop_client - permitUnusedDeclared library.java.hadoop_client - provided library.java.hadoop_common testImplementation library.java.hadoop_client - testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" + testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.junit testRuntimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java index 4b9e3102a02e..8b8d852e106b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java @@ -17,9 +17,16 @@ */ package org.apache.beam.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; public class IcebergIO { @@ -28,6 +35,10 @@ public static WriteRows writeToDynamicDestinations( return new WriteRows(catalog, dynamicDestinations); } + public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) { + return new ReadTable(catalogConfig, tableId); + } + static class WriteRows extends PTransform, IcebergWriteResult> { private final IcebergCatalogConfig catalog; @@ -47,4 +58,36 @@ public IcebergWriteResult expand(PCollection input) { "Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations)); } } + + public static class ReadTable extends PTransform> { + + private final IcebergCatalogConfig catalogConfig; + private final transient @Nullable TableIdentifier tableId; + + private TableIdentifier getTableId() { + return checkStateNotNull( + tableId, "Transient field tableId null; it should not be accessed after serialization"); + } + + private ReadTable(IcebergCatalogConfig catalogConfig, 
TableIdentifier tableId) { + this.catalogConfig = catalogConfig; + this.tableId = tableId; + } + + @Override + public PCollection expand(PBegin input) { + + Table table = catalogConfig.catalog().loadTable(getTableId()); + + return input.apply( + Read.from( + new ScanSource( + IcebergScanConfig.builder() + .setCatalogConfig(catalogConfig) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(getTableId()) + .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema())) + .build()))); + } + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergScanConfig.java new file mode 100644 index 000000000000..c2bda838997a --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergScanConfig.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; + +@AutoValue +public abstract class IcebergScanConfig implements Serializable { + + private transient @MonotonicNonNull Table cachedTable; + + public enum ScanType { + TABLE, + BATCH + } + + @Pure + public abstract ScanType getScanType(); + + @Pure + public abstract IcebergCatalogConfig getCatalogConfig(); + + @Pure + public abstract String getTableIdentifier(); + + @Pure + public Table getTable() { + if (cachedTable == null) { + cachedTable = + getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier())); + } + return cachedTable; + } + + @Pure + public abstract Schema getSchema(); + + @Pure + public abstract @Nullable Expression getFilter(); + + @Pure + public abstract @Nullable Boolean getCaseSensitive(); + + @Pure + public abstract ImmutableMap getOptions(); + + @Pure + public abstract @Nullable Long getSnapshot(); + + @Pure + public abstract @Nullable Long getTimestamp(); + + @Pure + public abstract @Nullable Long getFromSnapshotInclusive(); + + @Pure + public abstract @Nullable String getFromSnapshotRefInclusive(); + + @Pure + public abstract @Nullable Long getFromSnapshotExclusive(); + + @Pure + public abstract @Nullable String getFromSnapshotRefExclusive(); + + @Pure + public abstract @Nullable Long getToSnapshot(); + + @Pure + public 
abstract @Nullable String getToSnapshotRef(); + + @Pure + public abstract @Nullable String getTag(); + + @Pure + public abstract @Nullable String getBranch(); + + @Pure + public static Builder builder() { + return new AutoValue_IcebergScanConfig.Builder() + .setScanType(ScanType.TABLE) + .setFilter(null) + .setCaseSensitive(null) + .setOptions(ImmutableMap.of()) + .setSnapshot(null) + .setTimestamp(null) + .setFromSnapshotInclusive(null) + .setFromSnapshotRefInclusive(null) + .setFromSnapshotExclusive(null) + .setFromSnapshotRefExclusive(null) + .setToSnapshot(null) + .setToSnapshotRef(null) + .setTag(null) + .setBranch(null); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setScanType(ScanType type); + + public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog); + + public abstract Builder setTableIdentifier(String tableIdentifier); + + public Builder setTableIdentifier(TableIdentifier tableIdentifier) { + return this.setTableIdentifier(tableIdentifier.toString()); + } + + public Builder setTableIdentifier(String... names) { + return setTableIdentifier(TableIdentifier.of(names)); + } + + public abstract Builder setSchema(Schema schema); + + public abstract Builder setFilter(@Nullable Expression filter); + + public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive); + + public abstract Builder setOptions(ImmutableMap options); + + public abstract Builder setSnapshot(@Nullable Long snapshot); + + public abstract Builder setTimestamp(@Nullable Long timestamp); + + public abstract Builder setFromSnapshotInclusive(@Nullable Long fromInclusive); + + public abstract Builder setFromSnapshotRefInclusive(@Nullable String ref); + + public abstract Builder setFromSnapshotExclusive(@Nullable Long fromExclusive); + + public abstract Builder setFromSnapshotRefExclusive(@Nullable String ref); + + public abstract Builder setToSnapshot(@Nullable Long snapshot); + + public abstract Builder setToSnapshotRef(@Nullable String ref); + + public abstract Builder setTag(@Nullable String tag); + + public abstract Builder setBranch(@Nullable String branch); + + public abstract IcebergScanConfig build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java index d9a3427c11fc..51df91ccadd3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java @@ -17,7 +17,7 @@ */ package org.apache.beam.io.iceberg; -import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; +import static org.apache.beam.io.iceberg.SchemaAndRowConversions.rowToRecord; import java.io.IOException; import org.apache.beam.sdk.values.Row; @@ -30,7 +30,6 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; class RecordWriter { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java deleted file mode 100644 index 92f55208a0da..000000000000 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.iceberg; - -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.UUID; -import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.types.Types.NestedField; - -class RowHelper { - - // static helper functions only - private RowHelper() {} - - public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) { - return copy(GenericRecord.create(schema), row); - } - - private static Record copy(Record baseRecord, Row value) { - Record rec = baseRecord.copy(); - for (NestedField f : rec.struct().fields()) { - copyInto(rec, f, value); - } - return rec; - } - - private static void copyInto(Record rec, NestedField field, Row value) { - String name = field.name(); - switch (field.type().typeId()) { - case BOOLEAN: - Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v)); - break; - case INTEGER: - Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v)); - break; - case LONG: - Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v)); - break; - case FLOAT: - Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v)); - break; - case DOUBLE: - Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); - break; - case DATE: - throw new UnsupportedOperationException("Date fields not yet supported"); - case TIME: - throw new UnsupportedOperationException("Time fields not yet supported"); - case TIMESTAMP: - Optional.ofNullable(value.getDateTime(name)) - .ifPresent(v -> rec.setField(name, v.getMillis())); - break; - case STRING: - Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); - break; - case UUID: - Optional.ofNullable(value.getBytes(name)) - .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v))); - break; - case FIXED: - throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); - case BINARY: - Optional.ofNullable(value.getBytes(name)) - .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v))); - break; - case DECIMAL: - Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v)); - break; - case STRUCT: - Optional.ofNullable(value.getRow(name)) - .ifPresent( - row -> - rec.setField( - name, copy(GenericRecord.create(field.type().asStructType()), row))); - break; - case LIST: - throw new UnsupportedOperationException("List fields are not yet supported."); - case MAP: - throw new UnsupportedOperationException("Map fields are not yet supported."); - } - } -} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanSource.java new file mode 
100644 index 000000000000..fb77c79281fa --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanSource.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; + +/** + * Source that reads all the data in a table described by an IcebergScanConfig. Supports only + * initial splitting.
+ */ +class ScanSource extends BoundedSource { + + private IcebergScanConfig scanConfig; + + public ScanSource(IcebergScanConfig scanConfig) { + this.scanConfig = scanConfig; + } + + private TableScan getTableScan() { + TableScan tableScan = + scanConfig + .getTable() + .newScan() + .project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema())); + + if (scanConfig.getFilter() != null) { + tableScan = tableScan.filter(scanConfig.getFilter()); + } + if (scanConfig.getCaseSensitive() != null) { + tableScan = tableScan.caseSensitive(scanConfig.getCaseSensitive()); + } + if (scanConfig.getSnapshot() != null) { + tableScan = tableScan.useSnapshot(scanConfig.getSnapshot()); + } + if (scanConfig.getBranch() != null) { + tableScan = tableScan.useRef(scanConfig.getBranch()); + } else if (scanConfig.getTag() != null) { + tableScan = tableScan.useRef(scanConfig.getTag()); + } + + return tableScan; + } + + private CombinedScanTask wholeTableReadTask() { + // Always project to our destination schema + return new BaseCombinedScanTask(ImmutableList.copyOf(getTableScan().planFiles())); + } + + @Override + public List> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + ArrayList splits = new ArrayList<>(); + + switch (scanConfig.getScanType()) { + case TABLE: + TableScan tableScan = getTableScan(); + if (desiredBundleSizeBytes > 0) { + tableScan = + tableScan.option(TableProperties.SPLIT_SIZE, Long.toString(desiredBundleSizeBytes)); + } + + try (CloseableIterable tasks = tableScan.planTasks()) { + for (CombinedScanTask combinedScanTask : tasks) { + splits.add(new ScanTaskSource(scanConfig, combinedScanTask)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + case BATCH: + throw new UnsupportedOperationException("BATCH scan not supported"); + default: + throw new UnsupportedOperationException("Unknown scan type: " + scanConfig.getScanType()); + } + + return splits; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return wholeTableReadTask().sizeBytes(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } + + @Override + public Coder getOutputCoder() { + return RowCoder.of(scanConfig.getSchema()); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return new ScanTaskReader(new ScanTaskSource(scanConfig, wholeTableReadTask())); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskReader.java new file mode 100644 index 000000000000..3b9aae56f69a --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskReader.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.NoSuchElementException; +import java.util.Queue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.InputFilesDecryptor; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ScanTaskReader extends BoundedSource.BoundedReader { + private static final Logger LOG = LoggerFactory.getLogger(ScanTaskReader.class); + + private final ScanTaskSource source; + private final org.apache.iceberg.Schema project; + + transient @Nullable FileIO io; + transient @Nullable InputFilesDecryptor decryptor; + transient @Nullable Queue fileScanTasks; + transient @Nullable CloseableIterator currentIterator; + transient @Nullable Record current; + + public ScanTaskReader(ScanTaskSource source) { + this.source = source; + this.project = SchemaAndRowConversions.beamSchemaToIcebergSchema(source.getSchema()); + } + + @Override + public boolean start() throws IOException { + Table table = source.getTable(); + EncryptionManager encryptionManager = table.encryption(); + + current = null; + io = table.io(); + decryptor = new InputFilesDecryptor(source.getTask(), io, encryptionManager); + fileScanTasks = new ArrayDeque<>(); + fileScanTasks.addAll(source.getTask().files()); + + return advance(); + } + + @Override + public boolean advance() throws IOException { + Queue fileScanTasks = + checkStateNotNull(this.fileScanTasks, "files null in advance() - did you call start()?"); + InputFilesDecryptor decryptor = + checkStateNotNull(this.decryptor, "decryptor null in advance() - did you call start()?"); + + // This nullness annotation is incorrect, but it is the most expedient way to work with Iceberg's APIs, + // which are not null-safe. + @SuppressWarnings("nullness") + org.apache.iceberg.@NonNull Schema project = this.project; + + do { + // If our current iterator is working... do that.
+ if (currentIterator != null && currentIterator.hasNext()) { + current = currentIterator.next(); + return true; + } + + // Close out the current iterator and try to open a new one + if (currentIterator != null) { + currentIterator.close(); + currentIterator = null; + } + + LOG.info("Trying to open new file."); + if (fileScanTasks.isEmpty()) { + LOG.info("We have exhausted all available files in this CombinedScanTask"); + break; + } + + // We have a new file to start reading + FileScanTask fileTask = fileScanTasks.remove(); + DataFile file = fileTask.file(); + InputFile input = decryptor.getInputFile(fileTask); + + CloseableIterable iterable; + switch (file.format()) { + case ORC: + LOG.info("Preparing ORC input"); + iterable = + ORC.read(input) + .split(fileTask.start(), fileTask.length()) + .project(project) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project, fileSchema)) + .filter(fileTask.residual()) + .build(); + break; + case PARQUET: + LOG.info("Preparing Parquet input."); + iterable = + Parquet.read(input) + .split(fileTask.start(), fileTask.length()) + .project(project) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(project, fileSchema)) + .filter(fileTask.residual()) + .build(); + break; + case AVRO: + LOG.info("Preparing Avro input."); + iterable = + Avro.read(input) + .split(fileTask.start(), fileTask.length()) + .project(project) + .createReaderFunc(DataReader::create) + .build(); + break; + default: + throw new UnsupportedOperationException("Cannot read format: " + file.format()); + } + currentIterator = iterable.iterator(); + + } while (true); + + return false; + } + + @Override + public Row getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return SchemaAndRowConversions.recordToRow(source.getSchema(), current); + } + + @Override + public void close() throws IOException { + if (currentIterator != null) { + currentIterator.close(); + currentIterator = null; + } + if (fileScanTasks != null) { + fileScanTasks.clear(); + fileScanTasks = null; + } + if (io != null) { + io.close(); + io = null; + } + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskSource.java new file mode 100644 index 000000000000..8c44d174b625 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/ScanTaskSource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Table; +import org.checkerframework.dataflow.qual.Pure; + +/** + * Source that reads the data described by a single CombinedScanTask. This source is not splittable. + */ +class ScanTaskSource extends BoundedSource { + + private final CombinedScanTask task; + private final IcebergScanConfig scanConfig; + + public ScanTaskSource(IcebergScanConfig scanConfig, CombinedScanTask task) { + this.scanConfig = scanConfig; + this.task = task; + } + + @Pure + Table getTable() { + return scanConfig.getTable(); + } + + @Pure + CombinedScanTask getTask() { + return task; + } + + @Pure + Schema getSchema() { + return scanConfig.getSchema(); + } + + @Override + public List> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return ImmutableList.of(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return task.sizeBytes(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } + + @Override + public Coder getOutputCoder() { + return RowCoder.of(scanConfig.getSchema()); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return new ScanTaskReader(this); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java new file mode 100644 index 000000000000..e0210c96d685 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.UUID; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +class SchemaAndRowConversions { + + private SchemaAndRowConversions() {} + + public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID"; + + public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { + switch (type.typeId()) { + case BOOLEAN: + return Schema.FieldType.BOOLEAN; + case INTEGER: + return Schema.FieldType.INT32; + case LONG: + return Schema.FieldType.INT64; + case FLOAT: + return Schema.FieldType.FLOAT; + case DOUBLE: + return Schema.FieldType.DOUBLE; + case DATE: + case TIME: + case TIMESTAMP: // TODO: Logical types? + return Schema.FieldType.DATETIME; + case STRING: + return Schema.FieldType.STRING; + case UUID: + case BINARY: + return Schema.FieldType.BYTES; + case FIXED: + case DECIMAL: + return Schema.FieldType.DECIMAL; + case STRUCT: + return Schema.FieldType.row(icebergStructTypeToBeamSchema(type.asStructType())); + case LIST: + return Schema.FieldType.iterable( + icebergTypeToBeamFieldType(type.asListType().elementType())); + case MAP: + return Schema.FieldType.map( + icebergTypeToBeamFieldType(type.asMapType().keyType()), + icebergTypeToBeamFieldType(type.asMapType().valueType())); + } + throw new RuntimeException("Unrecognized IcebergIO Type"); + } + + public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) { + return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type())) + .withOptions( + Schema.Options.builder() + .setOption( + ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, field.type().typeId().name()) + .build()) + .withNullable(field.isOptional()); + } + + public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : schema.columns()) { + builder.addField(icebergFieldToBeamField(f)); + } + return builder.build(); + } + + public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : struct.fields()) { + builder.addField(icebergFieldToBeamField(f)); + } + return builder.build(); + } + + public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) { + String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, String.class); + if (typeId != null) { + return Types.NestedField.of( + fieldId, + field.getType().getNullable(), + field.getName(), + Types.fromPrimitiveString(typeId)); + } else { + return Types.NestedField.of( + fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get()); + } + } + + public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { + Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; + int fieldId = 0; + for (Schema.Field f : schema.getFields()) { + fields[fieldId++] = beamFieldToIcebergField(fieldId, f); + } + return new org.apache.iceberg.Schema(fields); + } + + public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) { + 
return copyRowIntoRecord(GenericRecord.create(schema), row); + } + + private static Record copyRowIntoRecord(Record baseRecord, Row value) { + Record rec = baseRecord.copy(); + for (Types.NestedField f : rec.struct().fields()) { + copyFieldIntoRecord(rec, f, value); + } + return rec; + } + + private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row value) { + String name = field.name(); + switch (field.type().typeId()) { + case BOOLEAN: + Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v)); + break; + case INTEGER: + Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v)); + break; + case LONG: + Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v)); + break; + case FLOAT: + Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DOUBLE: + Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DATE: + throw new UnsupportedOperationException("Date fields not yet supported"); + case TIME: + throw new UnsupportedOperationException("Time fields not yet supported"); + case TIMESTAMP: + Optional.ofNullable(value.getDateTime(name)) + .ifPresent(v -> rec.setField(name, v.getMillis())); + break; + case STRING: + Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); + break; + case UUID: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v))); + break; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + case BINARY: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v))); + break; + case DECIMAL: + Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v)); + break; + case STRUCT: + Optional.ofNullable(value.getRow(name)) + .ifPresent( + row -> + rec.setField( + name, + copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row))); + break; + case LIST: + throw new UnsupportedOperationException("List fields are not yet supported."); + case MAP: + throw new UnsupportedOperationException("Map fields are not yet supported."); + } + } + + public static Row recordToRow(Schema schema, Record record) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Schema.Field field : schema.getFields()) { + switch (field.getType().getTypeName()) { + case BYTE: + // I guess allow anything we can cast here + byte byteValue = (byte) record.getField(field.getName()); + rowBuilder.addValue(byteValue); + break; + case INT16: + // I guess allow anything we can cast here + short shortValue = (short) record.getField(field.getName()); + rowBuilder.addValue(shortValue); + break; + case INT32: + // I guess allow anything we can cast here + int intValue = (int) record.getField(field.getName()); + rowBuilder.addValue(intValue); + break; + case INT64: + // I guess allow anything we can cast here + long longValue = (long) record.getField(field.getName()); + rowBuilder.addValue(longValue); + break; + case DECIMAL: + // Iceberg and Beam both use BigDecimal + rowBuilder.addValue(record.getField(field.getName())); + break; + case FLOAT: + // Iceberg and Beam both use float + rowBuilder.addValue(record.getField(field.getName())); + break; + case DOUBLE: + // Iceberg and Beam both use double + rowBuilder.addValue(record.getField(field.getName())); + break; + case STRING: + // Iceberg and Beam both use String + 
rowBuilder.addValue(record.getField(field.getName())); + break; + case DATETIME: + // Iceberg uses a long for millis; Beam uses joda time DateTime + long millis = (long) record.getField(field.getName()); + rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC)); + break; + case BOOLEAN: + // Iceberg and Beam both use boolean + rowBuilder.addValue(record.getField(field.getName())); + break; + case BYTES: + // Iceberg uses ByteBuffer; Beam uses byte[] + rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array()); + break; + case ARRAY: + throw new UnsupportedOperationException("Array fields are not yet supported."); + case ITERABLE: + throw new UnsupportedOperationException("Iterable fields are not yet supported."); + case MAP: + throw new UnsupportedOperationException("Map fields are not yet supported."); + case ROW: + Record nestedRecord = (Record) record.getField(field.getName()); + Schema nestedSchema = + checkArgumentNotNull( + field.getType().getRowSchema(), + "Corrupted schema: Row type did not have associated nested schema."); + Row nestedRow = recordToRow(nestedSchema, nestedRecord); + rowBuilder.addValue(nestedRow); + break; + case LOGICAL_TYPE: + throw new UnsupportedOperationException( + "Cannot convert iceberg field to Beam logical type"); + } + } + return rowBuilder.build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java deleted file mode 100644 index bbd23239de66..000000000000 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.iceberg; - -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; - -@SuppressWarnings({"dereference.of.nullable"}) -class SchemaHelper { - - private SchemaHelper() {} - - public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID"; - - public static Schema.FieldType fieldTypeForType(final Type type) { - switch (type.typeId()) { - case BOOLEAN: - return FieldType.BOOLEAN; - case INTEGER: - return FieldType.INT32; - case LONG: - return FieldType.INT64; - case FLOAT: - return FieldType.FLOAT; - case DOUBLE: - return FieldType.DOUBLE; - case DATE: - case TIME: - case TIMESTAMP: // TODO: Logical types?
- return FieldType.DATETIME; - case STRING: - return FieldType.STRING; - case UUID: - case BINARY: - return FieldType.BYTES; - case FIXED: - case DECIMAL: - return FieldType.DECIMAL; - case STRUCT: - return FieldType.row(convert(type.asStructType())); - case LIST: - return FieldType.iterable(fieldTypeForType(type.asListType().elementType())); - case MAP: - return FieldType.map( - fieldTypeForType(type.asMapType().keyType()), - fieldTypeForType(type.asMapType().valueType())); - } - throw new RuntimeException("Unrecognized IcebergIO Type"); - } - - public static Schema.Field convert(final Types.NestedField field) { - return Schema.Field.of(field.name(), fieldTypeForType(field.type())) - .withOptions( - Schema.Options.builder() - .setOption( - ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, field.type().typeId().name()) - .build()) - .withNullable(field.isOptional()); - } - - public static Schema convert(final org.apache.iceberg.Schema schema) { - Schema.Builder builder = Schema.builder(); - for (Types.NestedField f : schema.columns()) { - builder.addField(convert(f)); - } - return builder.build(); - } - - public static Schema convert(final Types.StructType struct) { - Schema.Builder builder = Schema.builder(); - for (Types.NestedField f : struct.fields()) { - builder.addField(convert(f)); - } - return builder.build(); - } - - public static Types.NestedField convert(int fieldId, final Schema.Field field) { - String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, String.class); - if (typeId != null) { - return Types.NestedField.of( - fieldId, - field.getType().getNullable(), - field.getName(), - Types.fromPrimitiveString(typeId)); - } else { - return Types.NestedField.of( - fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get()); - } - } - - public static org.apache.iceberg.Schema convert(final Schema schema) { - Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; - int fieldId = 0; - for (Schema.Field f : schema.getFields()) { - fields[fieldId++] = convert(fieldId, f); - } - return new org.apache.iceberg.Schema(fields); - } -} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java index 2499331beadc..6a19c510d52d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java @@ -66,7 +66,9 @@ private List getTestValues() throws Exception { RecordWriter writer = new RecordWriter(table, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString()); writer.write( - Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)).addValues(42L, "bizzle").build()); + Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .addValues(42L, "bizzle") + .build()); writer.close(); DataFile dataFile = writer.dataFile(); values.add( @@ -79,7 +81,9 @@ private List getTestValues() throws Exception { // An avro file writer = new RecordWriter(table, FileFormat.AVRO, TEMPORARY_FOLDER.newFile().toString()); writer.write( - Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)).addValues(42L, "bizzle").build()); + Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .addValues(42L, "bizzle") + .build()); writer.close(); dataFile = writer.dataFile(); values.add( @@ -107,7 +111,7 @@ private List getTestValues() throws Exception 
{ // A parquet file in this other table writer = new RecordWriter(table2, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString()); writer.write( - Row.withSchema(SchemaHelper.convert(schema)) + Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(schema)) .addValues( 42L, "bizzle", diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java new file mode 100644 index 000000000000..0ae63439f76e --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class IcebergIOReadTest { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Rule public TestPipeline testPipeline = TestPipeline.create(); + + static class PrintRow extends DoFn { + + @ProcessElement + public void process(@Element Row row, OutputReceiver output) throws Exception { + LOG.info("Got row {}", row); + output.output(row); + } + } + + @Test + public void testSimpleScan() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + + simpleTable + .newFastAppend() + .appendFile( + warehouse.writeRecords( + "file1s1.parquet", 
simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1)) + .commit(); + + final List expectedRows = + Stream.of( + TestFixtures.FILE1SNAPSHOT1, + TestFixtures.FILE2SNAPSHOT1, + TestFixtures.FILE3SNAPSHOT1) + .flatMap(List::stream) + .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) + .collect(Collectors.toList()); + + IcebergCatalogConfig catalogConfig = + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build(); + + PCollection output = + testPipeline + .apply(IcebergIO.readTable(catalogConfig, tableId)) + .apply(ParDo.of(new PrintRow())) + .setCoder( + RowCoder.of( + SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); + + PAssert.that(output) + .satisfies( + (Iterable rows) -> { + assertThat(rows, containsInAnyOrder(expectedRows.toArray())); + return null; + }); + + testPipeline.run(); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java index c77d162aafd4..011ab2662457 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.io.iceberg; -import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; +import static org.apache.beam.io.iceberg.SchemaAndRowConversions.rowToRecord; import static org.hamcrest.MatcherAssert.assertThat; import java.io.Serializable; @@ -86,7 +86,7 @@ public void testSimpleAppend() throws Exception { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) - .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); LOG.info("Executing pipeline"); @@ -153,7 +153,7 @@ public IcebergDestination instantiateDestination(Row dest) { TestFixtures.FILE1SNAPSHOT1, TestFixtures.FILE1SNAPSHOT2, TestFixtures.FILE1SNAPSHOT3)))) - .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); LOG.info("Executing pipeline"); @@ -236,7 +236,7 @@ public IcebergDestination instantiateDestination(Row dest) { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) - .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); LOG.info("Executing pipeline"); @@ -265,7 +265,7 @@ public void testIdempotentCommit() throws Exception { Record record = rowToRecord( table.schema(), - Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .addValues(42L, "bizzle") .build()); diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java deleted file mode 100644 index 931937f407dd..000000000000 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.iceberg; - -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class RowHelperTest implements Serializable { - - /** - * Checks a value that when converted to Iceberg type is the same value when interpreted in Java. 
- */ - private void checkTypeConversion(Schema.FieldType sourceType, Type destType, Object value) { - checkTypeConversion(sourceType, value, destType, value); - } - - private void checkTypeConversion( - Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) { - Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType)); - Row row = Row.withSchema(beamSchema).addValues(sourceValue).build(); - - org.apache.iceberg.Schema icebergSchema = - new org.apache.iceberg.Schema(required(0, "v", destType)); - Record record = RowHelper.rowToRecord(icebergSchema, row); - - assertThat(record.getField("v"), equalTo(destValue)); - } - - @Test - public void testBoolean() throws Exception { - checkTypeConversion(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true); - checkTypeConversion(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false); - } - - @Test - public void testInteger() throws Exception { - checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), -13); - checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), 42); - checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), 0); - } - - @Test - public void testLong() throws Exception { - checkTypeConversion(Schema.FieldType.INT64, Types.LongType.get(), 13L); - checkTypeConversion(Schema.FieldType.INT64, Types.LongType.get(), 42L); - } - - @Test - public void testFloat() throws Exception { - checkTypeConversion(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f); - checkTypeConversion(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f); - } - - @Test - public void testDouble() throws Exception { - checkTypeConversion(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159); - } - - @Test - public void testDate() throws Exception {} - - @Test - public void testTime() throws Exception {} - - @Test - public void testTimestamp() throws Exception { - DateTime dateTime = - new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - - checkTypeConversion( - Schema.FieldType.DATETIME, - dateTime.toInstant(), - Types.TimestampType.withoutZone(), - dateTime.getMillis()); - } - - @Test - public void testFixed() throws Exception {} - - @Test - public void testBinary() throws Exception { - byte[] bytes = new byte[] {1, 2, 3, 4}; - checkTypeConversion( - Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), ByteBuffer.wrap(bytes)); - } - - @Test - public void testDecimal() throws Exception {} - - @Test - public void testStruct() throws Exception {} - - @Test - public void testMap() throws Exception {} - - @Test - public void testList() throws Exception {} -} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/ScanSourceTest.java new file mode 100644 index 000000000000..21ee3fd50a6c --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/ScanSourceTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import java.util.UUID; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ScanSourceTest { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Test + public void testUnstartedReaderReadsSamesItsSource() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + simpleTable + .newFastAppend() + .appendFile( + warehouse.writeRecords( + "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1)) + .commit(); + + PipelineOptions options = PipelineOptionsFactory.create(); + + BoundedSource source = + new ScanSource( + IcebergScanConfig.builder() + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build()) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) + .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .build()); + + BoundedSource.BoundedReader reader = source.createReader(options); + + SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(reader, options); + } + + @Test + public void testInitialSplitting() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + simpleTable + .newFastAppend() + .appendFile( + warehouse.writeRecords( + "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1)) + .commit(); + + PipelineOptions options = PipelineOptionsFactory.create(); + + BoundedSource source = + new ScanSource( + 
IcebergScanConfig.builder() + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build()) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) + .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .build()); + + // Input data for this test is tiny so try a number of very small split sizes + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(1, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(2, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(5, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(10, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(100, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(1000, options), options); + } + + @Test + public void testDoubleInitialSplitting() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + simpleTable + .newFastAppend() + .appendFile( + warehouse.writeRecords( + "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1)) + .appendFile( + warehouse.writeRecords( + "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1)) + .commit(); + + PipelineOptions options = PipelineOptionsFactory.create(); + + BoundedSource<Row> source = + new ScanSource( + IcebergScanConfig.builder() + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build()) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) + .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .build()); + + // Input data for this test is tiny, so make sure splitting yields a few splits that can + // themselves be split further + List<? extends BoundedSource<Row>> splits = source.split(100, options); + assertThat(splits.size(), Matchers.greaterThan(2)); + + // We are going to re-split this one + BoundedSource<Row> arbitrarySplit = splits.get(0); + + SourceTestUtils.assertSourcesEqualReferenceSource( + arbitrarySplit, arbitrarySplit.split(1, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource( + arbitrarySplit, arbitrarySplit.split(10, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource( + arbitrarySplit, arbitrarySplit.split(100, options), options); + SourceTestUtils.assertSourcesEqualReferenceSource( + arbitrarySplit, arbitrarySplit.split(1000, options), options); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SchemaAndRowConversionsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SchemaAndRowConversionsTest.java new file mode 100644 index 000000000000..225e7136811e --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SchemaAndRowConversionsTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.ByteBuffer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(Enclosed.class) +public class SchemaAndRowConversionsTest { + + @RunWith(JUnit4.class) + public static class RowToRecordTests { + /** + * Checks that a value, when converted to the corresponding Iceberg type, is interpreted as + * the same value in Java. + */ + private void checkRowValueToRecordValue( + Schema.FieldType sourceType, Type destType, Object value) { + checkRowValueToRecordValue(sourceType, value, destType, value); + } + + private void checkRowValueToRecordValue( + Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) { + Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType)); + Row row = Row.withSchema(beamSchema).addValues(sourceValue).build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(required(0, "v", destType)); + Record record = SchemaAndRowConversions.rowToRecord(icebergSchema, row); + + assertThat(record.getField("v"), equalTo(destValue)); + } + + @Test + public void testBoolean() throws Exception { + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true); + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false); + } + + @Test + public void testInteger() throws Exception { + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), -13); + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 42); + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 0); + } + + @Test + public void testLong() throws Exception { + checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 13L); + checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 42L); + } + + @Test + public void testFloat() throws Exception { + checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f); + checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f); + } + + @Test + public void testDouble() throws Exception { + checkRowValueToRecordValue(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159); + } + + 
@Test + public void testDate() throws Exception {} + + @Test + public void testTime() throws Exception {} + + @Test + public void testTimestamp() throws Exception { + DateTime dateTime = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); + + checkRowValueToRecordValue( + Schema.FieldType.DATETIME, + dateTime.toInstant(), + Types.TimestampType.withoutZone(), + dateTime.getMillis()); + } + + @Test + public void testFixed() throws Exception {} + + @Test + public void testBinary() throws Exception { + byte[] bytes = new byte[] {1, 2, 3, 4}; + checkRowValueToRecordValue( + Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), ByteBuffer.wrap(bytes)); + } + + @Test + public void testDecimal() throws Exception {} + + @Test + public void testStruct() throws Exception {} + + @Test + public void testMap() throws Exception {} + + @Test + public void testList() throws Exception {} + } + + @RunWith(JUnit4.class) + public static class RecordToRowTests { + private void checkRecordValueToRowValue( + Type sourceType, Schema.FieldType destType, Object value) { + checkRecordValueToRowValue(sourceType, value, destType, value); + } + + private void checkRecordValueToRowValue( + Type sourceType, Object sourceValue, Schema.FieldType destType, Object destValue) { + Schema beamSchema = Schema.of(Schema.Field.of("v", destType)); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(required(0, "v", sourceType)); + Record record = GenericRecord.create(icebergSchema); + record.setField("v", sourceValue); + + Row row = SchemaAndRowConversions.recordToRow(beamSchema, record); + + assertThat(row.getBaseValue("v"), equalTo(destValue)); + } + + @Test + public void testBoolean() throws Exception { + checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, true); + checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, false); + } + + @Test + public void testInteger() throws Exception { + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, -13); + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 42); + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 0); + } + + @Test + public void testLong() throws Exception { + checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 13L); + checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 42L); + } + + @Test + public void testFloat() throws Exception { + checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 3.14159f); + checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 42.0f); + } + + @Test + public void testDouble() throws Exception { + checkRecordValueToRowValue(Types.DoubleType.get(), Schema.FieldType.DOUBLE, 3.14159); + } + + @Test + public void testDate() throws Exception {} + + @Test + public void testTime() throws Exception {} + + @Test + public void testTimestamp() throws Exception { + DateTime dateTime = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); + + checkRecordValueToRowValue( + Types.TimestampType.withoutZone(), + dateTime.getMillis(), + Schema.FieldType.DATETIME, + dateTime.toInstant()); + } + + @Test + public void testFixed() throws Exception {} + + @Test + public void testBinary() throws Exception { + byte[] bytes = new byte[] {1, 2, 3, 4}; + checkRecordValueToRowValue( + Types.BinaryType.get(), ByteBuffer.wrap(bytes), Schema.FieldType.BYTES, bytes); + } + + @Test 
+ public void testDecimal() throws Exception {} + + @Test + public void testStruct() throws Exception {} + + @Test + public void testMap() throws Exception {} + + @Test + public void testList() throws Exception {} + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java index a39754fb7149..e6bb42d6a242 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java @@ -91,7 +91,7 @@ public static final ImmutableList<Row> asRows(Iterable<Record> records) { ArrayList<Row> rows = new ArrayList<>(); for (Record record : records) { rows.add( - Row.withSchema(SchemaHelper.convert(SCHEMA)) + Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(SCHEMA)) .withFieldValue("id", record.getField("id")) .withFieldValue("data", record.getField("data")) .build());
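Review note, not part of the patch: the ScanSourceTest cases above only assert split consistency, never the rows actually produced by the source. Assuming the same three-file table setup and the `source` and `options` locals from those tests, and assuming `TestFixtures.asRows` mirrors the records handed to `writeRecords`, a read-back assertion could look roughly like the sketch below (hypothetical; `java.util.ArrayList` would also need to be imported).

    // Sketch of an end-of-test check, reusing the test's `source` and `options` locals.
    List<Row> expected = new ArrayList<>();
    expected.addAll(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1));
    expected.addAll(TestFixtures.asRows(TestFixtures.FILE2SNAPSHOT1));
    expected.addAll(TestFixtures.asRows(TestFixtures.FILE3SNAPSHOT1));
    // SourceTestUtils.readFromSource reads every element from the source using a fresh reader.
    List<Row> actual = SourceTestUtils.readFromSource(source, options);
    assertThat(actual, Matchers.containsInAnyOrder(expected.toArray()));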