diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f6d433498fde..85deadda265c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2228,9 +2228,9 @@ public Write withClustering(Clustering clustering) { /** * Allows writing to clustered tables when {@link #to(SerializableFunction)} or {@link * #to(DynamicDestinations)} is used. The returned {@link TableDestination} objects should - * specify the time partitioning and clustering fields per table. If writing to a single table, - * use {@link #withClustering(Clustering)} instead to pass a {@link Clustering} instance that - * specifies the static clustering fields to use. + * specify the clustering fields per table. If writing to a single table, use {@link + * #withClustering(Clustering)} instead to pass a {@link Clustering} instance that specifies the + * static clustering fields to use. * *

Setting this option enables use of {@link TableDestinationCoderV3} which encodes * clustering information. The updated coder is compatible with non-clustered tables, so can be @@ -2661,11 +2661,6 @@ public WriteResult expand(PCollection input) { "The supplied getTableFunction object can directly set TimePartitioning." + " There is no need to call BigQueryIO.Write.withTimePartitioning."); } - if (getClustering() != null && getClustering().getFields() != null) { - checkArgument( - getJsonTimePartitioning() != null, - "Clustering fields can only be set when TimePartitioning is set."); - } DynamicDestinations dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 3a40b59284b1..737ab4ff41de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -485,10 +485,9 @@ private PendingJob startLoad( } if (timePartitioning != null) { loadConfig.setTimePartitioning(timePartitioning); - // only set clustering if timePartitioning is set - if (clustering != null) { - loadConfig.setClustering(clustering); - } + } + if (clustering != null) { + loadConfig.setClustering(clustering); } if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java new file mode 100644 index 000000000000..67777b265885 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Clustering; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.math.BigInteger; +import java.util.Arrays; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test that clusters sample data in BigQuery. */ +@RunWith(JUnit4.class) +public class BigQueryClusteringIT { + private static final Long EXPECTED_BYTES = 16000L; + private static final BigInteger EXPECTED_ROWS = new BigInteger("1000"); + private static final String WEATHER_SAMPLES_TABLE = + "clouddataflow-readonly:samples.weather_stations"; + private static final String DATASET_NAME = "BigQueryClusteringIT"; + private static final Clustering CLUSTERING = + new Clustering().setFields(Arrays.asList("station_number")); + private static final TableSchema SCHEMA = + new TableSchema() + .setFields( + Arrays.asList( + new TableFieldSchema().setName("station_number").setType("INTEGER"), + new TableFieldSchema().setName("date").setType("DATE"))); + + private Bigquery bqClient; + private BigQueryClusteringITOptions options; + + @Before + public void setUp() { + PipelineOptionsFactory.register(BigQueryClusteringITOptions.class); + options = TestPipeline.testingPipelineOptions().as(BigQueryClusteringITOptions.class); + options.setTempLocation(options.getTempRoot() + "/temp-it/"); + bqClient = BigqueryClient.getNewBigqueryClient(options.getAppName()); + } + + /** Customized PipelineOptions for BigQueryClustering Integration Test. */ + public interface BigQueryClusteringITOptions + extends TestPipelineOptions, ExperimentalOptions, BigQueryOptions { + @Description("Table to read from, specified as " + ":.") + @Default.String(WEATHER_SAMPLES_TABLE) + String getBqcInput(); + + void setBqcInput(String value); + } + + static class KeepStationNumberAndConvertDate extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) { + String day = (String) c.element().get("day"); + String month = (String) c.element().get("month"); + String year = (String) c.element().get("year"); + + TableRow row = new TableRow(); + row.set("station_number", c.element().get("station_number")); + row.set("date", String.format("%s-%s-%s", year, month, day)); + c.output(row); + } + } + + static class ClusteredDestinations extends DynamicDestinations { + private final String tableName; + + public ClusteredDestinations(String tableName) { + this.tableName = tableName; + } + + @Override + public @Nullable Coder getDestinationCoder() { + return TableDestinationCoderV3.of(); + } + + @Override + public TableDestination getDestination(ValueInSingleWindow element) { + return new TableDestination( + String.format("%s.%s", DATASET_NAME, tableName), null, null, CLUSTERING); + } + + @Override + public TableDestination getTable(TableDestination destination) { + return destination; + } + + @Override + public TableSchema getSchema(TableDestination destination) { + return SCHEMA; + } + } + + @Test + public void testE2EBigQueryClusteringNoPartitionTableFunction() throws Exception { + String tableName = "weather_stations_clustered_table_function_" + System.currentTimeMillis(); + + Pipeline p = Pipeline.create(options); + + p.apply(BigQueryIO.readTableRows().from(options.getBqcInput())) + .apply(ParDo.of(new KeepStationNumberAndConvertDate())) + .apply( + BigQueryIO.writeTableRows() + .to( + (ValueInSingleWindow vsw) -> + new TableDestination( + String.format("%s.%s", DATASET_NAME, tableName), + null, + null, + CLUSTERING)) + .withClustering(CLUSTERING) + .withSchema(SCHEMA) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) + .withMethod(BigQueryIO.Write.Method.DEFAULT)); + + p.run().waitUntilFinish(); + + Table table = bqClient.tables().get(options.getProject(), DATASET_NAME, tableName).execute(); + + Assert.assertEquals(CLUSTERING, table.getClustering()); + Assert.assertEquals(EXPECTED_BYTES, table.getNumBytes()); + Assert.assertEquals(EXPECTED_ROWS, table.getNumRows()); + } + + @Test + public void testE2EBigQueryClusteringNoPartitionDynamicDestinations() throws Exception { + String tableName = + "weather_stations_clustered_dynamic_destinations_" + System.currentTimeMillis(); + + Pipeline p = Pipeline.create(options); + + p.apply(BigQueryIO.readTableRows().from(options.getBqcInput())) + .apply(ParDo.of(new KeepStationNumberAndConvertDate())) + .apply( + BigQueryIO.writeTableRows() + .to(new ClusteredDestinations(tableName)) + .withClustering(CLUSTERING) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) + .withMethod(BigQueryIO.Write.Method.FILE_LOADS)); + + p.run().waitUntilFinish(); + + Table table = bqClient.tables().get(options.getProject(), DATASET_NAME, tableName).execute(); + + Assert.assertEquals(CLUSTERING, table.getClustering()); + Assert.assertEquals(EXPECTED_ROWS, table.getNumRows()); + Assert.assertEquals(EXPECTED_BYTES, table.getNumBytes()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index aa8951edd036..14451d225970 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -512,15 +512,6 @@ public void testClusteringStorageApi() throws Exception { } } - @Test(expected = IllegalArgumentException.class) - public void testClusteringThrowsWithoutPartitioning() throws Exception { - if (useStorageApi || !useStreaming) { - throw new IllegalArgumentException(); - } - p.enableAbandonedNodeEnforcement(false); - testTimePartitioningClustering(Method.STREAMING_INSERTS, false, true); - } - @Test public void testClusteringTableFunction() throws Exception { TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1");