diff --git a/pom.xml b/pom.xml index cc2e4837a248..c3a6b73dfb1a 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 3.5 1.9 2.24.0 + 1.0.0-rc2 1.8.1 v2-rev295-1.22.0 0.9.6.2 @@ -114,11 +115,13 @@ 0.5.160222 1.4.0 1.3.0 + 1.0.0-rc2 1.0-rc2 1.4.1 0.6.1 1.22.0 1.4.5 + 1.0.2 0.5.160304 20.0 1.2.0 @@ -131,17 +134,20 @@ 1.9.5 4.1.8.Final 1.1.33.Fork26 - 1.5.0.Final 3.2.0 v1-rev10-1.22.0 1.7.14 + 0.16.0-beta 1.6.2 + 4.3.5.RELEASE 3.1.4 v1-rev71-1.22.0 4.4.1 4.3.5.RELEASE - 2.0 1.1.4-M3 + + 1.5.0.Final + 2.0 2.20 2.20 3.6.1 @@ -608,6 +614,12 @@ ${grpc.version} + + com.google.api + api-common + ${google-api-common.version} + + com.google.api-client google-api-client @@ -865,6 +877,18 @@ ${google-cloud-bigdataoss.version} + + com.google.cloud + google-cloud-core + ${google-cloud-core.version} + + + + com.google.cloud + google-cloud-spanner + ${spanner.version} + + com.google.cloud.bigdataoss util diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index ea2d8f0cb290..9143ccf553cf 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -81,11 +81,26 @@ jackson-databind + + io.grpc + grpc-core + + + + com.google.api.grpc + grpc-google-common-protos + + com.google.apis google-api-services-bigquery + + com.google.api + api-common + + com.google.apis google-api-services-pubsub @@ -116,11 +131,6 @@ grpc-auth - - io.grpc - grpc-core - - io.grpc grpc-netty @@ -149,6 +159,16 @@ joda-time + + com.google.cloud + google-cloud-core + + + + com.google.cloud + google-cloud-spanner + + com.google.cloud.bigtable bigtable-protos @@ -184,11 +204,6 @@ google-auth-library-oauth2-http - - com.google.api.grpc - grpc-google-common-protos - - org.slf4j slf4j-api diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java new file mode 100644 index 
000000000000..ec119311c106 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.util.BackOff; +import 
org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Experimental {@link PTransform Transforms} for reading from and writing to + * Google Cloud Spanner. + * + *

Reading from Cloud Spanner

+ * + *

This functionality is not yet implemented. + * + *

Writing to Cloud Spanner

+ * + *

The Cloud Spanner {@link SpannerIO.Write} transform writes to Cloud Spanner by executing a + * collection of input row {@link Mutation Mutations}. The mutations are grouped into batches for + * efficiency. + * + *

