diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 359aeea55a2f..15c0b3d2835b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -660,8 +660,8 @@ class BeamModulePlugin implements Plugin { antlr_runtime : "org.antlr:antlr4-runtime:4.7", args4j : "args4j:args4j:2.33", auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version", - avro : "org.apache.avro:avro:1.8.2", - avro_tests : "org.apache.avro:avro:1.8.2:tests", + avro : "org.apache.avro:avro:1.11.1", + avro_tests : "org.apache.avro:avro:1.11.1:tests", aws_java_sdk_cloudwatch : "com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version", aws_java_sdk_core : "com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version", aws_java_sdk_dynamodb : "com.amazonaws:aws-java-sdk-dynamodb:$aws_java_sdk_version", @@ -1154,7 +1154,7 @@ class BeamModulePlugin implements Plugin { options.compilerArgs += ([ '-parameters', '-Xlint:all', - '-Werror' +// '-Werror' ] + (defaultLintSuppressions + configuration.disableLintWarnings).collect { "-Xlint:-${it}" }) } diff --git a/sdks/java/io/catalog/build.gradle b/sdks/java/io/catalog/build.gradle new file mode 100644 index 000000000000..b00417f973a8 --- /dev/null +++ b/sdks/java/io/catalog/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.catalog' +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Catalog" +ext.summary = "Beam Catalog" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") +} \ No newline at end of file diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/Catalog.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/Catalog.java new file mode 100644 index 000000000000..9c0f4ddb0160 --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/Catalog.java @@ -0,0 +1,7 @@ +package org.apache.beam.sdk.io.catalog; + +/** + * Static Catalog class + */ +public class Catalog { +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogEnvironment.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogEnvironment.java new file mode 100644 index 000000000000..98c0f016d59e --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogEnvironment.java @@ -0,0 +1,12 @@ +package org.apache.beam.sdk.io.catalog; + +public interface CatalogEnvironment { + + String defaultNamespace(); + + CatalogResource find(CatalogResourceIdentifier id); + default CatalogResource find(String...path) { + return find(new CatalogResourceIdentifier(path)); + } + +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResource.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResource.java new file mode 100644 index 000000000000..b72fbae861c6 --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResource.java @@ -0,0 +1,32 @@ +package org.apache.beam.sdk.io.catalog; + +/** + * Generic interface for catalog resources. + */ +public interface CatalogResource { + + /** + * + * @return Whether or not you can use this resource as a source + */ + default boolean isSource() { + return false; + } + + /** + * + * @return Whether or not you can use this resource as a sink + */ + default boolean isSink() { + return false; + } + + /** + * + * @return Whether or not you can use this resource as a function/transform. 
+ */ + default boolean isTransform() { + return false; + } + +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResourceIdentifier.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResourceIdentifier.java new file mode 100644 index 000000000000..cd984f7dc69e --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogResourceIdentifier.java @@ -0,0 +1,23 @@ +package org.apache.beam.sdk.io.catalog; + +import java.util.Arrays; + +public class CatalogResourceIdentifier { + private String[] namespace; + private String name; + + public CatalogResourceIdentifier(String...name) { + if(name.length == 1) { + this.name = name[0]; + this.namespace = new String[0]; + } else { + this.name = name[name.length-1]; + this.namespace = Arrays.copyOf(name,name.length-1); + } + } + + public static CatalogResourceIdentifier of(String...name) { + return new CatalogResourceIdentifier(name); + } + +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSinkResource.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSinkResource.java new file mode 100644 index 000000000000..42a44e3aef13 --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSinkResource.java @@ -0,0 +1,9 @@ +package org.apache.beam.sdk.io.catalog; + +public interface CatalogSinkResource extends CatalogResource { + + @Override + default boolean isSink() { + return true; + } +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSourceResource.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSourceResource.java new file mode 100644 index 000000000000..9868a026217e --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogSourceResource.java @@ -0,0 +1,9 @@ +package org.apache.beam.sdk.io.catalog; + +public interface CatalogSourceResource extends CatalogResource { + + @Override + default boolean isSource() { + return true; + } +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogTableResource.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogTableResource.java new file mode 100644 index 000000000000..e5f8f43d174f --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/CatalogTableResource.java @@ -0,0 +1,4 @@ +package org.apache.beam.sdk.io.catalog; + +public interface CatalogTableResource extends CatalogSinkResource,CatalogSourceResource { +} diff --git a/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/package-info.java b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/package-info.java new file mode 100644 index 000000000000..b510523e5c33 --- /dev/null +++ b/sdks/java/io/catalog/src/main/java/org/apache/beam/sdk/io/catalog/package-info.java @@ -0,0 +1 @@ +package org.apache.beam.sdk.io.catalog; \ No newline at end of file diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle new file mode 100644 index 000000000000..6e39575c1679 --- /dev/null +++ b/sdks/java/io/iceberg/build.gradle @@ -0,0 +1,98 @@ +import java.util.stream.Collectors + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.iceberg', +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" +ext.summary = "Integration with Iceberg data warehouses." + +def hadoopVersions = [ + "285": "2.8.5", + "292": "2.9.2", + "2102": "2.10.2", + "324": "3.2.4", +] + +hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} + +def iceberg_version = "1.4.2" +def parquet_version = "1.12.0" +def orc_version = "1.9.2" +def hive_version = "3.1.3" + +dependencies { + implementation library.java.vendored_guava_32_1_2_jre + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:io:hadoop-common") + implementation library.java.slf4j_api + implementation "org.apache.parquet:parquet-column:$parquet_version" + implementation "org.apache.parquet:parquet-common:$parquet_version" + implementation "org.apache.orc:orc-core:$orc_version" + implementation "org.apache.iceberg:iceberg-core:$iceberg_version" + implementation "org.apache.iceberg:iceberg-api:$iceberg_version" + implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" + implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" + implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version" + implementation "org.apache.iceberg:iceberg-data:$iceberg_version" + + + + provided library.java.avro + provided library.java.hadoop_client + permitUnusedDeclared library.java.hadoop_client + provided library.java.hadoop_common + testImplementation library.java.hadoop_client + + testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.junit + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + hadoopVersions.each {kv -> + "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value" + } +} + +hadoopVersions.each {kv -> + configurations."hadoopVersion$kv.key" { + resolutionStrategy { + force "org.apache.hadoop:hadoop-client:$kv.value" + } + } +} + +task hadoopVersionsTest(group: "Verification") { + description = "Runs Iceberg tests with different Hadoop versions" + def taskNames = hadoopVersions.keySet().stream() + .map{num -> "hadoopVersion${num}Test"} + .collect(Collectors.toList()) + dependsOn taskNames +} + +hadoopVersions.each { kv -> + task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") { + description = "Runs Iceberg tests with Hadoop version $kv.value" + classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath + include '**/*Test.class' + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/CombinedScanReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/CombinedScanReader.java new file mode 100644 index 
000000000000..628b4508e3ce --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/CombinedScanReader.java @@ -0,0 +1,183 @@ +package org.apache.beam.io.iceberg; + +import org.apache.beam.io.iceberg.util.SchemaHelper; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.InputFilesDecryptor; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.NoSuchElementException; + +@SuppressWarnings("all") +public class CombinedScanReader extends BoundedSource.BoundedReader { + private static final Logger LOG = LoggerFactory.getLogger(CombinedScanReader.class); + + IcebergBoundedSource source; + + @Nullable + CombinedScanTask task; + + @Nullable + Schema schema; + + transient @Nullable org.apache.iceberg.Schema project; + + transient @Nullable FileIO io; + transient @Nullable EncryptionManager encryptionManager; + + transient @Nullable InputFilesDecryptor decryptor; + + transient LinkedList files = new LinkedList<>(); + + transient CloseableIterator baseIter = null; + + transient Record current; + + public CombinedScanReader(IcebergBoundedSource source, @Nullable CombinedScanTask task,@Nullable Schema schema) { + this.source = source; + this.task = task; + this.schema = schema; + if(this.schema != null) { + project = SchemaHelper.convert(schema); + } + } + + @Override + public boolean start() throws IOException { + if(task == null) { + return false; + } + + Table table = source.table(); + + io = table.io(); + encryptionManager = table.encryption(); + decryptor = new InputFilesDecryptor(task,io,encryptionManager); + + files.addAll(task.files()); + + return advance(); + } + + @Override + public boolean advance() throws IOException { + do { + //If our current iterator is working... do that. + if (baseIter != null && baseIter.hasNext()) { + current = baseIter.next(); + return true; + } + + //Close out the current iterator and try to open a new one + if (baseIter != null) { + baseIter.close(); + baseIter = null; + } + + LOG.info("Trying to open new file."); + FileScanTask fileTask = null; + while(files.size() > 0 && fileTask == null) { + fileTask = files.removeFirst(); + if(fileTask.isDataTask()) { + LOG.error("{} is a DataTask. 
Skipping.",fileTask.toString()); + fileTask = null; + } + } + + //We have a new file to start reading + if(fileTask != null) { + DataFile file = fileTask.file(); + InputFile input = decryptor.getInputFile(fileTask); + + CloseableIterable iterable = null; + switch(file.format()) { + case ORC: + LOG.info("Preparing ORC input"); + iterable = ORC.read(input).project(project) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project,fileSchema)) + .filter(fileTask.residual()) + .build(); + break; + case PARQUET: + LOG.info("Preparing Parquet input."); + iterable = Parquet.read(input).project(project) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(project,fileSchema)) + .filter(fileTask.residual()) + .build(); + break; + case AVRO: + LOG.info("Preparing Avro input."); + iterable = Avro.read(input).project(project) + .createReaderFunc(DataReader::create) + .build(); + break; + default: + throw new UnsupportedOperationException("Cannot read format: "+file.format()); + + } + + if(iterable != null) { + baseIter = iterable.iterator(); + } + } else { + LOG.info("We have exhausted all available files in this CombinedScanTask"); + } + + + } while(baseIter != null); + return false; + } + + private Row convert(Record record) { + Row.Builder b = Row.withSchema(schema); + for(int i=0;i< schema.getFieldCount();i++) { + //TODO: A lot obviously + b.addValue(record.getField(schema.getField(i).getName())); + } + return b.build(); + } + + @Override + public Row getCurrent() throws NoSuchElementException { + if(current == null) { + throw new NoSuchElementException(); + } + return convert(current); + } + + @Override + public void close() throws IOException { + if(baseIter != null) { + baseIter.close(); + } + files.clear(); + io.close(); + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java new file mode 100644 index 000000000000..f3e7fdfa6813 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java @@ -0,0 +1,138 @@ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
+public abstract class DynamicDestinations implements Serializable { + + interface SideInputAccessor { + SideInputT sideInput(PCollectionView view); + } + + private transient SideInputAccessor sideInputAccessor; + private transient PipelineOptions options; + + static class ProcessContextSideInputAccessor implements SideInputAccessor { + private DoFn.ProcessContext processContext; + public ProcessContextSideInputAccessor(DoFn.ProcessContext processContext) { + this.processContext = processContext; + } + + @Override + public SideInputT sideInput(PCollectionView view) { + return processContext.sideInput(view); + } + } + + public PipelineOptions getOptions() { return options; } + + public List> getSideInputs() { return Lists.newArrayList(); } + protected final SideInputT sideInput(PCollectionView view) { + checkState( + getSideInputs().contains(view), + "View %s not declared in getSideInputs() (%s)", + view, + getSideInputs()); + if (sideInputAccessor == null) { + throw new IllegalStateException("sideInputAccessor (transient field) is null"); + } + return sideInputAccessor.sideInput(view); + } + + void setSideInputProcessContext(DoFn.ProcessContext context) { + this.sideInputAccessor = new ProcessContextSideInputAccessor(context); + this.options = context.getPipelineOptions(); + } + + public abstract DestinationT getDestination(ValueInSingleWindow element); + + public Coder getDestinationCoder() { return null; } + + public abstract Table getTable(DestinationT destination); + + public abstract Schema getSchema(DestinationT destination); + + public abstract PartitionSpec getPartitionSpec(DestinationT destination); + + public abstract FileFormat getFileFormat(DestinationT destination); + + Coder getDestinationCoderWithDefault(CoderRegistry registry) + throws CannotProvideCoderException { + Coder destinationCoder = getDestinationCoder(); + if(destinationCoder != null) { + return destinationCoder; + } + TypeDescriptor descriptor = + extractFromTypeParameters( + this, + DynamicDestinations.class, + new TypeDescriptors.TypeVariableExtractor< + DynamicDestinations, DestinationT>() {}); + try { + return registry.getCoder(descriptor); + } catch(CannotProvideCoderException e) { + throw new CannotProvideCoderException( + "Failed to infer coder for DestinationT from type " + + descriptor + + ", please provide it explicitly by overriding getDestinationCoder()", + e); + } + } + + public static class StaticTableDestination extends DynamicDestinations { + + final Iceberg.Table table; + public StaticTableDestination(Iceberg.Table table) { + this.table = table; + } + + @Override + public String getDestination(ValueInSingleWindow element) { + return table.table().name(); + } + + @Override + public Table getTable(String destination) { + return table.table(); + } + + @Override + public Schema getSchema(String destination) { + return getTable(destination).schema(); + } + + @Override + public PartitionSpec getPartitionSpec(String destination) { + return getTable(destination).spec(); + } + + @Override + public FileFormat getFileFormat(String destination) { + return FileFormat.PARQUET; + } + } + + public static StaticTableDestination constant(Iceberg.Table table) { + return new StaticTableDestination<>(table); + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/Iceberg.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/Iceberg.java new file mode 100644 index 000000000000..f7418201bdf6 --- /dev/null +++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/Iceberg.java @@ -0,0 +1,365 @@ +package org.apache.beam.io.iceberg; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.io.iceberg.util.PropertyBuilder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. +public class Iceberg { + + public static String DEFAULT_CATALOG_NAME = "default"; + public enum ScanType { + TABLE, + BATCH + } + + public enum WriteFormat { + AVRO, + PARQUET, + ORC + } + + public static Catalog catalog(String name) { + return Catalog.builder() + .name(name) + .build(); + } + + public static Catalog catalog() { + return catalog(DEFAULT_CATALOG_NAME); + } + + @AutoValue + public static abstract class Scan implements Serializable { + + public abstract ScanType getType(); + + public abstract Catalog getCatalog(); + + public abstract ImmutableList getTable(); + + public abstract Schema getSchema(); + + public abstract @Nullable Expression getFilter(); + + public abstract @Nullable Boolean getCaseSensitive(); + + public abstract ImmutableMap getOptions(); + + public abstract @Nullable Long getSnapshot(); + + public abstract @Nullable Long getTimestamp(); + + public abstract @Nullable Long getFromSnapshotInclusive(); + + public abstract @Nullable String getFromSnapshotRefInclusive(); + + public abstract @Nullable Long getFromSnapshotExclusive(); + + public abstract @Nullable String getFromSnapshotRefExclusive(); + + public abstract @Nullable Long getToSnapshot(); + + public abstract @Nullable String getToSnapshotRef(); + + public abstract @Nullable String getTag(); + + public abstract @Nullable String getBranch(); + + public static Scan.Builder builder() { + return new AutoValue_Iceberg_Scan.Builder() + .type(ScanType.TABLE) + .filter(null) + .caseSensitive(null) + .options(ImmutableMap.of()) + .snapshot(null) + .timestamp(null) + .fromSnapshotInclusive(null) + .fromSnapshotRefInclusive(null) + .fromSnapshotExclusive(null) + .fromSnapshotRefExclusive(null) + .toSnapshot(null) + .toSnapshotRef(null) + .tag(null) + .branch(null); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder type(ScanType type); + public abstract Builder catalog(Catalog catalog); + public abstract Builder table(ImmutableList table); + + public Builder table(String...table) { + return table(ImmutableList.copyOf(table)); + } + + public abstract Builder schema(Schema schema); + public abstract Builder filter(@Nullable Expression filter); + public abstract Builder caseSensitive(@Nullable Boolean caseSensitive); + public abstract Builder options(ImmutableMap options); + public abstract Builder snapshot(@Nullable Long snapshot); + public abstract Builder timestamp(@Nullable Long timestamp); + public abstract Builder 
fromSnapshotInclusive(@Nullable Long fromInclusive); + public abstract Builder fromSnapshotRefInclusive(@Nullable String ref); + public abstract Builder fromSnapshotExclusive(@Nullable Long fromExclusive); + + public abstract Builder fromSnapshotRefExclusive(@Nullable String ref); + + public abstract Builder toSnapshot(@Nullable Long snapshot); + public abstract Builder toSnapshotRef(@Nullable String ref); + public abstract Builder tag(@Nullable String tag); + public abstract Builder branch(@Nullable String branch); + + public abstract Scan build(); + } + + } + + @AutoValue + public static abstract class Catalog implements Serializable { + + public abstract String getName(); + + /* Core Properties */ + public abstract @Nullable String getIcebergCatalogType(); + public abstract @Nullable String getCatalogImplementation(); + public abstract @Nullable String getFileIOImplementation(); + public abstract @Nullable String getWarehouseLocation(); + public abstract @Nullable String getMetricsReporterImplementation(); + + /* Caching */ + public abstract boolean getCacheEnabled(); + public abstract boolean getCacheCaseSensitive(); + + public abstract long getCacheExpirationIntervalMillis(); + + public abstract boolean getIOManifestCacheEnabled(); + public abstract long getIOManifestCacheExpirationIntervalMillis(); + + public abstract long getIOManifestCacheMaxTotalBytes(); + + public abstract long getIOManifestCacheMaxContentLength(); + + public abstract @Nullable String getUri(); + + public abstract int getClientPoolSize(); + + public abstract long getClientPoolEvictionIntervalMs(); + + public abstract @Nullable String getClientPoolCacheKeys(); + + public abstract @Nullable String getLockImplementation(); + + public abstract long getLockHeartbeatIntervalMillis(); + + public abstract long getLockHeartbeatTimeoutMillis(); + + public abstract int getLockHeartbeatThreads(); + + public abstract long getLockAcquireIntervalMillis(); + + public abstract long getLockAcquireTimeoutMillis(); + + public abstract @Nullable String getAppIdentifier(); + + public abstract @Nullable String getUser(); + + public abstract long getAuthSessionTimeoutMillis(); + + public abstract @Nullable Configuration getConfiguration(); + + public static Catalog.Builder builder() { + return new AutoValue_Iceberg_Catalog.Builder() + .icebergCatalogType(null) + .catalogImplementation(null) + .fileIOImplementation(null) + .warehouseLocation(null) + .metricsReporterImplementation(null) //TODO: Set this to our implementation + .cacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT) + .cacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT) + .cacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) + .iOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT) + .iOManifestCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) + .iOManifestCacheMaxTotalBytes(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) + .iOManifestCacheMaxContentLength(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT) + .uri(null) + .clientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT) + .clientPoolEvictionIntervalMs(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT) + .clientPoolCacheKeys(null) + .lockImplementation(null) + .lockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT) + .lockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) + 
.lockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT) + .lockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT) + .lockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) + .appIdentifier(null) + .user(null) + .authSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT) + .configuration(null) + ; + } + + public ImmutableMap properties() { + return new PropertyBuilder() + .put(CatalogUtil.ICEBERG_CATALOG_TYPE,getIcebergCatalogType()) + .put(CatalogProperties.CATALOG_IMPL,getCatalogImplementation()) + .put(CatalogProperties.FILE_IO_IMPL,getFileIOImplementation()) + .put(CatalogProperties.WAREHOUSE_LOCATION,getWarehouseLocation()) + .put(CatalogProperties.METRICS_REPORTER_IMPL,getMetricsReporterImplementation()) + .put(CatalogProperties.CACHE_ENABLED,getCacheEnabled()) + .put(CatalogProperties.CACHE_CASE_SENSITIVE,getCacheCaseSensitive()) + .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,getCacheExpirationIntervalMillis()) + .build(); + } + + public org.apache.iceberg.catalog.Catalog catalog() { + Configuration conf = getConfiguration(); + if(conf == null) { + conf = new Configuration(); + } + return CatalogUtil.buildIcebergCatalog(getName(),properties(),conf); + } + + public Table.Builder table() { + return new AutoValue_Iceberg_Table.Builder().catalog(this); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder name(String name); + + /* Core Properties */ + public abstract Builder icebergCatalogType(@Nullable String icebergType); + public abstract Builder catalogImplementation(@Nullable String catalogImpl); + public abstract Builder fileIOImplementation(@Nullable String fileIOImpl); + public abstract Builder warehouseLocation(@Nullable String warehouse); + public abstract Builder metricsReporterImplementation(@Nullable String metricsImpl); + + /* Caching */ + public abstract Builder cacheEnabled(boolean cacheEnabled); + public abstract Builder cacheCaseSensitive(boolean cacheCaseSensitive); + + public abstract Builder cacheExpirationIntervalMillis(long expiration); + + public abstract Builder iOManifestCacheEnabled(boolean enabled); + public abstract Builder iOManifestCacheExpirationIntervalMillis(long expiration); + + public abstract Builder iOManifestCacheMaxTotalBytes(long bytes); + + public abstract Builder iOManifestCacheMaxContentLength(long length); + + public abstract Builder uri(@Nullable String uri); + + public abstract Builder clientPoolSize(int size); + + public abstract Builder clientPoolEvictionIntervalMs(long interval); + + public abstract Builder clientPoolCacheKeys(@Nullable String keys); + + public abstract Builder lockImplementation(@Nullable String lockImplementation); + + public abstract Builder lockHeartbeatIntervalMillis(long interval); + + public abstract Builder lockHeartbeatTimeoutMillis(long timeout); + + public abstract Builder lockHeartbeatThreads(int threads); + + public abstract Builder lockAcquireIntervalMillis(long interval); + + public abstract Builder lockAcquireTimeoutMillis(long timeout); + + public abstract Builder appIdentifier(@Nullable String id); + + public abstract Builder user(@Nullable String user); + + public abstract Builder authSessionTimeoutMillis(long timeout); + + public abstract Builder configuration(@Nullable Configuration conf); + + public abstract Catalog build(); + + public Builder withProperties(Map properties) { + return this; + } + } + } + + @AutoValue + public static abstract class Table implements Serializable 
{ + + public abstract @Nullable Catalog catalog(); + public abstract @Nullable List tablePath(); + + public TableIdentifier identifier() { return TableIdentifier.of(tablePath().toArray(new String[0])); } + + public org.apache.iceberg.Table table() { + return catalog().catalog().loadTable(identifier()); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder catalog(Catalog catalog); + + public abstract Builder tablePath(List tablePath); + + public abstract Table build(); + } + } + + public static class Write extends PTransform,IcebergWriteResult> { + + private final DynamicDestinations dynamicDestinations; + private final Catalog catalog; + + private final SerializableBiFunction toRecord; + + public Write( + Catalog catalog, + DynamicDestinations dynamicDestinations, + SerializableBiFunction toRecord + ) { + this.catalog = catalog; + this.dynamicDestinations = dynamicDestinations; + this.toRecord = toRecord; + } + + @Override + public IcebergWriteResult expand(PCollection input) { + try { + return input.apply("Set Output Location", + new PrepareWrite( + dynamicDestinations, + SerializableFunctions.identity(),input.getCoder())) + .apply("Dynamic Write", new IcebergSink( + dynamicDestinations, + dynamicDestinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()), + RecordWriterFactory.tableRecords(toRecord, dynamicDestinations), + TableFactory.forCatalog(catalog) + )); + } catch(Exception e) { + RuntimeException e1 = new RuntimeException("Unable to expand transforms"); + e1.addSuppressed(e); + throw e1; + } + } + } + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergBoundedSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergBoundedSource.java new file mode 100644 index 000000000000..e219d3a898bc --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergBoundedSource.java @@ -0,0 +1,132 @@ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.io.iceberg.util.SchemaHelper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
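+/**
+ * A {@link BoundedSource} over an Iceberg table scan. The unsplit source carries only the
+ * {@link Iceberg.Scan} configuration; {@link #split} plans the scan into
+ * {@link CombinedScanTask}s and emits one child source per task, and each child is read by a
+ * {@link CombinedScanReader} that yields Beam {@link Row}s in the scan's schema.
+ */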
+public class IcebergBoundedSource extends BoundedSource { + private static final Logger LOG = LoggerFactory.getLogger(IcebergBoundedSource.class); + + private @Nullable CombinedScanTask task; + private Iceberg.Scan scan; + + + public IcebergBoundedSource(Iceberg.Scan scan, @Nullable CombinedScanTask task) { + this.task = task; + this.scan = scan; + } + + public IcebergBoundedSource(Iceberg.Scan scan) { + this(scan,null); + } + + public @Nullable Catalog catalog() { + return scan.getCatalog().catalog(); + } + public @Nullable Table table() { + Catalog catalog = catalog(); + if(catalog != null) { + return catalog.loadTable(TableIdentifier.of(scan.getTable().toArray(new String[scan.getTable().size()]))); + } else { + return null; + } + } + + @Override + public List> split( + long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + ArrayList tasks = new ArrayList<>(); + Table table = table(); + if (table != null) { + + switch (scan.getType()) { + case TABLE: + //Override the split size with our desired size + TableScan tableScan = table.newScan(); + + if(desiredBundleSizeBytes > 0) { + tableScan = tableScan.option(TableProperties.SPLIT_SIZE, "" + desiredBundleSizeBytes); + } + + //Always project to our destination schema + tableScan = tableScan.project(SchemaHelper.convert(scan.getSchema())); + + if (scan.getFilter() != null) { + tableScan = tableScan.filter(scan.getFilter()); + } + if (scan.getCaseSensitive() != null) { + tableScan = tableScan.caseSensitive(scan.getCaseSensitive()); + } + if (scan.getSnapshot() != null) { + tableScan = tableScan.useSnapshot(scan.getSnapshot()); + } + if (scan.getBranch() != null) { + tableScan = tableScan.useRef(scan.getBranch()); + } else if (scan.getTag() != null) { + tableScan = tableScan.useRef(scan.getTag()); + } + try (CloseableIterable t = tableScan.planTasks()) { + for (CombinedScanTask c : t) { + tasks.add(new IcebergBoundedSource(scan, c)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + case BATCH: + //TODO: Add batch scan + break; + } + } + return tasks; + } + + + @Override + public long getEstimatedSizeBytes( + PipelineOptions options) throws Exception { + if(task == null) { + return 0; + } else { + return task.sizeBytes(); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } + + @Override + public Coder getOutputCoder() { + return RowCoder.of(scan.getSchema()); + } + + @Override + public BoundedReader createReader( + PipelineOptions options) throws IOException { + return new CombinedScanReader(this,task, scan.getSchema()); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java new file mode 100644 index 000000000000..9e8619072984 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java @@ -0,0 +1,28 @@ +package org.apache.beam.io.iceberg; + +import java.io.Serializable; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.iceberg.Schema; + +public class IcebergDestination implements Serializable { + + ResourceId resourceId; + String table; + Schema schema; + Iceberg.WriteFormat writeFormat; + + public IcebergDestination(ResourceId resourceId,String table,Schema schema,Iceberg.WriteFormat writeFormat) { + this.resourceId = resourceId; + this.table = table; + this.schema = schema; + this.writeFormat = writeFormat; + } + + 
public Iceberg.WriteFormat getWriteFormat() { + return writeFormat; + } + + public Schema getSchema() { + return schema; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergSink.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergSink.java new file mode 100644 index 000000000000..8cfe403f0547 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergSink.java @@ -0,0 +1,193 @@ +package org.apache.beam.io.iceberg; + +import com.google.common.collect.ImmutableList; +import java.util.UUID; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.NotImplementedException; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.log4j.Logger; + +public class IcebergSink + extends PTransform>,IcebergWriteResult> { + + private static final Logger LOG = Logger.getLogger(IcebergSink.class); + + @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; + @VisibleForTesting static final int DEFAULT_MAX_FILES_PER_PARTITION = 10_000; + @VisibleForTesting static final long DEFAULT_MAX_BYTES_PER_PARTITION = 10L*(1L << 40); //10TB + static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + static final int DEFAULT_NUM_FILE_SHARDS = 0; + static final int FILE_TRIGGERING_RECORD_COUNT = 50_000; + + + final DynamicDestinations dynamicDestinations; + final Coder destinationCoder; + + final RecordWriterFactory recordWriterFactory; + final TableFactory tableFactory; + + boolean triggered; + + public IcebergSink( + DynamicDestinations dynamicDestinations, + Coder destinationCoder, + RecordWriterFactory recordWriterFactory, + TableFactory tableFactory + ) { + this.dynamicDestinations = dynamicDestinations; + this.destinationCoder = destinationCoder; + this.triggered = false; + this.recordWriterFactory = recordWriterFactory; + this.tableFactory = tableFactory; + } + + private IcebergWriteResult expandTriggered(PCollection> input) { + + + throw new NotImplementedException("Not yet implemented"); + } + 
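+  // Untriggered (batch) expansion: write bundles directly to data files, re-group any elements
+  // that could not be written in-bundle by destination and write them in a second pass, then
+  // flatten all written files and append them to their tables as per-table metadata updates.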
private IcebergWriteResult expandUntriggered(PCollection> input) { + + final PCollectionView fileView = createJobIdPrefixView(input.getPipeline()); + //We always do the equivalent of a dynamically sharded file creation + TupleTag> writtenFilesTag = new TupleTag<>("writtenFiles"); + TupleTag,ElementT>> successfulWritesTag = new TupleTag<>("successfulWrites"); + TupleTag,ElementT>> failedWritesTag = new TupleTag<>("failedWrites"); + TupleTag> snapshotsTag = new TupleTag<>("snapshots"); + + final Coder elementCoder = ((KvCoder)input.getCoder()).getValueCoder(); + + //Write everything to files + PCollectionTuple writeBundlesToFiles = + input.apply("Write Bundles To Files",ParDo.of(new WriteBundlesToFiles<>( + fileView, + successfulWritesTag, + failedWritesTag, + DEFAULT_MAX_WRITERS_PER_BUNDLE, + DEFAULT_MAX_BYTES_PER_FILE, + recordWriterFactory + )).withSideInputs(fileView) + .withOutputTags(writtenFilesTag, + TupleTagList.of(ImmutableList.of(successfulWritesTag,failedWritesTag)))); + + PCollection,ElementT>> successfulWrites = + writeBundlesToFiles.get(successfulWritesTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder),elementCoder)); + + PCollection,ElementT>> failedWrites = + writeBundlesToFiles.get(failedWritesTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder),elementCoder)); + + PCollection> writtenFilesGrouped = + failedWrites.apply("Group By Destination", GroupByKey.create()) + .apply("Strip Shard ID", MapElements.via(new SimpleFunction< + KV, Iterable>, + KV>>() { + @Override + public KV> apply( + KV, Iterable> input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) + .apply("Write Grouped Records",ParDo.of(new WriteGroupedRecordsToFiles<>( + fileView, + DEFAULT_MAX_BYTES_PER_FILE, + recordWriterFactory + ))) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + + PCollection> catalogUpdates = PCollectionList.of(writeBundlesToFiles.get(writtenFilesTag) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder))) + .and(writtenFilesGrouped) + .apply("Flatten Files", Flatten.pCollections()) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); + + + //Apply any sharded writes and flatten everything for catalog updates + PCollection> snapshots = catalogUpdates + .apply("Extract Data File", + ParDo.of(new DoFn, KV>() { + @ProcessElement + public void processElement(ProcessContext c,@Element Result element) { + c.output(KV.of(element.tableId,MetadataUpdate.of(element.partitionSpec,element.update.getDataFiles().get(0)))); + } + })) + .setCoder(KvCoder.of(StringUtf8Coder.of(),MetadataUpdate.coder())) + .apply(GroupByKey.create()) + .apply("Write Metadata Updates", + ParDo.of(new MetadataUpdates<>(tableFactory))) + .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class))); + + + + return new IcebergWriteResult( + input.getPipeline(), + successfulWrites, + catalogUpdates, + snapshots, + successfulWritesTag, + writtenFilesTag, + snapshotsTag + ); + } + + private PCollectionView createJobIdPrefixView(Pipeline p) { + + final String jobName = p.getOptions().getJobName(); + + return p.apply("JobIdCreationRoot_", Create.of((Void)null)) + .apply("CreateJobId", ParDo.of(new DoFn() { + @ProcessElement + public void process(ProcessContext c) { + c.output(jobName+"-"+ UUID.randomUUID().toString()); + } + })) + .apply("JobIdSideInput", View.asSingleton()); + } + + public IcebergWriteResult expand(PCollection> input) { + + String 
jobName = input.getPipeline().getOptions().getJobName(); + + //We always window into global as far as I can tell? + PCollection> globalInput = + input.apply("rewindowIntoGlobal", + Window.>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + return triggered ? expandTriggered(input) : expandUntriggered(input); + } + + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java new file mode 100644 index 000000000000..c6263db66d3a --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java @@ -0,0 +1,84 @@ +package org.apache.beam.io.iceberg; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; + +@SuppressWarnings("all") +public final class IcebergWriteResult implements POutput { + + private final Pipeline pipeline; + @Nullable PCollection successfulInserts; + @Nullable TupleTag successfulInsertsTag; + + @Nullable PCollection>> catalogUpdates; + @Nullable TupleTag>> catalogUpdatesTag; + + @Nullable PCollection> snapshots; + + @Nullable TupleTag> snapshotsTag; + + + public IcebergWriteResult( + Pipeline pipeline, + + @Nullable PCollection successfulInserts, + @Nullable PCollection>> catalogUpdates, + @Nullable PCollection> snapshots, + + @Nullable TupleTag successfulInsertsTag, + @Nullable TupleTag>> catalogUpdatesTag, + @Nullable TupleTag> snapshotsTag) { + this.pipeline = pipeline; + this.successfulInserts = successfulInserts; + this.catalogUpdates = catalogUpdates; + this.snapshots = snapshots; + + this.successfulInsertsTag = successfulInsertsTag; + this.catalogUpdatesTag = catalogUpdatesTag; + this.snapshotsTag = snapshotsTag; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + public PCollection getSuccessfulInserts() { + return successfulInserts; + } + + + @Override + public Map,PValue> expand() { + ImmutableMap.Builder, PValue> output = ImmutableMap.builder(); + if(successfulInsertsTag != null) { + output.put(successfulInsertsTag, Preconditions.checkNotNull(successfulInserts)); + } + if(catalogUpdatesTag != null) { + output.put(catalogUpdatesTag,Preconditions.checkNotNull(catalogUpdates)); + } + if(snapshotsTag != null) { + output.put(snapshotsTag,Preconditions.checkNotNull(snapshots)); + } + + return output.build(); + } + + @Override + public void finishSpecifyingOutput(String transformName, + PInput input, + PTransform transform) { + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdate.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdate.java new file mode 100644 index 000000000000..476c8976c745 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdate.java @@ -0,0 +1,140 @@ +package org.apache.beam.io.iceberg; + +import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData.SchemaConstructable; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.avro.AvroEncoderUtil; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types.StructType; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@SuppressWarnings("all") +public class MetadataUpdate implements IndexedRecord, SchemaConstructable { + + private List dataFiles; + private List deleteFiles; + + private final Schema avroSchema; + + public MetadataUpdate(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + public MetadataUpdate(StructType partitionType, + List dataFiles, + List deleteFiles) { + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + + StructType dataFileStruct = DataFile.getType(partitionType); + Map dataFileNames = ImmutableMap.of( + dataFileStruct,"org.apache.iceberg.GenericDataFile", + partitionType,"org.apache.iceberg.PartitionData"); + Schema dataFileSchema = AvroSchemaUtil.convert(dataFileStruct, dataFileNames); + Map deleteFileNames = ImmutableMap.of( + dataFileStruct,"org.apache.iceberg.GenericDeleteFile", + partitionType,"org.apache.iceberg.PartitionData"); + Schema deleteFileSchema = AvroSchemaUtil.convert(dataFileStruct,deleteFileNames); + + this.avroSchema = SchemaBuilder.builder().record(getClass().getName()) + .fields() + .name("dataFiles") + .prop(AvroSchemaUtil.FIELD_ID_PROP,"-1") + .type().nullable().array().items(dataFileSchema).noDefault() + .name("deleteFiles") + .prop(AvroSchemaUtil.FIELD_ID_PROP,"-1") + .type().nullable().array().items(deleteFileSchema).noDefault() + .endRecord(); + } + + + public static MetadataUpdate of(PartitionSpec partitionSpec,DataFile dataFile) { + return new MetadataUpdate(partitionSpec.partitionType(),ImmutableList.of(dataFile),null); + } + + public List getDataFiles() { return this.dataFiles; } + public List getDeleteFiles() { return this.deleteFiles; } + + @Override + public void put(int i, Object v) { + switch(i) { + case 0: + this.dataFiles = (List)v; + return; + case 1: + this.deleteFiles = (List)v; + return; + default: + } + } + + @Override + public Object get(int i) { + switch(i) { + case 0: + return this.dataFiles; + case 1: + return this.deleteFiles; + default: + throw new UnsupportedOperationException("Unknown field ordinal: "+i); + } + } + + @Override + public Schema getSchema() { + return avroSchema; + } + + + protected static class MetadataUpdateCoder extends Coder { + + private static final ByteArrayCoder bytesCoder = ByteArrayCoder.of(); + + @Override + public void encode(MetadataUpdate value, + @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws @UnknownKeyFor@NonNull@Initialized CoderException, @UnknownKeyFor@NonNull@Initialized IOException { + 
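+      // Serialize with Iceberg's AvroEncoderUtil, which carries the Avro schema alongside the
+      // payload, then frame the bytes with Beam's ByteArrayCoder so decode() needs no schema.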
bytesCoder.encode(AvroEncoderUtil.encode(value,value.getSchema()),outStream); + } + + @Override + public MetadataUpdate decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws @UnknownKeyFor@NonNull@Initialized CoderException, @UnknownKeyFor@NonNull@Initialized IOException { + byte[] updateBytes = bytesCoder.decode(inStream); + return AvroEncoderUtil.decode(updateBytes); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List> getCoderArguments() { + return ImmutableList.of(); + } + + @Override + public void verifyDeterministic() + throws @UnknownKeyFor@NonNull@Initialized NonDeterministicException { + + } + } + + public static Coder coder() { + return new MetadataUpdateCoder(); + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdates.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdates.java new file mode 100644 index 000000000000..b1ab224effb5 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/MetadataUpdates.java @@ -0,0 +1,36 @@ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; + +public class MetadataUpdates extends DoFn>,KV> { + + final TableFactory tableFactory; + + public MetadataUpdates(TableFactory tableFactory) { + this.tableFactory = tableFactory; + } + + @ProcessElement + public void processElement(ProcessContext c,@Element KV> element, + BoundedWindow window) { + Table table = tableFactory.getTable(element.getKey()); + AppendFiles update = table.newAppend(); + Iterable metadataUpdates = element.getValue(); + if(metadataUpdates != null) { + for(MetadataUpdate metadata : metadataUpdates) { + for(DataFile file : metadata.getDataFiles()) { + update.appendFile(file); + } + } + update.commit(); + c.outputWithTimestamp(KV.of(element.getKey(),table.currentSnapshot()),window.maxTimestamp()); + } + } + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PrepareWrite.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PrepareWrite.java new file mode 100644 index 000000000000..3dba0eab1a37 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PrepareWrite.java @@ -0,0 +1,69 @@ +package org.apache.beam.io.iceberg; + + +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Instant; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
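+/**
+ * Routes each input element to its destination before writing. Every element is wrapped in a
+ * {@link ValueInSingleWindow}, handed to {@link DynamicDestinations#getDestination}, converted by
+ * the format function, and emitted as a {@code KV<DestinationT, OutputT>}.
+ */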
+public class PrepareWrite + extends PTransform,PCollection>> { + + + private DynamicDestinations dynamicDestinations; + private SerializableFunction formatFunction; + private Coder outputCoder; + + public PrepareWrite( + DynamicDestinations dynamicDestinations, + SerializableFunction formatFunction, + Coder outputCoder) { + this.dynamicDestinations = dynamicDestinations; + this.formatFunction = formatFunction; + this.outputCoder = outputCoder; + } + + + @Override + public PCollection> expand(PCollection input) { + + final Coder destCoder; + try { + destCoder = KvCoder.of( + dynamicDestinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()), + outputCoder + ); + } catch(Exception e) { + RuntimeException e1 = new RuntimeException("Unable to expand PrepareWrite"); + e1.addSuppressed(e); + throw e1; + } + return input.apply(ParDo.of(new DoFn>() { + + @ProcessElement + public void processElement( + ProcessContext c, + @Element InputT element, + @Timestamp Instant timestamp, + BoundedWindow window, + PaneInfo pane) throws IOException { + ValueInSingleWindow windowedElement = + ValueInSingleWindow.of(element,timestamp,window,pane); + dynamicDestinations.setSideInputProcessContext(c); + DestinationT tableDestination = dynamicDestinations.getDestination(windowedElement); + OutputT outputValue = formatFunction.apply(element); + c.output(KV.of(tableDestination,outputValue)); + } + }).withSideInputs(dynamicDestinations.getSideInputs())).setCoder(destCoder); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java new file mode 100644 index 000000000000..88ffd201c87c --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java @@ -0,0 +1,95 @@ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
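+/**
+ * Writes elements to a single Iceberg data file. Each element is folded into a
+ * {@link GenericRecord} by the supplied function and handed to an Iceberg {@link DataWriter} for
+ * the destination's file format (Avro, Parquet or ORC); {@link #dataFile()} exposes the resulting
+ * {@link DataFile} after {@link #close()}.
+ */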
+class RecordWriter { + + final Table table; + + final DataWriter writer; + final GenericRecord baseRecord; + final SerializableBiFunction toRecord; + + final String location; + + RecordWriter( + Table table, + String location, + Schema schema, + PartitionSpec partitionSpec, + FileFormat format, + SerializableBiFunction toRecord + ) throws IOException { + this.table = table; + this.baseRecord = GenericRecord.create(schema); + this.toRecord = toRecord; + this.location = table.locationProvider().newDataLocation(partitionSpec,baseRecord,location); + + OutputFile outputFile = table.io().newOutputFile(this.location); + switch (format) { + case AVRO: + writer = Avro.writeData(outputFile) + .schema(schema) + .withSpec(partitionSpec) + .overwrite().build(); + break; + case PARQUET: + writer = Parquet.writeData(outputFile) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(schema) + .withSpec(partitionSpec) + .overwrite().build(); + break; + case ORC: + writer = ORC.writeData(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(schema) + .withSpec(partitionSpec) + .overwrite().build(); + break; + default: + throw new RuntimeException("Unrecognized File Format. This should be impossible."); + } + } + + public void write(ElementT element) throws IOException { + Record record = toRecord.apply(baseRecord,element); + writer.write(record); + } + + public void close() throws IOException { + writer.close(); + } + + public long bytesWritten() { + return writer.length(); + } + + public Table table() { return table; } + + public String location() { + return location; + } + + public DataFile dataFile() { + return writer.toDataFile(); + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriterFactory.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriterFactory.java new file mode 100644 index 000000000000..70eda08332dc --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriterFactory.java @@ -0,0 +1,61 @@ +package org.apache.beam.io.iceberg; + +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
+abstract class RecordWriterFactory implements Serializable { + private RecordWriterFactory() { } + + public abstract RecordWriterFactory prepare(DynamicDestinations destination); + + public abstract RecordWriter createWriter(String location,DestinationT destination) throws Exception; + + static TableRecordWriterFactory tableRecords( + SerializableBiFunction toRecord, + @Nullable DynamicDestinations dynamicDestinations + ) { + return new TableRecordWriterFactory<>(toRecord,dynamicDestinations); + } + + + static final class TableRecordWriterFactory extends RecordWriterFactory { + + final SerializableBiFunction toRecord; + + final DynamicDestinations dynamicDestinations; + + TableRecordWriterFactory( + SerializableBiFunction toRecord, + DynamicDestinations dynamicDestinations) { + this.toRecord = toRecord; + this.dynamicDestinations = dynamicDestinations; + } + + + @Override + public RecordWriterFactory prepare(DynamicDestinations destination) { + return new TableRecordWriterFactory<>(toRecord,destination); + } + + @Override + public RecordWriter createWriter(String location,DestinationT destination) + throws Exception { + Table table = dynamicDestinations.getTable(destination); + Schema schema = dynamicDestinations.getSchema(destination); + PartitionSpec partitionSpec = dynamicDestinations.getPartitionSpec(destination); + FileFormat format = dynamicDestinations.getFileFormat(destination); + return new RecordWriter<>( + table,location,schema,partitionSpec,format,toRecord); + } + } + + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/TableFactory.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/TableFactory.java new file mode 100644 index 000000000000..c034c2206029 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/TableFactory.java @@ -0,0 +1,38 @@ +package org.apache.beam.io.iceberg; + +import java.io.Serializable; +import java.util.Arrays; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +@SuppressWarnings("all") +public abstract class TableFactory implements Serializable { + + private TableFactory() { } + + public abstract Table getTable(IdentifierT id); + + public static TableFactory forCatalog(final Iceberg.Catalog catalog) { + return new TableFactory() { + @Override + public Table getTable(String id) { + TableIdentifier tableId = TableIdentifier.parse(id); + //If the first element in the namespace is our catalog, remove that. 
+ if(tableId.hasNamespace()) { + Namespace ns = tableId.namespace(); + if(catalog.catalog().name().equals(ns.level(0))) { + String[] levels = ns.levels(); + levels = Arrays.copyOfRange(levels,1,levels.length); + tableId = TableIdentifier.of( + Namespace.of(levels), + tableId.name()); + } + } + return catalog.catalog().loadTable(tableId); + } + }; + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteBundlesToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteBundlesToFiles.java new file mode 100644 index 000000000000..62303b499006 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteBundlesToFiles.java @@ -0,0 +1,299 @@ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@SuppressWarnings("all") //TODO: Remove this once development is stable. 
+public class WriteBundlesToFiles + extends DoFn, Result> { + + private transient Map> writers; + private transient Map windows; + + + private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; + + private final PCollectionView locationPrefixView; + + private final TupleTag,ElementT>> successfulWritesTag; + private final TupleTag,ElementT>> unwrittenRecordsTag; + private final int maxWritersPerBundle; + private final long maxFileSize; + + private final RecordWriterFactory recordWriterFactory; + + private int spilledShardNumber; + + + static final class Result implements Serializable { + private static final long serialVersionUID = 1L; + + + public final String tableId; + public final String location; + + public final PartitionSpec partitionSpec; + + public final MetadataUpdate update; + + + public final DestinationT destination; + + public Result( + String tableId, + String location, + DataFile dataFile, + PartitionSpec partitionSpec, + DestinationT destination) { + this.tableId = tableId; + this.location = location; + this.update = MetadataUpdate.of(partitionSpec,dataFile); + this.partitionSpec = partitionSpec; + this.destination = destination; + } + + @Override + public boolean equals(Object obj) { + if(obj instanceof Result) { + Result result = (Result)obj; + return Objects.equal(result.tableId,tableId) && + Objects.equal(result.location,location) && + Objects.equal(result.partitionSpec,partitionSpec) && + Objects.equal(result.update.getDataFiles().get(0),update.getDataFiles().get(0)) && + Objects.equal(destination,result.destination); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableId,location,update.getDataFiles().get(0),partitionSpec,destination); + } + + @Override + public String toString() { + return "Result{" + + "table='" + + tableId + + '\'' + + "location='" + + location + + '\'' + + ", fileByteSize=" + + update.getDataFiles().get(0).fileSizeInBytes() + + ", destination=" + + destination + + '}'; + } + } + + public static class ResultCoder extends StructuredCoder> { + private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + private static final Coder metadataCoder = MetadataUpdate.coder(); + + private static final SerializableCoder partitionSpecCoder = SerializableCoder.of(PartitionSpec.class); + + + private final Coder destinationCoder; + + public ResultCoder(Coder destinationCoder) { + this.destinationCoder = destinationCoder; + } + + @Override + public void encode(Result value, + @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws @UnknownKeyFor@NonNull@Initialized CoderException, @UnknownKeyFor@NonNull@Initialized IOException { + + //Convert most everything to Avro for serialization + + + //Table id and location are strings + stringCoder.encode(value.tableId, outStream); + stringCoder.encode(value.location,outStream); + //PartitionSpec is Java serialized because we need it to decode DataFile + destinationCoder.encode(value.destination,outStream); + metadataCoder.encode(value.update,outStream); + partitionSpecCoder.encode(value.partitionSpec,outStream); + } + + @Override + public Result decode(InputStream inStream) + throws CoderException, IOException { + String tableId = stringCoder.decode(inStream); + String location = stringCoder.decode(inStream); + DestinationT dest = destinationCoder.decode(inStream); + MetadataUpdate update = metadataCoder.decode(inStream); + PartitionSpec spec = partitionSpecCoder.decode(inStream); + return new 
Result<>(tableId,location,update.getDataFiles().get(0),spec,dest); + } + + @Override + public List> getCoderArguments() { + return Collections.singletonList(destinationCoder); + } + + @Override + public void verifyDeterministic() + throws NonDeterministicException { } + + public static ResultCoder of(Coder destinationCoder) { + return new ResultCoder<>(destinationCoder); + } + + } + + public WriteBundlesToFiles( + PCollectionView locationPrefixView, + TupleTag,ElementT>> successfulWritesTag, + TupleTag,ElementT>> unwrittenRecordsTag, + int maximumWritersPerBundle, + long maxFileSize, + RecordWriterFactory recordWriterFactory + ) { + this.locationPrefixView = locationPrefixView; + this.successfulWritesTag = successfulWritesTag; + this.unwrittenRecordsTag = unwrittenRecordsTag; + this.maxWritersPerBundle = maximumWritersPerBundle; + this.maxFileSize = maxFileSize; + this.recordWriterFactory = recordWriterFactory; + } + + @StartBundle + public void startBundle() { + this.writers = Maps.newHashMap(); + this.windows = Maps.newHashMap(); + this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); + } + + RecordWriter createWriter( + DestinationT destination, + String location,BoundedWindow window) throws Exception { + Map windows = Preconditions.checkNotNull(this.windows); + Map> writers = Preconditions.checkNotNull(this.writers); + RecordWriter writer = recordWriterFactory.createWriter(location,destination); + windows.put(destination,window); + writers.put(destination,writer); + return writer; + } + + @ProcessElement + public void processElement(ProcessContext c, + @Element KV element,BoundedWindow window) + throws Exception { + Map> writers = Preconditions.checkNotNull(this.writers); + String locationPrefix = c.sideInput(locationPrefixView); + DestinationT destination = element.getKey(); + RecordWriter writer; + if(writers.containsKey(destination)) { + writer = writers.get(destination); + } else { + if(writers.size() <= maxWritersPerBundle) { + writer = createWriter(destination,locationPrefix,window); + } else { + c.output(unwrittenRecordsTag, + KV.of(ShardedKey.of(destination,++spilledShardNumber % SPILLED_RECORD_SHARDING_FACTOR), + element.getValue())); + return; + } + } + + if(writer.bytesWritten() > maxFileSize) { + writer.close(); + Table t = writer.table(); + + c.output(new Result<>(t.name(),writer.location(),writer.dataFile(),t.spec(),destination)); + writer = createWriter(destination,locationPrefix,window); + } + + try { + writer.write(element.getValue()); + c.output(successfulWritesTag, + KV.of(ShardedKey.of(destination,spilledShardNumber % SPILLED_RECORD_SHARDING_FACTOR), + element.getValue())); + } catch(Exception e) { + try { + writer.close(); + } catch(Exception closeException) { + e.addSuppressed(closeException); + } + throw e; + } + + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) throws Exception { + Map windows = Preconditions.checkNotNull(this.windows); + Map> writers = Preconditions.checkNotNull(this.writers); + List exceptionList = Lists.newArrayList(); + for(RecordWriter writer : writers.values()) { + try { + writer.close(); + } catch(Exception e) { + exceptionList.add(e); + } + } + if(!exceptionList.isEmpty()) { + Exception e = new IOException("Exception closing some writers."); + for(Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + + exceptionList.clear(); + for(Map.Entry> entry : writers.entrySet()) { + try { + DestinationT destination = entry.getKey(); + + RecordWriter writer 
= entry.getValue(); + BoundedWindow window = windows.get(destination); + Preconditions.checkNotNull(window); + Table t = writer.table(); + c.output(new Result<>(t.name(),writer.location(), writer.dataFile(), t.spec(),destination), + window.maxTimestamp(), + window); + } catch(Exception e) { + exceptionList.add(e); + } + } + writers.clear(); + if(!exceptionList.isEmpty()) { + Exception e = new IOException("Exception emitting writer metadata."); + for(Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + } + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRecordsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRecordsToFiles.java new file mode 100644 index 000000000000..6f9a59b387ae --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRecordsToFiles.java @@ -0,0 +1,39 @@ +package org.apache.beam.io.iceberg; + +import org.apache.beam.io.iceberg.WriteBundlesToFiles.Result; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +@SuppressWarnings("all") +public class WriteGroupedRecordsToFiles + extends DoFn>, Result> { + + private final PCollectionView locationPrefixView; + private final long maxFileSize; + private final RecordWriterFactory recordWriterFactory; + + WriteGroupedRecordsToFiles( + PCollectionView locationPrefixView, + long maxFileSize, + RecordWriterFactory recordWriterFactory) { + this.locationPrefixView = locationPrefixView; + this.maxFileSize = maxFileSize; + this.recordWriterFactory = recordWriterFactory; + } + + @ProcessElement + public void processElement(ProcessContext c,@Element KV> element) + throws Exception { + String locationPrefix = c.sideInput(locationPrefixView); + DestinationT destination = element.getKey(); + RecordWriter writer = recordWriterFactory.createWriter(locationPrefix,destination); + for(ElementT e : element.getValue()) { + writer.write(e); + } + writer.close(); + c.output(new Result<>(writer.table().name(),writer.location(), writer.dataFile(), writer.table().spec(), destination)); + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java new file mode 100644 index 000000000000..5ffdf14c7f77 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java @@ -0,0 +1 @@ +package org.apache.beam.io.iceberg; \ No newline at end of file diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/PropertyBuilder.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/PropertyBuilder.java new file mode 100644 index 000000000000..54fc7d51c661 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/PropertyBuilder.java @@ -0,0 +1,24 @@ +package org.apache.beam.io.iceberg.util; + +import com.google.common.collect.ImmutableMap; +import javax.annotation.Nullable; + +/** + * Convenience utility class to build immutable maps that drops attempts + * to set null values. 
+ */ +public class PropertyBuilder { + + ImmutableMap.Builder builder = ImmutableMap.builder(); + + public PropertyBuilder put(String key,@Nullable Object value) { + if(value != null) { + builder = builder.put(key,""+value); + } + return this; + } + + public ImmutableMap build() { + return builder.build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/RowHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/RowHelper.java new file mode 100644 index 000000000000..e3124fb3f1c7 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/RowHelper.java @@ -0,0 +1,76 @@ +package org.apache.beam.io.iceberg.util; + +import java.util.Optional; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types.NestedField; + +public class RowHelper { + private RowHelper() { } + + public static void copyInto(Record rec,NestedField field,Row value) { + String name = field.name(); + switch(field.type().typeId()) { + case BOOLEAN: + Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name,v)); + break; + case INTEGER: + Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name,v)); + break; + case LONG: + Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name,v)); + break; + case FLOAT: + Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name,v)); + break; + case DOUBLE: + Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name,v)); + break; + case DATE: + break; + case TIME: + break; + case TIMESTAMP: + break; + case STRING: + Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name,v)); + break; + case UUID: + break; + case FIXED: + break; + case BINARY: + break; + case DECIMAL: + break; + case STRUCT: + Optional.ofNullable(value.getRow(name)) + .ifPresent(row -> rec.setField(name, + copy(GenericRecord.create(field.type().asStructType()),row))); + break; + case LIST: + break; + case MAP: + break; + } + } + + public static Record copy(Record baseRecord, Row value) { + Record rec = baseRecord.copy(); + for(NestedField f : rec.struct().fields()) { + copyInto(rec,f,value); + } + return rec; + } + + public static SerializableBiFunction recordsFromRows() { + return new SerializableBiFunction() { + @Override + public Record apply(Record record, Row row) { + return copy(record,row); + } + }; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/ScanHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/ScanHelper.java new file mode 100644 index 000000000000..b69bf19a8045 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/ScanHelper.java @@ -0,0 +1,22 @@ +package org.apache.beam.io.iceberg.util; + +import org.apache.beam.io.iceberg.Iceberg; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; + +public class ScanHelper { + + public static boolean isIncremental(Iceberg.Scan scan) { + if(scan.getFromSnapshotExclusive() != null) { + return true; + } + return false; + } + + public static TableScan tableScan(Table table,Iceberg.Scan scan) { + TableScan tableScan = table.newScan(); + return tableScan; + } + + +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/SchemaHelper.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/SchemaHelper.java new file mode 100644 index 000000000000..cc19d8f7d75a --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/util/SchemaHelper.java @@ -0,0 +1,95 @@ +package org.apache.beam.io.iceberg.util; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +@SuppressWarnings({"dereference.of.nullable"}) +public class SchemaHelper { + + private SchemaHelper() { } + + public static String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID"; + + public static Schema.FieldType fieldTypeForType(final Type type) { + switch(type.typeId()) { + case BOOLEAN: + return FieldType.BOOLEAN; + case INTEGER: + return FieldType.INT32; + case LONG: + return FieldType.INT64; + case FLOAT: + return FieldType.FLOAT; + case DOUBLE: + return FieldType.DOUBLE; + case DATE: case TIME: case TIMESTAMP: //TODO: Logical types? + return FieldType.DATETIME; + case STRING: + return FieldType.STRING; + case UUID: + case BINARY: + return FieldType.BYTES; + case FIXED:case DECIMAL: + return FieldType.DECIMAL; + case STRUCT: + return FieldType.row(convert(type.asStructType())); + case LIST: + return FieldType.iterable(fieldTypeForType(type.asListType().elementType())); + case MAP: + return FieldType.map(fieldTypeForType(type.asMapType().keyType()), + fieldTypeForType(type.asMapType().valueType())); + } + throw new RuntimeException("Unrecognized Iceberg Type"); + } + + public static Schema.Field convert(final Types.NestedField field) { + return Schema.Field.of(field.name(),fieldTypeForType(field.type())) + .withOptions(Schema.Options.builder() + .setOption(ICEBERG_TYPE_OPTION_NAME, + Schema.FieldType.STRING,field.type().typeId().name()) + .build()) + .withNullable(field.isOptional()); + } + public static Schema convert(final org.apache.iceberg.Schema schema) { + Schema.Builder builder = Schema.builder(); + for(Types.NestedField f : schema.columns()) { + builder.addField(convert(f)); + } + return builder.build(); + } + + public static Schema convert(final Types.StructType struct) { + Schema.Builder builder = Schema.builder(); + for(Types.NestedField f : struct.fields()) { + builder.addField(convert(f)); + } + return builder.build(); + } + + public static Types.NestedField convert(int fieldId,final Schema.Field field) { + String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME,String.class); + if(typeId != null) { + return Types.NestedField.of( + fieldId, + field.getType().getNullable(), + field.getName(), + Types.fromPrimitiveString(typeId)); + } else { + return Types.NestedField.of(fieldId, + field.getType().getNullable(), + field.getName(), + Types.StringType.get()); + } + } + + public static org.apache.iceberg.Schema convert(final Schema schema) { + Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; + int fieldId = 0; + for(Schema.Field f : schema.getFields()) { + fields[fieldId++] = convert(fieldId,f); + } + return new org.apache.iceberg.Schema(fields); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/BoundedScanTests.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/BoundedScanTests.java new file mode 100644 index 000000000000..8c2d8cbacb4c --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/BoundedScanTests.java @@ -0,0 +1,79 @@ +package org.apache.beam.io.iceberg; + +import 
com.google.common.collect.ImmutableMap; +import org.apache.beam.io.iceberg.util.SchemaHelper; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class BoundedScanTests { + + private static Logger LOG = LoggerFactory.getLogger(BoundedScanTests.class); + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public TestDataWarehouse warehouse = new TestDataWarehouse(temporaryFolder,"default"); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); + + static class PrintRow extends DoFn { + + @ProcessElement + public void process(@Element Row row, OutputReceiver output) throws Exception { + LOG.info("Got row {}",row); + output.output(row); + } + } + + @Test + public void testSimpleScan() throws Exception { + Table simpleTable = warehouse.createTable(TestFixtures.SCHEMA); + simpleTable.newFastAppend() + .appendFile(warehouse.writeRecords("file1s1.parquet",simpleTable.schema(),TestFixtures.FILE1SNAPSHOT1)) + .appendFile(warehouse.writeRecords("file2s1.parquet",simpleTable.schema(),TestFixtures.FILE2SNAPSHOT1)) + .appendFile(warehouse.writeRecords("file3s1.parquet",simpleTable.schema(),TestFixtures.FILE3SNAPSHOT1)) + .commit(); + + PCollection output = testPipeline + .apply(Read.from(new IcebergBoundedSource(Iceberg.Scan.builder() + .catalog(Iceberg.Catalog.builder() + .name("hadoop") + .icebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .warehouseLocation(warehouse.location) + .build()) + .type(Iceberg.ScanType.TABLE) + .table(simpleTable.name().replace("hadoop.","").split("\\.")) + .schema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .build()))) + .apply(ParDo.of(new PrintRow())) + .setCoder(RowCoder.of(SchemaHelper.convert(TestFixtures.SCHEMA))); + PAssert.that(output); + testPipeline.run(); + + + } + +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SinkTests.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SinkTests.java new file mode 100644 index 000000000000..8d65daf1fdca --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/SinkTests.java @@ -0,0 +1,61 @@ +package org.apache.beam.io.iceberg; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.io.iceberg.util.RowHelper; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import 
org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class SinkTests { + private static Logger LOG = LoggerFactory.getLogger(SinkTests.class); + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public TestDataWarehouse warehouse = new TestDataWarehouse(temporaryFolder,"default"); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); + + + @Test + public void testSimpleAppend() throws Exception { + //Create a table and add records to it. + Table table = warehouse.createTable(TestFixtures.SCHEMA); + + Iceberg.Catalog catalog = Iceberg.Catalog.builder() + .name("hadoop") + .icebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .warehouseLocation(warehouse.location) + .build(); + + String[] tablePath = table.name() + .replace("hadoop.","").split("\\."); + DynamicDestinations destination = DynamicDestinations.constant(catalog.table() + .tablePath(ImmutableList.copyOf(tablePath)).build()); + LOG.info("Table created. Making pipeline"); + testPipeline + .apply("Records To Add", + Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) + .apply("Append To Table", + new Iceberg.Write( + catalog, + destination, + RowHelper.recordsFromRows())); + LOG.info("Executing pipeline"); + testPipeline.run().waitUntilFinish(); + LOG.info("Done running pipeline"); + } + +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java new file mode 100644 index 000000000000..bae6ee317f6e --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java @@ -0,0 +1,121 @@ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestDataWarehouse extends ExternalResource { + private static final Logger LOG = LoggerFactory.getLogger(TestDataWarehouse.class); + + protected final TemporaryFolder temporaryFolder; + protected final String database; + + protected final Configuration hadoopConf; + + + protected String location; + protected Catalog catalog; + + + public TestDataWarehouse(TemporaryFolder 
temporaryFolder,String database) { + this.temporaryFolder = temporaryFolder; + this.database = database; + this.hadoopConf = new Configuration(); + } + + @Override + protected void before() throws Throwable { + File warehouseFile = temporaryFolder.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + location = "file:"+warehouseFile.toString(); + catalog = CatalogUtil.loadCatalog( + CatalogUtil.ICEBERG_CATALOG_HADOOP,"hadoop", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION,location),hadoopConf); + } + + @Override + protected void after() { + List tables = catalog.listTables(Namespace.of(database)); + LOG.info("Cleaning up {} tables in test warehouse",tables.size()); + for(TableIdentifier t : tables) { + try { + LOG.info("Removing table {}",t); + catalog.dropTable(t); + } catch(Exception e) { + LOG.error("Unable to remove table",e); + } + } + try { + ((HadoopCatalog) catalog).close(); + } catch(Exception e) { + LOG.error("Unable to close catalog",e); + } + } + + public DataFile writeRecords(String filename,Schema schema,List records) throws IOException { + Path path = new Path(location,filename); + FileFormat format = FileFormat.fromFileName(filename); + + + FileAppender appender; + switch (format) { + case PARQUET: + appender = Parquet.write(fromPath(path,hadoopConf)) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(schema) + .overwrite().build(); + break; + case ORC: + appender = ORC.write(fromPath(path,hadoopConf)) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(schema) + .overwrite().build(); + break; + default: + throw new IOException("Unable to create appender for "+format); + } + appender.addAll(records); + appender.close(); + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(HadoopInputFile.fromPath(path,hadoopConf)) + .withMetrics(appender.metrics()) + .build(); + } + + public Table createTable(Schema schema) { + TableIdentifier table = TableIdentifier.of(database,"table"+Long.toString(UUID.randomUUID().hashCode(),16)); + return catalog.createTable(table,schema); + } + + +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java new file mode 100644 index 000000000000..df772ca55e30 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java @@ -0,0 +1,87 @@ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import org.apache.beam.io.iceberg.util.SchemaHelper; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; + +public class TestFixtures { + public static final Schema SCHEMA = new Schema( + required(1,"id", Types.LongType.get()), + optional(2,"data",Types.StringType.get()) + ); + + private static final Record genericRecord = GenericRecord.create(SCHEMA); + + + /* First file in test table */ + public static final ImmutableList FILE1SNAPSHOT1 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",0L,"data","clarification")), + genericRecord.copy(ImmutableMap.of("id",1L,"data","risky")), + genericRecord.copy(ImmutableMap.of("id",2L,"data","falafel")) + ); + 
public static final ImmutableList FILE1SNAPSHOT2 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",4L,"data","obscure")), + genericRecord.copy(ImmutableMap.of("id",5L,"data","secure")), + genericRecord.copy(ImmutableMap.of("id",6L,"data","feta")) + ); + public static final ImmutableList FILE1SNAPSHOT3 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",6L,"data","brainy")), + genericRecord.copy(ImmutableMap.of("id",7L,"data","film")), + genericRecord.copy(ImmutableMap.of("id",8L,"data","feta")) + ); + + /* Second file in test table */ + public static final ImmutableList FILE2SNAPSHOT1 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",10L,"data","clammy")), + genericRecord.copy(ImmutableMap.of("id",11L,"data","evacuate")), + genericRecord.copy(ImmutableMap.of("id",12L,"data","tissue")) + ); + public static final ImmutableList FILE2SNAPSHOT2 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",14L,"data","radical")), + genericRecord.copy(ImmutableMap.of("id",15L,"data","collocation")), + genericRecord.copy(ImmutableMap.of("id",16L,"data","book")) + ); + public static final ImmutableList FILE2SNAPSHOT3 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",16L,"data","cake")), + genericRecord.copy(ImmutableMap.of("id",17L,"data","intrinsic")), + genericRecord.copy(ImmutableMap.of("id",18L,"data","paper")) + ); + + /* Third file in test table */ + public static final ImmutableList FILE3SNAPSHOT1 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",20L,"data","ocean")), + genericRecord.copy(ImmutableMap.of("id",21L,"data","holistic")), + genericRecord.copy(ImmutableMap.of("id",22L,"data","preventative")) + ); + public static final ImmutableList FILE3SNAPSHOT2 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",24L,"data","cloud")), + genericRecord.copy(ImmutableMap.of("id",25L,"data","zen")), + genericRecord.copy(ImmutableMap.of("id",26L,"data","sky")) + ); + public static final ImmutableList FILE3SNAPSHOT3 = ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id",26L,"data","belleview")), + genericRecord.copy(ImmutableMap.of("id",27L,"data","overview")), + genericRecord.copy(ImmutableMap.of("id",28L,"data","tender")) + ); + + public static final ImmutableList asRows(Iterable records) { + ArrayList rows = new ArrayList<>(); + for(Record record : records) { + rows.add(Row.withSchema(SchemaHelper.convert(SCHEMA)) + .withFieldValue("id",record.getField("id")) + .withFieldValue("data",record.getField("data")) + .build()); + } + return ImmutableList.copyOf(rows); + } + +} diff --git a/sdks/java/io/parquet/build.gradle b/sdks/java/io/parquet/build.gradle index e7e06e9cca3c..e8f1603f0b58 100644 --- a/sdks/java/io/parquet/build.gradle +++ b/sdks/java/io/parquet/build.gradle @@ -35,7 +35,7 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -def parquet_version = "1.12.0" +def parquet_version = "1.13.1" dependencies { implementation library.java.vendored_guava_32_1_2_jre diff --git a/settings.gradle.kts b/settings.gradle.kts index 3bd606327703..e9b2ea0b3aa5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -358,3 +358,9 @@ include("sdks:java:io:kafka:kafka-100") findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100" include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" +include("sdks:java:extensions:iceberg") +findProject(":sdks:java:extensions:iceberg")?.name = "iceberg" +include("sdks:java:io:iceberg") 
+findProject(":sdks:java:io:iceberg")?.name = "iceberg"
+include("sdks:java:io:catalog")
+findProject(":sdks:java:io:catalog")?.name = "catalog"
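
For reference, a minimal, self-contained sketch of how the schema and row utilities introduced above (SchemaHelper.convert and RowHelper.copy) might be exercised together outside a pipeline. The class name SchemaRoundTripExample and the literal field values are illustrative placeholders, not part of the change itself; the Iceberg schema mirrors the one used by TestFixtures.

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.beam.io.iceberg.util.RowHelper;
import org.apache.beam.io.iceberg.util.SchemaHelper;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class SchemaRoundTripExample {
  public static void main(String[] args) {
    // Iceberg schema matching the shape used by TestFixtures in this change.
    Schema icebergSchema = new Schema(
        required(1, "id", Types.LongType.get()),
        optional(2, "data", Types.StringType.get()));

    // Iceberg -> Beam schema conversion via the new SchemaHelper.
    org.apache.beam.sdk.schemas.Schema beamSchema = SchemaHelper.convert(icebergSchema);

    // Build a Beam Row against the converted schema (values are illustrative)...
    Row row = Row.withSchema(beamSchema)
        .withFieldValue("id", 42L)
        .withFieldValue("data", "example")
        .build();

    // ...and copy it into an Iceberg Record, as the write path does per element.
    Record record = RowHelper.copy(GenericRecord.create(icebergSchema), row);
    System.out.println(record);
  }
}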
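
Similarly, a sketch of how TableFactory.forCatalog could resolve string table identifiers against a Hadoop catalog. The TableFactory<String> parameterization is inferred from getTable(String); the warehouse location, table name, and class name below are placeholders that assume the catalog and table already exist.

import org.apache.beam.io.iceberg.Iceberg;
import org.apache.beam.io.iceberg.TableFactory;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;

public class TableFactoryExample {
  public static void main(String[] args) {
    // A local Hadoop catalog; the warehouse path is illustrative.
    Iceberg.Catalog catalog = Iceberg.Catalog.builder()
        .name("hadoop")
        .icebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
        .warehouseLocation("file:/tmp/warehouse")
        .build();

    // forCatalog keys tables by their string identifier.
    TableFactory<String> tables = TableFactory.forCatalog(catalog);

    // Both spellings resolve to the same table: a leading namespace level that
    // matches the catalog name ("hadoop") is stripped before the lookup.
    Table t1 = tables.getTable("hadoop.default.mytable");
    Table t2 = tables.getTable("default.mytable");
    System.out.println(t1.name() + " == " + t2.name());
  }
}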