diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 89e73b29da05..37dd25bf9029 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 6 + "modification": 3 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index cdc145356730..d6a9f7b67f24 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,7 +78,7 @@ * [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799)) * [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856)) * [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827)) -* [IcebergIO] Create tables with a specified partition spec ([#34966](https://github.com/apache/beam/pull/34966)) +* [IcebergIO] Create tables with a specified partition spec ([#34966](https://github.com/apache/beam/pull/34966), [#35268](https://github.com/apache/beam/pull/35268)) * [IcebergIO] Dynamically create namespaces if needed ([#35228](https://github.com/apache/beam/pull/35228)) ## New Features / Improvements diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index a73bd2518fe0..259402aa16fb 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -114,6 +114,9 @@ dependencies { provided library.java.hadoop_client permitUnusedDeclared library.java.hadoop_client provided library.java.kafka_clients + + testImplementation "org.apache.iceberg:iceberg-api:1.6.1" + testImplementation "org.apache.iceberg:iceberg-core:1.6.1" testImplementation library.java.vendored_calcite_1_28_0 testImplementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp index 677230cd87b9..a51d0125a3f1 100644 --- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -45,6 +45,7 @@ data: { "LOCATION" "TBLPROPERTIES" "PROPERTIES" + "PARTITIONED" ] # List of keywords from "keywords" section that are not reserved. diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 234c745168e3..78940ee69380 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -255,12 +255,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) : } } +SqlNodeList PartitionFieldList() : +{ + final List list = new ArrayList(); + SqlNode field; +} +{ + field = StringLiteral() { list.add(field); } + ( + field = StringLiteral() { list.add(field); } + )* + { + return new SqlNodeList(list, getPos()); + } +} + /** * Note: This example is probably out of sync with the code. * - * CREATE TABLE ( IF NOT EXISTS )? + * CREATE EXTERNAL TABLE ( IF NOT EXISTS )? * ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')' * TYPE type_name + * ( PARTITIONED BY '(' partition_field ( ',' partition_field )* ')' )? * ( COMMENT comment_string )? * ( LOCATION location_string )? * ( TBLPROPERTIES tbl_properties )? @@ -271,6 +287,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) : final SqlIdentifier id; List fieldList = null; final SqlNode type; + SqlNodeList partitionFields = null; SqlNode comment = null; SqlNode location = null; SqlNode tblProperties = null; @@ -290,6 +307,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) : | type = SimpleIdentifier() ) + [ partitionFields = PartitionFieldList() ] [ comment = StringLiteral() ] [ location = StringLiteral() ] [ tblProperties = StringLiteral() ] @@ -302,6 +320,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) : id, fieldList, type, + partitionFields, comment, location, tblProperties); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java index 3a94ad8e592c..6275e8c00c38 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java @@ -19,9 +19,11 @@ import static org.apache.beam.sdk.schemas.Schema.toSchema; import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; @@ -33,12 +35,14 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNodeList; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSpecialOperator; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlUtil; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlWriter; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Pair; +import org.checkerframework.checker.nullness.qual.Nullable; /** Parse tree for {@code CREATE EXTERNAL TABLE} statement. */ @SuppressWarnings({ @@ -51,6 +55,7 @@ public class SqlCreateExternalTable extends SqlCreate implements BeamSqlParser.E private final SqlNode comment; private final SqlNode location; private final SqlNode tblProperties; + private final @Nullable SqlNodeList partitionFields; private static final SqlOperator OPERATOR = new SqlSpecialOperator("CREATE EXTERNAL TABLE", SqlKind.OTHER_DDL); @@ -63,6 +68,7 @@ public SqlCreateExternalTable( SqlIdentifier name, List columnList, SqlNode type, + SqlNodeList partitionFields, SqlNode comment, SqlNode location, SqlNode tblProperties) { @@ -70,6 +76,7 @@ public SqlCreateExternalTable( this.name = checkNotNull(name); this.columnList = columnList; // may be null this.type = checkNotNull(type); + this.partitionFields = partitionFields; this.comment = comment; // may be null this.location = location; // may be null this.tblProperties = tblProperties; // may be null @@ -98,6 +105,19 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { } writer.keyword("TYPE"); type.unparse(writer, 0, 0); + if (partitionFields != null) { + writer.keyword("PARTITIONED"); + writer.keyword("BY"); + writer.sep("("); + for (int i = 0; i < partitionFields.size(); i++) { + if (i > 0) { + writer.sep(","); + } + SqlNode field = partitionFields.get(i); + field.unparse(writer, 0, 0); + } + writer.sep(")"); + } if (comment != null) { writer.keyword("COMMENT"); comment.unparse(writer, 0, 0); @@ -130,7 +150,17 @@ public void execute(CalcitePrepare.Context context) { name.getParserPosition(), RESOURCE.internal("Schema is not instanceof BeamCalciteSchema")); } + BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema; + if (partitionFields != null) { + checkArgument( + schema.resolveMetastore().supportsPartitioning(), + "Invalid use of 'PARTITIONED BY()': Table '%s' of type '%s' " + + "does not support partitioning.", + SqlDdlNodes.name(name), + SqlDdlNodes.getString(type)); + } + schema.resolveMetastore().createTable(toTable()); } @@ -149,11 +179,19 @@ private void unparseColumn(SqlWriter writer, Schema.Field column) { } } + private @Nullable List parsePartitionFields() { + if (partitionFields == null) { + return null; + } + return partitionFields.stream().map(SqlDdlNodes::getString).collect(Collectors.toList()); + } + private Table toTable() { return Table.builder() .type(SqlDdlNodes.getString(type)) .name(SqlDdlNodes.name(name)) .schema(columnList.stream().collect(toSchema())) + .partitionFields(parsePartitionFields()) .comment(SqlDdlNodes.getString(comment)) .location(SqlDdlNodes.getString(location)) .properties( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java index 23f301dd2455..3b72baa9b38e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.value.AutoValue; import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.schemas.Schema; import org.checkerframework.checker.nullness.qual.Nullable; @@ -34,6 +35,8 @@ public abstract class Table implements Serializable { public abstract Schema getSchema(); + public abstract @Nullable List getPartitionFields(); + public abstract @Nullable String getComment(); public abstract @Nullable String getLocation(); @@ -55,6 +58,8 @@ public abstract static class Builder { public abstract Builder schema(Schema getSchema); + public abstract Builder partitionFields(List fields); + public abstract Builder comment(String name); public abstract Builder location(String location); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index 8757c2357861..68ab13ef6187 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -25,7 +25,7 @@ public class InMemoryCatalog implements Catalog { private final String name; private final Map properties; - protected final InMemoryMetaStore metaStore = new InMemoryMetaStore(); + private final InMemoryMetaStore metaStore = new InMemoryMetaStore(); public InMemoryCatalog(String name, Map properties) { this.name = name; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java index a689d89132c0..148d2ee30dea 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java @@ -79,4 +79,8 @@ default Set getSubProviders() { default TableProvider getSubProvider(String name) { return null; } + + default boolean supportsPartitioning() { + return false; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java index 85fd6f4efc17..e8ff54fcdcfc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java @@ -19,13 +19,21 @@ import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog; +import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; public class IcebergCatalog extends InMemoryCatalog { + private final IcebergMetastore metaStore = new IcebergMetastore(); + public IcebergCatalog(String name, Map properties) { super(name, properties); metaStore.registerProvider(new IcebergTableProvider(name, properties)); } + @Override + public MetaStore metaStore() { + return metaStore; + } + @Override public String type() { return "iceberg"; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java new file mode 100644 index 000000000000..57c2c7c100a7 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; + +public class IcebergMetastore extends InMemoryMetaStore { + @Override + public boolean supportsPartitioning() { + return true; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java index 9a87edff2a21..596a1d6d0457 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java @@ -64,6 +64,7 @@ class IcebergTable extends SchemaBaseBeamTable { @VisibleForTesting final String tableIdentifier; @VisibleForTesting final IcebergCatalogConfig catalogConfig; @VisibleForTesting @Nullable Integer triggeringFrequency; + @VisibleForTesting final @Nullable List partitionFields; IcebergTable(Table table, IcebergCatalogConfig catalogConfig) { super(table.getSchema()); @@ -74,6 +75,7 @@ class IcebergTable extends SchemaBaseBeamTable { if (properties.has(TRIGGERING_FREQUENCY_FIELD)) { this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt(); } + this.partitionFields = table.getPartitionFields(); } @Override @@ -83,6 +85,9 @@ public POutput buildIOWriter(PCollection input) { if (triggeringFrequency != null) { configBuilder.put(TRIGGERING_FREQUENCY_FIELD, triggeringFrequency); } + if (partitionFields != null) { + configBuilder.put("partition_fields", partitionFields); + } return input.apply(Managed.write(Managed.ICEBERG).withConfig(configBuilder.build())); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java index e337074c9585..4c64dad1b8dd 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java @@ -60,4 +60,9 @@ public String getTableType() { public BeamSqlTable buildBeamSqlTable(Table table) { return new IcebergTable(table, catalogConfig); } + + @Override + public boolean supportsPartitioning() { + return true; + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java index 87f26d5f4f15..dcbb0cfec09a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java @@ -53,6 +53,26 @@ public class BeamSqlCliTest { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Test + public void testExecute_createTextTable_invalidPartitioningError() { + InMemoryMetaStore metaStore = new InMemoryMetaStore(); + metaStore.registerProvider(new TextTableProvider()); + + BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Invalid use of 'PARTITIONED BY()': Table 'person' of type 'text' does not support partitioning."); + cli.execute( + "CREATE EXTERNAL TABLE person (\n" + + "id int COMMENT 'id', \n" + + "name varchar COMMENT 'name', \n" + + "age int COMMENT 'age') \n" + + "TYPE 'text' \n" + + "PARTITIONED BY ('id', 'name') \n" + + "COMMENT '' LOCATION '/home/admin/orders'"); + } + @Test public void testExecute_createTextTable() throws Exception { InMemoryMetaStore metaStore = new InMemoryMetaStore(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java index 59f3b7650c7c..c757843b5f1f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.junit.Assert.assertEquals; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpStatusCodes; @@ -36,6 +37,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.BackOff; @@ -45,6 +47,11 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.AfterClass; @@ -65,6 +72,8 @@ public class PubsubToIcebergIT implements Serializable { @Rule public transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient TestPubsub pubsub = TestPubsub.create(); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("PubsubToIcebergIT"); + private static final String BQMS_CATALOG = + "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"; static final String DATASET = "sql_pubsub_to_iceberg_it_" + System.nanoTime(); static String warehouse; protected static final GcpOptions OPTIONS = @@ -84,7 +93,7 @@ public static void createDataset() throws IOException, InterruptedException { "CREATE CATALOG my_catalog \n" + "TYPE iceberg \n" + "PROPERTIES (\n" - + " 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n" + + format(" 'catalog-impl' = '%s', \n", BQMS_CATALOG) + " 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n" + format(" 'warehouse' = '%s', \n", warehouse) + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) @@ -128,6 +137,7 @@ public void testSimpleInsert() throws Exception { + " name VARCHAR \n " + ") \n" + "TYPE 'iceberg' \n" + + "PARTITIONED BY('id', 'truncate(name, 3)') \n" + "LOCATION '" + tableIdentifier + "' \n" @@ -156,6 +166,25 @@ public void testSimpleInsert() throws Exception { pubsub.publish(messages); validateRowsWritten(); + + // verify the table was created with the right partition spec + Catalog icebergCatalog = + CatalogUtil.loadCatalog( + BQMS_CATALOG, + "my_catalog", + ImmutableMap.builder() + .put("gcp_project", OPTIONS.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .build(), + null); + PartitionSpec expectedSpec = + PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(SOURCE_SCHEMA)) + .identity("id") + .truncate("name", 3) + .build(); + Table table = icebergCatalog.loadTable(TableIdentifier.parse(tableIdentifier)); + assertEquals(expectedSpec, table.spec()); } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java index fd3c18b6072a..bd1bbcef5576 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -48,12 +48,19 @@ import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -90,6 +97,8 @@ public class IcebergReadWriteIT { @Rule public TestName testName = new TestName(); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("IcebergReadWriteIT"); + private static final String BQMS_CATALOG = + "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"; static final String DATASET = "iceberg_sql_tests_" + System.nanoTime(); static String warehouse; protected static final GcpOptions OPTIONS = @@ -113,6 +122,16 @@ public static void deleteDataset() { @Test public void testSqlWriteAndRead() throws IOException, InterruptedException { + runSqlWriteAndRead(false); + } + + @Test + public void testSqlWriteWithPartitionFieldsAndRead() throws IOException, InterruptedException { + runSqlWriteAndRead(true); + } + + public void runSqlWriteAndRead(boolean withPartitionFields) + throws IOException, InterruptedException { BeamSqlEnv sqlEnv = BeamSqlEnv.builder(new InMemoryCatalogManager()) .setPipelineOptions(PipelineOptionsFactory.create()) @@ -124,7 +143,7 @@ public void testSqlWriteAndRead() throws IOException, InterruptedException { "CREATE CATALOG my_catalog \n" + "TYPE iceberg \n" + "PROPERTIES (\n" - + " 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n" + + format(" 'catalog-impl' = '%s', \n", BQMS_CATALOG) + " 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n" + format(" 'warehouse' = '%s', \n", warehouse) + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) @@ -136,6 +155,10 @@ public void testSqlWriteAndRead() throws IOException, InterruptedException { sqlEnv.executeDdl(setCatalog); // 3) create beam table + String partitionFields = + withPartitionFields + ? "PARTITIONED BY ('bucket(c_integer, 5)', 'c_boolean', 'hour(c_timestamp)', 'truncate(c_varchar, 3)') \n" + : ""; String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n" + " c_bigint BIGINT, \n" @@ -150,6 +173,7 @@ public void testSqlWriteAndRead() throws IOException, InterruptedException { + " c_arr_struct ARRAY, c_arr_struct_integer INTEGER>> \n" + ") \n" + "TYPE 'iceberg' \n" + + partitionFields + "LOCATION '" + tableIdentifier + "'"; @@ -206,6 +230,30 @@ public void testSqlWriteAndRead() throws IOException, InterruptedException { PAssert.that(output).containsInAnyOrder(expectedRow); PipelineResult.State state = readPipeline.run().waitUntilFinish(); assertThat(state, equalTo(PipelineResult.State.DONE)); + + // 6) verify the table was created with the right partition spec + Catalog icebergCatalog = + CatalogUtil.loadCatalog( + BQMS_CATALOG, + "my_catalog", + ImmutableMap.builder() + .put("gcp_project", OPTIONS.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .build(), + null); + PartitionSpec expectedSpec = PartitionSpec.unpartitioned(); + if (withPartitionFields) { + expectedSpec = + PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(SOURCE_SCHEMA)) + .bucket("c_integer", 5) + .identity("c_boolean") + .hour("c_timestamp") + .truncate("c_varchar", 3) + .build(); + } + Table table = icebergCatalog.loadTable(TableIdentifier.parse(tableIdentifier)); + assertEquals(expectedSpec, table.spec()); } @Test diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java index 316028d7599f..6cde6a626c90 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java @@ -23,12 +23,17 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.List; import java.util.stream.Stream; import org.apache.beam.sdk.extensions.sql.TableUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.calcite.v1_28_0.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Test; @@ -55,29 +60,55 @@ public void testBuildBeamSqlTable() throws Exception { ObjectMapper mapper = new ObjectMapper(); String propertiesString = mapper.writeValueAsString(properties); - Table table = fakeTableWithProperties("my_table", propertiesString); + Table table = + fakeTableBuilder("my_table") + .properties(TableUtils.parseProperties(propertiesString)) + .build(); BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); assertNotNull(sqlTable); assertTrue(sqlTable instanceof IcebergTable); IcebergTable icebergTable = (IcebergTable) sqlTable; - assertEquals("namespace.table", icebergTable.tableIdentifier); + assertEquals("namespace.my_table", icebergTable.tableIdentifier); assertEquals(provider.catalogConfig, icebergTable.catalogConfig); } - private static Table fakeTableWithProperties(String name, String properties) { + @Test + public void testBuildBeamSqlTableWithPartitionFields() { + List partitionFields = ImmutableList.of("id", "truncate(name, 3)"); + InMemoryCatalogManager catalogManager = new InMemoryCatalogManager(); + BeamSqlEnv sqlEnv = + BeamSqlEnv.builder(catalogManager) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); + + sqlEnv.executeDdl("CREATE CATALOG my_catalog TYPE iceberg"); + sqlEnv.executeDdl("SET CATALOG my_catalog"); + sqlEnv.executeDdl( + "CREATE EXTERNAL TABLE test_partitioned_table(\n" + + " id INTEGER,\n" + + " name VARCHAR) \n" + + "TYPE 'iceberg' \n" + + "PARTITIONED BY ('id', 'truncate(name, 3)') \n" + + "LOCATION 'namespace.test_partitioned_table'"); + + Table result = catalogManager.currentCatalog().metaStore().getTable("test_partitioned_table"); + Table expected = + fakeTableBuilder("test_partitioned_table").partitionFields(partitionFields).build(); + + assertEquals(expected, result); + } + + private static Table.Builder fakeTableBuilder(String name) { return Table.builder() .name(name) - .comment(name + " table") - .location("namespace.table") + .location("namespace." + name) .schema( Stream.of( Schema.Field.nullable("id", Schema.FieldType.INT32), Schema.Field.nullable("name", Schema.FieldType.STRING)) .collect(toSchema())) - .type("iceberg") - .properties(TableUtils.parseProperties(properties)) - .build(); + .type("iceberg"); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 66467e34f09c..a127081f6870 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -298,7 +298,7 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema synchronized (TABLE_CACHE) { // Create namespace if it does not exist yet - if (catalog instanceof SupportsNamespaces) { + if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; if (!supportsNamespaces.namespaceExists(namespace)) { try {