diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..34a6e02150e7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 4 } diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json index 8fab48cc672a..5abe02fc09c7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 5 + "modification": 1 } diff --git a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json index 8fab48cc672a..5abe02fc09c7 100644 --- a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 5 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 5ac8a7f3f6ee..833fd9b0d174 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 4 + "modification": 2 } diff --git a/CHANGES.md b/CHANGES.md index 73bc6850b57b..f74f633638de 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,6 +70,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Upgraded Iceberg dependency to 1.9.2 ([#35981](https://github.com/apache/beam/pull/35981)) ## New Features / Improvements @@ -98,6 +99,7 @@ improves support for BigQuery and other SQL dialects. Note: Minor behavior changes are observed such as output significant digits related to casting. * (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. +* Dropped Java 8 support for [IO expansion-service](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service). Cross-language pipelines using this expansion service will need a Java11+ runtime ([#35981](https://github.com/apache/beam/pull/35981). ## Deprecations diff --git a/build.gradle.kts b/build.gradle.kts index f4174d6376d5..c1167f8f72e2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -253,6 +253,7 @@ tasks.register("javaPreCommit") { dependsOn(":examples:java:sql:preCommit") dependsOn(":examples:java:twitter:build") dependsOn(":examples:java:twitter:preCommit") + dependsOn(":examples:java:iceberg:build") dependsOn(":examples:multi-language:build") dependsOn(":model:fn-execution:build") dependsOn(":model:job-management:build") @@ -380,6 +381,7 @@ tasks.register("sqlPreCommit") { dependsOn(":sdks:java:extensions:sql:datacatalog:build") dependsOn(":sdks:java:extensions:sql:expansion-service:build") dependsOn(":sdks:java:extensions:sql:hcatalog:build") + dependsOn(":sdks:java:extensions:sql:iceberg:build") dependsOn(":sdks:java:extensions:sql:jdbc:build") dependsOn(":sdks:java:extensions:sql:jdbc:preCommit") dependsOn(":sdks:java:extensions:sql:perf-tests:build") @@ -426,6 +428,7 @@ tasks.register("sqlPostCommit") { dependsOn(":sdks:java:extensions:sql:postCommit") dependsOn(":sdks:java:extensions:sql:jdbc:postCommit") dependsOn(":sdks:java:extensions:sql:datacatalog:postCommit") + dependsOn(":sdks:java:extensions:sql:iceberg:integrationTest") dependsOn(":sdks:java:extensions:sql:hadoopVersionsTest") } diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 0f1a1f7ef7e8..6f35a109998c 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -71,9 +71,6 @@ dependencies { implementation project(":sdks:java:extensions:python") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") - runtimeOnly project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - implementation project(":sdks:java:managed") implementation project(":sdks:java:extensions:ml") implementation library.java.avro implementation library.java.bigdataoss_util diff --git a/examples/java/iceberg/build.gradle b/examples/java/iceberg/build.gradle new file mode 100644 index 000000000000..09ef64d32ee3 --- /dev/null +++ b/examples/java/iceberg/build.gradle @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +plugins { + id 'java' + id 'org.apache.beam.module' + id 'com.gradleup.shadow' +} + +applyJavaNature( + exportJavadoc: false, + automaticModuleName: 'org.apache.beam.examples.iceberg', + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11 +) + +description = "Apache Beam :: Examples :: Java :: Iceberg" +ext.summary = """Apache Beam Java SDK examples using IcebergIO.""" + +/** Define the list of runners which execute a precommit test. + * Some runners are run from separate projects, see the preCommit task below + * for details. + */ +def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"] +// The following runners have configuration created but not added to preCommit +def nonPreCommitRunners = ["dataflowRunner", "prismRunner"] +for (String runner : preCommitRunners) { + configurations.create(runner + "PreCommit") +} +for (String runner: nonPreCommitRunners) { + configurations.create(runner + "PreCommit") +} +configurations.sparkRunnerPreCommit { + // Ban certain dependencies to prevent a StackOverflow within Spark + // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14 + exclude group: "org.slf4j", module: "jul-to-slf4j" + exclude group: "org.slf4j", module: "slf4j-jdk14" +} + +dependencies { + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:extensions:google-cloud-platform-core") + implementation project(":sdks:java:io:google-cloud-platform") + implementation project(":sdks:java:managed") + implementation library.java.google_auth_library_oauth2_http + implementation library.java.joda_time + runtimeOnly project(path: ":runners:direct-java", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + runtimeOnly library.java.hadoop_client + runtimeOnly library.java.bigdataoss_gcs_connector + + // Add dependencies for the PreCommit configurations + // For each runner a project level dependency on the examples project. + for (String runner : preCommitRunners) { + delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration")) + } + directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") + flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}") + sparkRunnerPreCommit project(":runners:spark:3") + sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system") + dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java") + dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java:worker") // v2 worker + dataflowRunnerPreCommit project(":sdks:java:harness") // v2 worker + prismRunnerPreCommit project(":runners:prism:java") + + // Add dependency if requested on command line for runner + if (project.hasProperty("runnerDependency")) { + runtimeOnly project(path: project.getProperty("runnerDependency")) + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java index 458f2b545450..2a5f85e524ed 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import java.io.IOException; import java.util.Map; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java index ecc047a56949..4229e401ab94 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java index 63dc4ff7056c..0ea73cdf0c87 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import com.google.auth.oauth2.GoogleCredentials; import java.io.IOException; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java index 446d11d03be4..5b4fe1b9b913 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import java.util.Arrays; import java.util.Map; diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index 2ad4ddf306bb..afbc87f8eeba 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -76,10 +76,6 @@ dependencies { fmppTask "org.freemarker:freemarker:2.3.31" fmppTemplates library.java.vendored_calcite_1_40_0 implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:managed") - implementation project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - runtimeOnly project(":sdks:java:io:iceberg:hive") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:join-library") permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761 @@ -120,9 +116,6 @@ dependencies { permitUnusedDeclared library.java.hadoop_client provided library.java.kafka_clients - implementation "org.apache.iceberg:iceberg-api:1.6.1" - permitUnusedDeclared "org.apache.iceberg:iceberg-api:1.6.1" // errorprone crash cannot find this transient dep - testImplementation "org.apache.iceberg:iceberg-core:1.6.1" testImplementation library.java.vendored_calcite_1_40_0 testImplementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle new file mode 100644 index 000000000000..d5f9e74c53bd --- /dev/null +++ b/sdks/java/extensions/sql/iceberg/build.gradle @@ -0,0 +1,81 @@ +import groovy.json.JsonOutput + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog', + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11, +) + +dependencies { + implementation project(":sdks:java:extensions:sql") + implementation project(":sdks:java:core") + implementation project(":sdks:java:managed") + implementation project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + runtimeOnly project(":sdks:java:io:iceberg:hive") + // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency + provided "org.immutables:value:2.8.8" + permitUnusedDeclared "org.immutables:value:2.8.8" + implementation library.java.slf4j_api + implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.vendored_calcite_1_40_0 + implementation library.java.jackson_databind + + testImplementation library.java.joda_time + testImplementation library.java.junit + testImplementation library.java.google_api_services_bigquery + testImplementation "org.apache.iceberg:iceberg-api:1.9.2" + testImplementation "org.apache.iceberg:iceberg-core:1.9.2" + testImplementation project(":sdks:java:io:google-cloud-platform") + testImplementation project(":sdks:java:extensions:google-cloud-platform-core") +} + +task integrationTest(type: Test) { + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' + + // Disable Gradle cache (it should not be used because the IT's won't run). + outputs.upToDateWhen { false } + + def pipelineOptions = [ + "--project=${gcpProject}", + "--tempLocation=${gcsTempRoot}", + "--blockOnRun=false"] + + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) + + include '**/*IT.class' + + maxParallelForks 4 + classpath = project(":sdks:java:extensions:sql:iceberg") + .sourceSets + .test + .runtimeClasspath + testClassesDirs = files(project(":sdks:java:extensions:sql:iceberg").sourceSets.test.output.classesDirs) + useJUnit { } +} + +configurations.all { + // iceberg-core needs avro:1.12.0 + resolutionStrategy.force 'org.apache.avro:avro:1.12.0' +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java new file mode 100644 index 000000000000..03c524f7b0fc --- /dev/null +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogRegistrar; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +@AutoService(CatalogRegistrar.class) +public class IcebergCatalogRegistrar implements CatalogRegistrar { + @Override + public Iterable> getCatalogs() { + return ImmutableList.>builder().add(IcebergCatalog.class).build(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java similarity index 98% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java index a223194b8e91..a7b128b2bca3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -19,7 +19,6 @@ import static java.lang.String.format; import static java.util.Arrays.asList; -import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; @@ -64,6 +63,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Duration; +import org.joda.time.format.DateTimeFormat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -238,7 +238,9 @@ public void runSqlWriteAndRead(boolean withPartitionFields) (float) 1.0, 1.0, true, - parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZoneUTC() + .parseDateTime("2018-05-28 20:17:40.123"), "varchar", "char", asList("123", "456"), diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java similarity index 98% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java index 7343c9b9a52f..bdd710c861e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql; +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java index 2d94e19c1689..afffa24e6cd7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java @@ -18,16 +18,12 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import com.google.auto.service.AutoService; -import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @AutoService(CatalogRegistrar.class) public class InMemoryCatalogRegistrar implements CatalogRegistrar { @Override public Iterable> getCatalogs() { - return ImmutableList.>builder() - .add(InMemoryCatalog.class) - .add(IcebergCatalog.class) - .build(); + return ImmutableList.>builder().add(InMemoryCatalog.class).build(); } } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index c139315d925f..f115ea5c6467 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -25,6 +25,8 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11 ) // We don't want to use the latest version for the entire beam sdk since beam Java users can override it themselves. @@ -66,18 +68,17 @@ dependencies { permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 implementation project(":sdks:java:managed") permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 - implementation project(":sdks:java:io:iceberg") - permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761 implementation project(":sdks:java:io:kafka") permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 - // **** IcebergIO catalogs **** - // HiveCatalog - runtimeOnly project(path: ":sdks:java:io:iceberg:hive") - // BigQueryMetastoreCatalog (Java 11+) - runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') { + // iceberg ended support for Java 8 in 1.7.0 + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:hive") + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + } runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index ba1d27b0e3e6..0f0fa0a2bb9f 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -23,6 +23,8 @@ import java.util.stream.Collectors plugins { id 'org.apache.beam.module' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.iceberg', + // iceberg ended support for Java 8 in 1.7.0 + requireJavaVersion: JavaVersion.VERSION_11, ) description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" @@ -37,10 +39,7 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -// we cannot upgrade this since the newer iceberg requires Java 11 -// many other modules like examples/expansion use Java 8 and have the iceberg dependency -// def iceberg_version = "1.9.0" -def iceberg_version = "1.6.1" +def iceberg_version = "1.9.2" def parquet_version = "1.15.2" def orc_version = "1.9.2" def hive_version = "3.1.3" @@ -107,6 +106,11 @@ dependencies { } } +configurations.all { + // iceberg-core needs avro:1.12.0 + resolutionStrategy.force 'org.apache.avro:avro:1.12.0' +} + hadoopVersions.each {kv -> configurations."hadoopVersion$kv.key" { resolutionStrategy { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index b240442deb6d..36b74967f0b2 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -34,8 +34,8 @@ import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,11 +59,15 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.ReadableDateTime; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -532,13 +536,35 @@ public void testIdentityPartitioning() throws IOException { assertEquals(1, dataFile.getRecordCount()); // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str List expectedPartitions = new ArrayList<>(); - List dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz"); - for (Schema.Field field : primitiveTypeSchema.getFields()) { - Object val = checkStateNotNull(row.getValue(field.getName())); - if (dateTypes.contains(field.getName())) { - val = URLEncoder.encode(val.toString(), UTF_8.toString()); + + for (PartitionField field : spec.fields()) { + String name = field.name(); + Type type = spec.schema().findType(name); + Transform transform = (Transform) field.transform(); + String val; + switch (name) { + case "date": + LocalDate localDate = checkStateNotNull(row.getValue(name)); + Integer day = Integer.parseInt(String.valueOf(localDate.toEpochDay())); + val = transform.toHumanString(type, day); + break; + case "time": + LocalTime localTime = checkStateNotNull(row.getValue(name)); + val = transform.toHumanString(type, localTime.toNanoOfDay() / 1000); + break; + case "datetime": + LocalDateTime ldt = checkStateNotNull(row.getValue(name)); + val = transform.toHumanString(type, DateTimeUtil.microsFromTimestamp(ldt)); + break; + case "datetime_tz": + ReadableDateTime dt = checkStateNotNull(row.getDateTime(name)); + val = transform.toHumanString(type, dt.getMillis() * 1000); + break; + default: + val = transform.toHumanString(type, checkStateNotNull(row.getValue(name))); + break; } - expectedPartitions.add(field.getName() + "=" + val); + expectedPartitions.add(name + "=" + URLEncoder.encode(val, UTF_8.toString())); } String expectedPartitionPath = String.join("/", expectedPartitions); assertEquals(expectedPartitionPath, dataFile.getPartitionPath()); diff --git a/settings.gradle.kts b/settings.gradle.kts index 135d9da42b05..a773571e6ca6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -372,3 +372,7 @@ include("sdks:java:io:iceberg:bqms") findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms" include("it:clickhouse") findProject(":it:clickhouse")?.name = "clickhouse" +include("sdks:java:extensions:sql:iceberg") +findProject(":sdks:java:extensions:sql:iceberg")?.name = "iceberg" +include("examples:java:iceberg") +findProject(":examples:java:iceberg")?.name = "iceberg"