diff --git a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
new file mode 100644
index 000000000000..eee581da2148
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.spanner;
+
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Operation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import java.util.Collections;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Generalized bulk loader for importing CSV files into Cloud Spanner.
+ */
+public class SpannerCSVLoader {
+
+  /**
+   * Command-line options specification.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Create a sample database")
+    @Default.Boolean(false)
+    boolean isCreateDatabase();
+    void setCreateDatabase(boolean createDatabase);
+
+    @Description("File to read from")
+    @Validation.Required
+    String getInput();
+    void setInput(String value);
+
+    @Description("Instance ID to write to in Spanner")
+    @Validation.Required
+    String getInstanceId();
+    void setInstanceId(String value);
+
+    @Description("Database ID to write to in Spanner")
+    @Validation.Required
+    String getDatabaseId();
+    void setDatabaseId(String value);
+
+    @Description("Table name")
+    @Validation.Required
+    String getTable();
+    void setTable(String value);
+  }
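+
+  // Each Options getter above maps to a --name=value command-line flag parsed by
+  // PipelineOptionsFactory, for example (bucket, instance, database and table names
+  // here are hypothetical):
+  //   --input=gs://my-bucket/users.csv --instanceId=my-instance \
+  //   --databaseId=my-db --table=Users --createDatabase=true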
+  /**
+   * Constructs and executes the processing pipeline based upon the command-line options.
+   */
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    if (options.isCreateDatabase()) {
+      createDatabase(options);
+    }
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> lines = p.apply(TextIO.Read.from(options.getInput()));
+    PCollection<Mutation> mutations = lines
+        .apply(ParDo.of(new NaiveParseCsvFn(options.getTable())));
+    mutations
+        .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
+    p.run().waitUntilFinish();
+  }
+
+  public static void createDatabase(Options options) {
+    Spanner client = SpannerOptions.getDefaultInstance().getService();
+
+    DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient();
+    try {
+      databaseAdminClient.dropDatabase(options.getInstanceId(), options.getDatabaseId());
+    } catch (SpannerException e) {
+      // Database does not exist yet; nothing to drop.
+    }
+    Operation<Database, CreateDatabaseMetadata> op = databaseAdminClient.createDatabase(
+        options.getInstanceId(),
+        options.getDatabaseId(),
+        Collections.singleton("CREATE TABLE " + options.getTable() + " ("
+            + "  Key INT64,"
+            + "  Name STRING(MAX),"
+            + "  Email STRING(MAX),"
+            + "  Age INT64"
+            + ") PRIMARY KEY (Key)"));
+    op.waitFor();
+  }
+
+  /**
+   * A {@link DoFn} that creates a Cloud Spanner {@link Mutation} for each CSV line.
+   */
+  static class NaiveParseCsvFn extends DoFn<String, Mutation> {
+    private final String table;
+
+    NaiveParseCsvFn(String table) {
+      this.table = table;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String line = c.element();
+      String[] elements = line.split(",");
+      if (elements.length != 4) {
+        // Silently drop malformed lines.
+        return;
+      }
+      Mutation mutation = Mutation.newInsertOrUpdateBuilder(table)
+          .set("Key").to(Long.valueOf(elements[0]))
+          .set("Name").to(elements[1])
+          .set("Email").to(elements[2])
+          .set("Age").to(Integer.valueOf(elements[3]))
+          .build();
+      c.output(mutation);
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index dcca93bd2889..01a48b340fb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,10 +139,11 @@
     4.4.1
     4.3.5.RELEASE
     2.0
     <compiler.error.flag>-Werror</compiler.error.flag>
     <compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
     <compiler.default.exclude>nothing</compiler.default.exclude>
+    <spanner.version>0.16.0-beta</spanner.version>
+    <api-common.version>1.0.0-rc2</api-common.version>
     pom
@@ -824,6 +825,12 @@
         <version>${google-cloud-bigdataoss.version}</version>
       </dependency>

+      <dependency>
+        <groupId>com.google.cloud</groupId>
+        <artifactId>google-cloud-spanner</artifactId>
+        <version>${spanner.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>com.google.cloud.bigdataoss</groupId>
         <artifactId>util</artifactId>
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6c464531ebd8..c0ad4ea7536b 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -232,8 +232,8 @@
       <artifactId>joda-time</artifactId>
     </dependency>

-    <!-- … -->
+    <!-- … -->
     <dependency>
       <groupId>org.tukaani</groupId>
       <artifactId>xz</artifactId>
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 261d42706616..e7a14892043a 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -81,11 +81,28 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>

+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>grpc-google-common-protos</artifactId>
+      <version>${grpc-google-common-protos.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>

+    <dependency>
+      <groupId>com.google.api</groupId>
+      <artifactId>api-common</artifactId>
+      <version>${api-common.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-storage</artifactId>
@@ -121,11 +138,6 @@
       <artifactId>grpc-auth</artifactId>
     </dependency>

-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
@@ -160,6 +172,12 @@
       <artifactId>joda-time</artifactId>
     </dependency>

+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-spanner</artifactId>
+      <version>${spanner.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.cloud.bigtable</groupId>
       <artifactId>bigtable-protos</artifactId>
@@ -197,11 +215,6 @@
       <artifactId>google-auth-library-oauth2-http</artifactId>
     </dependency>

-    <dependency>
-      <groupId>com.google.api.grpc</groupId>
-      <artifactId>grpc-google-common-protos</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
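A minimal sketch of launching the loader above programmatically rather than from the
command line. The wrapper class name and all resource names (bucket, instance, database,
table) are hypothetical; SpannerCSVLoader.main is the entry point added by this patch.

    import org.apache.beam.examples.spanner.SpannerCSVLoader;

    public class SpannerCSVLoaderDemo {
      public static void main(String[] args) throws Exception {
        // Equivalent to running the loader with these command-line flags.
        SpannerCSVLoader.main(new String[] {
            "--input=gs://my-bucket/users.csv",  // hypothetical CSV of Key,Name,Email,Age rows
            "--instanceId=my-instance",          // hypothetical Cloud Spanner instance
            "--databaseId=my-db",                // hypothetical database
            "--table=Users",                     // hypothetical table matching the 4-column schema
            "--createDatabase=true"              // drop and recreate the sample database first
        });
      }
    }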
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
new file mode 100644
index 000000000000..172ed8f02687
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Google Cloud Spanner connectors.
+ *
+ * <h3>Reading from Cloud Spanner</h3>
+ *
+ * <p>Status: Not implemented.
+ *
+ * <h3>Writing to Cloud Spanner</h3>
+ *
+ * <p>Status: Experimental.
+ *
+ * <p>{@link SpannerIO#writeTo} batches together and concurrently writes a set of
+ * {@link Mutation}s. To configure the Cloud Spanner sink, apply the {@link SpannerIO#writeTo}
+ * transform to a {@code PCollection<Mutation>} and specify the instance and database
+ * identifiers. For example, the following code sketches out a pipeline that imports data from
+ * a CSV file into Cloud Spanner:
+ *
+ * <pre>{@code
+ *
+ * Pipeline p = ...;
+ * // Read the CSV file.
+ * PCollection<String> lines = p.apply("Read CSV file", TextIO.Read.from(options.getInput()));
+ * // Parse each line and convert it to a mutation.
+ * PCollection<Mutation> mutations = lines.apply("Parse CSV", parseFromCsv());
+ * // Write the mutations.
+ * mutations.apply("Write", SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
+ * p.run();
+ *
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SpannerIO {
+
+  private SpannerIO() {
+  }
+
+  @VisibleForTesting
+  static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000;
+
+  /**
+   * Creates an instance of {@link Writer}. Use {@link Writer#withBatchSize} to limit the batch
+   * size.
+   */
+  public static Writer writeTo(String instanceId, String databaseId) {
+    return new Writer(instanceId, databaseId, SPANNER_MUTATIONS_PER_COMMIT_LIMIT);
+  }
+
+  /**
+   * A {@link PTransform} that writes {@link Mutation} objects to Cloud Spanner.
+   *
+   * @see SpannerIO
+   */
+  public static class Writer extends PTransform<PCollection<Mutation>, PDone> {
+
+    private final String instanceId;
+    private final String databaseId;
+    private final int batchSize;
+
+    Writer(String instanceId, String databaseId, int batchSize) {
+      this.instanceId = instanceId;
+      this.databaseId = databaseId;
+      this.batchSize = batchSize;
+    }
+
+    /**
+     * Returns a new {@link Writer} with a limit on the number of mutations per batch.
+     * Defaults to {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}.
+     */
+    public Writer withBatchSize(int batchSize) {
+      return new Writer(instanceId, databaseId, batchSize);
+    }
+
+    @Override
+    public PDone expand(PCollection<Mutation> input) {
+      input.apply("Write mutations to Spanner",
+          ParDo.of(new SpannerWriterFn(instanceId, databaseId, batchSize)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<Mutation> input) {
+      checkNotNull(instanceId, "instanceId");
+      checkNotNull(databaseId, "databaseId");
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("instanceId", instanceId)
+          .add("databaseId", databaseId)
+          .toString();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("instanceId", instanceId)
+              .withLabel("Output Instance"))
+          .addIfNotNull(DisplayData.item("databaseId", databaseId)
+              .withLabel("Output Database"));
+    }
+  }
+
+  /**
+   * A {@link DoFn} that writes {@link Mutation}s to Cloud Spanner. Mutations are written in
+   * batches, where the maximum batch size is
+   * {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}.
+   *
+   * <p>See the <a href="https://cloud.google.com/spanner/quotas">Cloud Spanner quotas</a> for
+   * the current commit limits.
+   *
+   * <p>Commits are non-transactional. If a commit fails, it will be retried (up to
+   * {@link #MAX_RETRIES} times). This means that the mutation operation should be idempotent.
+   */
+  @VisibleForTesting
+  static class SpannerWriterFn extends DoFn<Mutation, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriterFn.class);
+    private transient Spanner spanner;
+    private final String instanceId;
+    private final String databaseId;
+    private final int batchSize;
+    private transient DatabaseClient dbClient;
+    // Current batch of mutations to be written.
+    private final List<Mutation> mutations = new ArrayList<>();
+
+    private static final int MAX_RETRIES = 5;
+    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(MAX_RETRIES)
+            .withInitialBackoff(Duration.standardSeconds(5));
+
+    @VisibleForTesting
+    SpannerWriterFn(String instanceId, String databaseId, int batchSize) {
+      this.instanceId = checkNotNull(instanceId, "instanceId");
+      this.databaseId = checkNotNull(databaseId, "databaseId");
+      this.batchSize = batchSize;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      SpannerOptions options = SpannerOptions.newBuilder().build();
+      spanner = options.getService();
+      dbClient = spanner.getDatabaseClient(
+          DatabaseId.of(options.getProjectId(), instanceId, databaseId));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Mutation m = c.element();
+      mutations.add(m);
+      // Spanner's commit limit counts roughly one mutation per modified cell, so the
+      // batch is sized by rows * columns; flush before the next element could exceed it.
+      int columnCount = m.asMap().size();
+      if ((mutations.size() + 1) * columnCount >= batchSize) {
+        flushBatch();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (!mutations.isEmpty()) {
+        flushBatch();
+      }
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      if (spanner == null) {
+        return;
+      }
+      spanner.closeAsync().get();
+    }
+
+    /**
+     * Writes a batch of mutations to Cloud Spanner.
+     *
+     * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry
+     * limit is exceeded, the last exception from Cloud Spanner will be thrown.
+     *
+     * @throws AbortedException if the commit still fails after the retries are exhausted
+     * @throws IOException if the backoff between retries fails
+     * @throws InterruptedException if the backoff between retries is interrupted
+     */
+    private void flushBatch() throws AbortedException, IOException, InterruptedException {
+      LOG.debug("Writing batch of {} mutations", mutations.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+
+      while (true) {
+        // Batch upsert rows.
+        try {
+          dbClient.writeAtLeastOnce(mutations);
+
+          // Break if the commit threw no exception.
+          break;
+        } catch (AbortedException exception) {
+          // Only log the code and message for potentially-transient errors. The entire
+          // exception will be propagated upon the last retry.
+          LOG.error("Error writing to Spanner ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.debug("Successfully wrote {} mutations", mutations.size());
+      mutations.clear();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("instanceId", instanceId)
+              .withLabel("Instance"))
+          .addIfNotNull(DisplayData.item("databaseId", databaseId)
+              .withLabel("Database"));
+    }
+  }
+}
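A compilable sketch of the sink's API outside Javadoc, complementing the class-level
example (whose parseFromCsv() is a placeholder). The wrapper class, table name, and
instance/database IDs are hypothetical; Create, MapElements, and SimpleFunction are
standard Beam transforms, and withBatchSize is the knob added by this patch.

    import com.google.cloud.spanner.Mutation;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;

    public class SpannerWriteDemo {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("1,Ada", "2,Grace"))                  // two in-memory CSV rows
            .apply(MapElements.via(new SimpleFunction<String, Mutation>() {
              @Override
              public Mutation apply(String line) {
                String[] fields = line.split(",");
                return Mutation.newInsertOrUpdateBuilder("Users") // hypothetical table
                    .set("Key").to(Long.valueOf(fields[0]))
                    .set("Name").to(fields[1])
                    .build();
              }
            }))
            // Hypothetical IDs; withBatchSize caps the mutations sent per commit.
            .apply(SpannerIO.writeTo("my-instance", "my-db").withBatchSize(1000));
        p.run();
      }
    }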

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java
new file mode 100644
index 000000000000..19e468cce041
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides an API for reading from and writing to
+ * <a href="https://cloud.google.com/spanner/">Google Cloud Spanner</a>.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 7025004cc953..8950452b5db2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -75,6 +75,8 @@ public void testGcpApiSurface() throws Exception {
         classesInPackage("javax"),
         classesInPackage("org.apache.beam"),
         classesInPackage("org.apache.commons.logging"),
+        classesInPackage("com.google.cloud"),
+        classesInPackage("com.google.cloud.spanner"),
         // via Bigtable
         classesInPackage("org.joda.time"));
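For intuition on the flush condition in SpannerWriterFn#processElement, a small worked
example (a sketch, not part of the patch): with the default batchSize of 20000 and rows
of 4 columns, (rows + 1) * 4 >= 20000 first holds at rows = 4999, so roughly 5000 rows
go into each commit.

    public class BatchSizeMath {
      public static void main(String[] args) {
        int batchSize = 20000; // SPANNER_MUTATIONS_PER_COMMIT_LIMIT
        int columnCount = 4;   // e.g. Key, Name, Email, Age
        int rows = 0;
        // Mirror the patch's condition: flush when (rows + 1) * columnCount >= batchSize.
        while ((rows + 1) * columnCount < batchSize) {
          rows++;
        }
        System.out.println("Rows per commit: " + rows); // prints 4999
      }
    }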