To configure the write transform, create an instance using {@link #write()} and then specify + * the destination Cloud Spanner instance ({@link Write#withInstanceId(String)}) and destination + * database ({@link Write#withDatabaseId(String)}). For example: + * + *

{@code
+ * // Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
+ * PCollection<Mutation> mutations = ...;
+ * // Write mutations.
+ * mutations.apply(
+ *     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
+ * }
+ */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class SpannerIO { + + @VisibleForTesting + static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000; + + /** + * Creates an unitialized instance of {@link Write}. Before use, the {@link Write} must be + * configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify + * the Cloud Spanner database being written. + */ + @Experimental + public static Write write() { + return new AutoValue_SpannerIO_Write.Builder().build(); + } + + /** + * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. + * + * @see SpannerIO + */ + @Experimental(Experimental.Kind.SOURCE_SINK) + @AutoValue + public abstract static class Write extends PTransform, PDone> { + + @Nullable + abstract String getInstanceId(); + + @Nullable + abstract String getDatabaseId(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setInstanceId(String instanceId); + + abstract Builder setDatabaseId(String databaseId); + + abstract Write build(); + } + + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * instance. + * + *

Does not modify this object. + */ + public Write withInstanceId(String instanceId) { + return toBuilder().setInstanceId(instanceId).build(); + } + + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * database. + * + *

Does not modify this object. + */ + public Write withDatabaseId(String databaseId) { + return toBuilder().setDatabaseId(databaseId).build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply("Write mutations to Spanner", + ParDo.of(new SpannerWriterFn( + getInstanceId(), getDatabaseId(), SPANNER_MUTATIONS_PER_COMMIT_LIMIT))); + + return PDone.in(input.getPipeline()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("instanceId", getInstanceId()) + .withLabel("Output Instance")) + .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()) + .withLabel("Output Database")); + } + } + + /** + * {@link DoFn} that writes {@link Mutation}s to Google Cloud Spanner. Mutations are written in + * batches, where the maximum batch size is {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}. + * + *

Commits are non-transactional. If a commit fails, it will be retried (up to + * {@link SpannerWriterFn#MAX_RETRIES} times). This means that the mutation operation should be + * idempotent. + * + *

See Google Cloud Spanner documentation. + */ + @VisibleForTesting + static class SpannerWriterFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriterFn.class); + private transient Spanner spanner; + private final String instanceId; + private final String databaseId; + private final int batchSize; + private transient DatabaseClient dbClient; + // Current batch of mutations to be written. + private final List mutations = new ArrayList<>(); + + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); + + @VisibleForTesting + SpannerWriterFn(String instanceId, String databaseId, int batchSize) { + this.instanceId = checkNotNull(instanceId, "instanceId"); + this.databaseId = checkNotNull(databaseId, "databaseId"); + this.batchSize = batchSize; + } + + @Setup + public void setup() throws Exception { + SpannerOptions options = SpannerOptions.newBuilder().build(); + spanner = options.getService(); + dbClient = spanner.getDatabaseClient( + DatabaseId.of(options.getProjectId(), instanceId, databaseId)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Mutation m = c.element(); + mutations.add(m); + int columnCount = m.asMap().size(); + if ((mutations.size() + 1) * columnCount >= batchSize) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle() throws Exception { + if (!mutations.isEmpty()) { + flushBatch(); + } + } + + @Teardown + public void teardown() throws Exception { + if (spanner == null) { + return; + } + spanner.closeAsync().get(); + } + + /** + * Writes a batch of mutations to Cloud Spanner. + * + *

If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. + * If the retry limit is exceeded, the last exception from Cloud Spanner will be + * thrown. + * + * @throws AbortedException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. + */ + private void flushBatch() throws AbortedException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + + while (true) { + // Batch upsert rows. + try { + dbClient.writeAtLeastOnce(mutations); + + // Break if the commit threw no exception. + break; + } catch (AbortedException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry. + LOG.error("Error writing to Spanner ({}): {}", exception.getCode(), + exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations.clear(); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("instanceId", instanceId) + .withLabel("Instance")) + .addIfNotNull(DisplayData.item("databaseId", databaseId) + .withLabel("Database")); + } + } + + private SpannerIO() {} // Prevent construction. 
+ +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java new file mode 100644 index 000000000000..19e468cce041 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *

Provides an API for reading from and writing to + * Google Cloud Spanner. + */ +package org.apache.beam.sdk.io.gcp.spanner; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 7025004cc953..91caded1ad35 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -63,7 +63,10 @@ public void testGcpApiSurface() throws Exception { Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class), - // via Bigtable, PR above out to fix. + Matchers.>equalTo(com.google.cloud.ByteArray.class), + Matchers.>equalTo(com.google.cloud.Date.class), + Matchers.>equalTo(com.google.cloud.Timestamp.class), + classesInPackage("com.google.cloud.spanner"), classesInPackage("com.google.datastore.v1"), classesInPackage("com.google.protobuf"), classesInPackage("com.google.type"), @@ -75,7 +78,6 @@ public void testGcpApiSurface() throws Exception { classesInPackage("javax"), classesInPackage("org.apache.beam"), classesInPackage("org.apache.commons.logging"), - // via Bigtable classesInPackage("org.joda.time")); assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));