From 19b4fd5b69d9e4240ff63805a45ee0c7d9237c5e Mon Sep 17 00:00:00 2001 From: MOLIG004 Date: Thu, 4 May 2017 09:21:23 -0700 Subject: [PATCH 1/6] Initial implementation of SpannerIO.Write This closes #2166. --- .../examples/spanner/SpannerCSVLoader.java | 143 +++++++++ pom.xml | 8 + sdks/java/io/google-cloud-platform/pom.xml | 33 ++- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 272 ++++++++++++++++++ .../beam/sdk/io/gcp/spanner/package-info.java | 23 ++ .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 2 + 6 files changed, 471 insertions(+), 10 deletions(-) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java new file mode 100644 index 000000000000..eee581da2148 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.spanner; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.util.Collections; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + + + +/** + * Generalized bulk loader for importing CSV files into Spanner. + * + */ +public class SpannerCSVLoader { + + /** + * Command options specification. 
+ */ + private interface Options extends PipelineOptions { + @Description("Create a sample database") + @Default.Boolean(false) + boolean isCreateDatabase(); + void setCreateDatabase(boolean createDatabase); + + @Description("File to read from ") + @Validation.Required + String getInput(); + void setInput(String value); + + @Description("Instance ID to write to in Spanner") + @Validation.Required + String getInstanceId(); + void setInstanceId(String value); + + @Description("Database ID to write to in Spanner") + @Validation.Required + String getDatabaseId(); + void setDatabaseId(String value); + + @Description("Table name") + @Validation.Required + String getTable(); + void setTable(String value); + } + + + /** + * Constructs and executes the processing pipeline based upon command options. + */ + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + Pipeline p = Pipeline.create(options); + PCollection lines = p.apply(TextIO.Read.from(options.getInput())); + PCollection mutations = lines + .apply(ParDo.of(new NaiveParseCsvFn(options.getTable()))); + mutations + .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId())); + p.run().waitUntilFinish(); + } + + public static void createDatabase(Options options) { + Spanner client = SpannerOptions.getDefaultInstance().getService(); + + DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient(); + try { + databaseAdminClient.dropDatabase(options.getInstanceId(), options + .getDatabaseId()); + } catch (SpannerException e) { + // Does not exist, ignore. 
+ } + Operation op = databaseAdminClient.createDatabase( + options.getInstanceId(), options + .getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " (" + + " Key INT64," + + " Name STRING," + + " Email STRING," + + " Age INT," + + ") PRIMARY KEY (Key)")); + op.waitFor(); + } + + + /** + * A DoFn that creates a Spanner Mutation for each CSV line. + */ + static class NaiveParseCsvFn extends DoFn { + private final String table; + + NaiveParseCsvFn(String table) { + this.table = table; + } + + @ProcessElement + public void processElement(ProcessContext c) { + String line = c.element(); + String[] elements = line.split(","); + if (elements.length != 4) { + return; + } + Mutation mutation = Mutation.newInsertOrUpdateBuilder(table) + .set("Key").to(Long.valueOf(elements[0])) + .set("Name").to(elements[1]) + .set("Email").to(elements[2]) + .set("Age").to(Integer.valueOf(elements[3])) + .build(); + c.output(mutation); + } + } +} diff --git a/pom.xml b/pom.xml index cc2e4837a248..3a6289dfb7f2 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 3.5 1.9 2.24.0 + 1.0.0-rc2 1.8.1 v2-rev295-1.22.0 0.9.6.2 @@ -139,6 +140,7 @@ 3.1.4 v1-rev71-1.22.0 4.4.1 + 0.16.0-beta 4.3.5.RELEASE 2.0 1.1.4-M3 @@ -865,6 +867,12 @@ ${google-cloud-bigdataoss.version} + + com.google.cloud + google-cloud-spanner + ${spanner.version} + + com.google.cloud.bigdataoss util diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index ea2d8f0cb290..21818959843f 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -81,11 +81,28 @@ jackson-databind + + io.grpc + grpc-core + + + + com.google.api.grpc + grpc-google-common-protos + ${grpc-google-common-protos.version} + + com.google.apis google-api-services-bigquery + + com.google.api + api-common + ${api-common.version} + + com.google.apis google-api-services-pubsub @@ -116,11 +133,6 @@ grpc-auth - - io.grpc - grpc-core - - io.grpc 
grpc-netty @@ -149,6 +161,12 @@ joda-time + + com.google.cloud + google-cloud-spanner + ${spanner.version} + + com.google.cloud.bigtable bigtable-protos @@ -184,11 +202,6 @@ google-auth-library-oauth2-http - - com.google.api.grpc - grpc-google-common-protos - - org.slf4j slf4j-api diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java new file mode 100644 index 000000000000..172ed8f02687 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; + +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + + +/** + * Google Cloud Spanner connectors. + * + *

Reading from Cloud Spanner

+ * Status: Not implemented. + * + *

Writing to Cloud Spanner

+ * Status: Experimental. + * + *

{@link SpannerIO#writeTo} batches together and concurrently writes a set of {@link Mutation}s. + * To configure Cloud Spanner sink, you must apply {@link SpannerIO#writeTo} transform to + * {@link PCollection} and specify instance and database identifiers. + * For example, following code sketches out a pipeline that imports data from the CSV file to Cloud + * Spanner. + * + *

{@code
+ *
+ * Pipeline p = ...;
+ * // Read the CSV file.
+ * PCollection lines = p.apply("Read CSV file", TextIO.Read.from(options.getInput()));
+ * // Parse the line and convert to mutation.
+ * PCollection mutations = lines.apply("Parse CSV", parseFromCsv());
+ * // Write mutations.
+ * mutations.apply("Write", SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
+ * p.run();
+ *
+ * }
+ + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class SpannerIO { + + private SpannerIO() { + } + + @VisibleForTesting + static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000; + + /** + * Creates an instance of {@link Writer}. Use {@link Writer#withBatchSize} to limit the batch + * size. + */ + public static Writer writeTo(String instanceId, String databaseId) { + return new Writer(instanceId, databaseId, SPANNER_MUTATIONS_PER_COMMIT_LIMIT); + } + + /** + * A {@link PTransform} that writes {@link Mutation} objects to Cloud Spanner. + * + * @see SpannerIO + */ + public static class Writer extends PTransform, PDone> { + + private final String instanceId; + private final String databaseId; + private int batchSize; + + Writer(String instanceId, String databaseId, int batchSize) { + this.instanceId = instanceId; + this.databaseId = databaseId; + this.batchSize = batchSize; + } + + /** + * Returns a new {@link Writer} with a limit on the number of mutations per batch. + * Defaults to {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}. 
+ */ + public Writer withBatchSize(Integer batchSize) { + return new Writer(instanceId, databaseId, batchSize); + } + + @Override + public PDone expand(PCollection input) { + input.apply("Write mutations to Spanner", ParDo.of( + new SpannerWriterFn(instanceId, databaseId, batchSize))); + + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection input) { + checkNotNull(instanceId, "instanceId"); + checkNotNull(databaseId, "databaseId"); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("instanceId", instanceId) + .add("databaseId", databaseId) + .toString(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("instanceId", instanceId) + .withLabel("Output Instance")) + .addIfNotNull(DisplayData.item("databaseId", databaseId) + .withLabel("Output Database")); + } + + } + + + /** + * {@link DoFn} that writes {@link Mutation}s to Cloud Spanner. Mutations are written in + * batches, where the maximum batch size is {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}. + * + *

See + * + *

Commits are non-transactional. If a commit fails, it will be retried (up to + * {@link SpannerIO#MAX_RETRIES}. times). This means that the + * mutation operation should be idempotent. + */ + @VisibleForTesting + static class SpannerWriterFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriterFn.class); + private transient Spanner spanner; + private final String instanceId; + private final String databaseId; + private final int batchSize; + private transient DatabaseClient dbClient; + // Current batch of mutations to be written. + private final List mutations = new ArrayList<>(); + + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); + + @VisibleForTesting + SpannerWriterFn(String instanceId, String databaseId, int batchSize) { + this.instanceId = checkNotNull(instanceId, "instanceId"); + this.databaseId = checkNotNull(databaseId, "databaseId"); + this.batchSize = batchSize; + } + + @Setup + public void setup() throws Exception { + SpannerOptions options = SpannerOptions.newBuilder().build(); + spanner = options.getService(); + dbClient = spanner.getDatabaseClient( + DatabaseId.of(options.getProjectId(), instanceId, databaseId)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Mutation m = c.element(); + mutations.add(m); + int columnCount = m.asMap().size(); + if ((mutations.size() + 1) * columnCount >= batchSize) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (!mutations.isEmpty()) { + flushBatch(); + } + } + + @Teardown + public void teardown() throws Exception { + if (spanner == null) { + return; + } + spanner.closeAsync().get(); + } + + /** + * Writes a batch of mutations to Cloud Spanner. + * + *

If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. + * If the retry limit is exceeded, the last exception from Cloud Spanner will be + * thrown. + * + * @throws AbortedException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. + */ + private void flushBatch() throws AbortedException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + + while (true) { + // Batch upsert rows. + try { + dbClient.writeAtLeastOnce(mutations); + + // Break if the commit threw no exception. + break; + } catch (AbortedException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry. + LOG.error("Error writing to Spanner ({}): {}", exception.getCode(), + exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations.clear(); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("instanceId", instanceId) + .withLabel("Instance")) + .addIfNotNull(DisplayData.item("databaseId", databaseId) + .withLabel("Database")); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java new file mode 100644 index 000000000000..19e468cce041 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *

Provides an API for reading from and writing to + * Google Cloud Spanner. + */ +package org.apache.beam.sdk.io.gcp.spanner; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 7025004cc953..8950452b5db2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -75,6 +75,8 @@ public void testGcpApiSurface() throws Exception { classesInPackage("javax"), classesInPackage("org.apache.beam"), classesInPackage("org.apache.commons.logging"), + classesInPackage("com.google.cloud"), + classesInPackage("com.google.cloud.spanner"), // via Bigtable classesInPackage("org.joda.time")); From 6a1ac1d44235c5c628baef1019ca1d8c5327a191 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 4 May 2017 10:42:07 -0700 Subject: [PATCH 2/6] Minor style, compilation, javadoc fixups --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 172ed8f02687..c9c81a51a384 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -19,10 +19,6 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; - import com.google.cloud.spanner.AbortedException; import 
com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -35,23 +31,25 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - - /** - * Google Cloud Spanner connectors. + * {@link PTransform Transforms} for reading from and writing to + * Google Cloud Spanner. * *

Reading from Cloud Spanner

* Status: Not implemented. @@ -61,9 +59,8 @@ * *

{@link SpannerIO#writeTo} batches together and concurrently writes a set of {@link Mutation}s. * To configure Cloud Spanner sink, you must apply {@link SpannerIO#writeTo} transform to - * {@link PCollection} and specify instance and database identifiers. - * For example, following code sketches out a pipeline that imports data from the CSV file to Cloud - * Spanner. + * {@link PCollection} and specify instance and database identifiers. For example, following code + * sketches out a pipeline that imports data from the CSV file to Cloud Spanner. * *

{@code
  *
@@ -77,14 +74,10 @@
  * p.run();
  *
  * }
- */ @Experimental(Experimental.Kind.SOURCE_SINK) public class SpannerIO { - private SpannerIO() { - } - @VisibleForTesting static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000; @@ -97,10 +90,11 @@ public static Writer writeTo(String instanceId, String databaseId) { } /** - * A {@link PTransform} that writes {@link Mutation} objects to Cloud Spanner. + * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. * * @see SpannerIO */ + @Experimental(Experimental.Kind.SOURCE_SINK) public static class Writer extends PTransform, PDone> { private final String instanceId; @@ -130,7 +124,7 @@ public PDone expand(PCollection input) { } @Override - public void validate(PCollection input) { + public void validate(PipelineOptions options) { checkNotNull(instanceId, "instanceId"); checkNotNull(databaseId, "databaseId"); } @@ -152,19 +146,17 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotNull(DisplayData.item("databaseId", databaseId) .withLabel("Output Database")); } - } - /** - * {@link DoFn} that writes {@link Mutation}s to Cloud Spanner. Mutations are written in + * {@link DoFn} that writes {@link Mutation}s to Google Cloud Spanner. Mutations are written in * batches, where the maximum batch size is {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}. * - *

See - * *

Commits are non-transactional. If a commit fails, it will be retried (up to - * {@link SpannerIO#MAX_RETRIES}. times). This means that the - * mutation operation should be idempotent. + * {@link SpannerWriterFn#MAX_RETRIES} times). This means that the mutation operation should be + * idempotent. + * + *

See Google Cloud Spanner documentation. */ @VisibleForTesting static class SpannerWriterFn extends DoFn { @@ -208,7 +200,7 @@ public void processElement(ProcessContext c) throws Exception { } @FinishBundle - public void finishBundle(Context c) throws Exception { + public void finishBundle() throws Exception { if (!mutations.isEmpty()) { flushBatch(); } @@ -269,4 +261,7 @@ public void populateDisplayData(Builder builder) { .withLabel("Database")); } } + + private SpannerIO() {} // Prevent construction. + } From 2a02f0ab61a3ef68483785c43a7d6f0fc23f7f9f Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 4 May 2017 10:48:54 -0700 Subject: [PATCH 3/6] Fix spanner dependency management Also minor cleanup alphabetization in root pom.xml --- pom.xml | 22 +++++++++++++++++++--- sdks/java/io/google-cloud-platform/pom.xml | 8 +++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 3a6289dfb7f2..c3a6b73dfb1a 100644 --- a/pom.xml +++ b/pom.xml @@ -115,11 +115,13 @@ 0.5.160222 1.4.0 1.3.0 + 1.0.0-rc2 1.0-rc2 1.4.1 0.6.1 1.22.0 1.4.5 + 1.0.2 0.5.160304 20.0 1.2.0 @@ -132,18 +134,20 @@ 1.9.5 4.1.8.Final 1.1.33.Fork26 - 1.5.0.Final 3.2.0 v1-rev10-1.22.0 1.7.14 + 0.16.0-beta 1.6.2 + 4.3.5.RELEASE 3.1.4 v1-rev71-1.22.0 4.4.1 - 0.16.0-beta 4.3.5.RELEASE - 2.0 1.1.4-M3 + + 1.5.0.Final + 2.0 2.20 2.20 3.6.1 @@ -610,6 +614,12 @@ ${grpc.version} + + com.google.api + api-common + ${google-api-common.version} + + com.google.api-client google-api-client @@ -867,6 +877,12 @@ ${google-cloud-bigdataoss.version} + + com.google.cloud + google-cloud-core + ${google-cloud-core.version} + + com.google.cloud google-cloud-spanner diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 21818959843f..9143ccf553cf 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -89,7 +89,6 @@ com.google.api.grpc grpc-google-common-protos - 
${grpc-google-common-protos.version} @@ -100,7 +99,6 @@ com.google.api api-common - ${api-common.version} @@ -161,10 +159,14 @@ joda-time + + com.google.cloud + google-cloud-core + + com.google.cloud google-cloud-spanner - ${spanner.version} From 814fce80b52f169d581291dd2091478f0127c169 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 4 May 2017 10:48:24 -0700 Subject: [PATCH 4/6] Delete SpannerCSVLoader This is not appropriate for examples. SpannerIO should be well-javadoced and integration tested. --- .../examples/spanner/SpannerCSVLoader.java | 143 ------------------ 1 file changed, 143 deletions(-) delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java deleted file mode 100644 index eee581da2148..000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.examples.spanner; - -import com.google.cloud.spanner.Database; -import com.google.cloud.spanner.DatabaseAdminClient; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Operation; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerException; -import com.google.cloud.spanner.SpannerOptions; -import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import java.util.Collections; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; - - - -/** - * Generalized bulk loader for importing CSV files into Spanner. - * - */ -public class SpannerCSVLoader { - - /** - * Command options specification. - */ - private interface Options extends PipelineOptions { - @Description("Create a sample database") - @Default.Boolean(false) - boolean isCreateDatabase(); - void setCreateDatabase(boolean createDatabase); - - @Description("File to read from ") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Instance ID to write to in Spanner") - @Validation.Required - String getInstanceId(); - void setInstanceId(String value); - - @Description("Database ID to write to in Spanner") - @Validation.Required - String getDatabaseId(); - void setDatabaseId(String value); - - @Description("Table name") - @Validation.Required - String getTable(); - void setTable(String value); - } - - - /** - * Constructs and executes the processing pipeline based upon command options. 
- */ - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - Pipeline p = Pipeline.create(options); - PCollection lines = p.apply(TextIO.Read.from(options.getInput())); - PCollection mutations = lines - .apply(ParDo.of(new NaiveParseCsvFn(options.getTable()))); - mutations - .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId())); - p.run().waitUntilFinish(); - } - - public static void createDatabase(Options options) { - Spanner client = SpannerOptions.getDefaultInstance().getService(); - - DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient(); - try { - databaseAdminClient.dropDatabase(options.getInstanceId(), options - .getDatabaseId()); - } catch (SpannerException e) { - // Does not exist, ignore. - } - Operation op = databaseAdminClient.createDatabase( - options.getInstanceId(), options - .getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " (" - + " Key INT64," - + " Name STRING," - + " Email STRING," - + " Age INT," - + ") PRIMARY KEY (Key)")); - op.waitFor(); - } - - - /** - * A DoFn that creates a Spanner Mutation for each CSV line. 
- */ - static class NaiveParseCsvFn extends DoFn { - private final String table; - - NaiveParseCsvFn(String table) { - this.table = table; - } - - @ProcessElement - public void processElement(ProcessContext c) { - String line = c.element(); - String[] elements = line.split(","); - if (elements.length != 4) { - return; - } - Mutation mutation = Mutation.newInsertOrUpdateBuilder(table) - .set("Key").to(Long.valueOf(elements[0])) - .set("Name").to(elements[1]) - .set("Email").to(elements[2]) - .set("Age").to(Integer.valueOf(elements[3])) - .build(); - c.output(mutation); - } - } -} From dfeb08a397b219a30225fca0345b3170f610a67e Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 15 May 2017 10:16:18 -0700 Subject: [PATCH 5/6] Refine Spanner API tests And remove outdated Bigtable comment --- .../org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 8950452b5db2..91caded1ad35 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -63,7 +63,10 @@ public void testGcpApiSurface() throws Exception { Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class), - // via Bigtable, PR above out to fix. 
+ Matchers.>equalTo(com.google.cloud.ByteArray.class), + Matchers.>equalTo(com.google.cloud.Date.class), + Matchers.>equalTo(com.google.cloud.Timestamp.class), + classesInPackage("com.google.cloud.spanner"), classesInPackage("com.google.datastore.v1"), classesInPackage("com.google.protobuf"), classesInPackage("com.google.type"), @@ -75,9 +78,6 @@ public void testGcpApiSurface() throws Exception { classesInPackage("javax"), classesInPackage("org.apache.beam"), classesInPackage("org.apache.commons.logging"), - classesInPackage("com.google.cloud"), - classesInPackage("com.google.cloud.spanner"), - // via Bigtable classesInPackage("org.joda.time")); assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses)); From 6c5f3739d089ee5e3858a95dda32869ac1a4f3d7 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 15 May 2017 10:42:57 -0700 Subject: [PATCH 6/6] SpannerIO.Write cleanup and style fixes * Rename to Write to match the rest of the SDK. * Convert to AutoValue, delete toString. * Drop .writeTo(), instead use .write() as default constructor. * Temporarily drop withBatchSize, as its existence is not clearly justified. 
--- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 116 ++++++++++-------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index c9c81a51a384..ec119311c106 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.auto.value.AutoValue; import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -26,12 +27,11 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -48,31 +48,29 @@ import org.slf4j.LoggerFactory; /** - * {@link PTransform Transforms} for reading from and writing to + * Experimental {@link PTransform Transforms} for reading from and writing to * Google Cloud Spanner. * *

Reading from Cloud Spanner

- * Status: Not implemented. + * + *

This functionality is not yet implemented. * *

Writing to Cloud Spanner

- * Status: Experimental. * - *

{@link SpannerIO#writeTo} batches together and concurrently writes a set of {@link Mutation}s. - * To configure Cloud Spanner sink, you must apply {@link SpannerIO#writeTo} transform to - * {@link PCollection} and specify instance and database identifiers. For example, following code - * sketches out a pipeline that imports data from the CSV file to Cloud Spanner. + *

The Cloud Spanner {@link SpannerIO.Write} transform writes to Cloud Spanner by executing a + * collection of input row {@link Mutation Mutations}. The mutations are grouped into batches for + * efficiency. * - *

{@code
+ * 

To configure the write transform, create an instance using {@link #write()} and then specify + * the destination Cloud Spanner instance ({@link Write#withInstanceId(String)}) and destination + * database ({@link Write#withDatabaseId(String)}). For example: * - * Pipeline p = ...; - * // Read the CSV file. - * PCollection lines = p.apply("Read CSV file", TextIO.Read.from(options.getInput())); - * // Parse the line and convert to mutation. - * PCollection mutations = lines.apply("Parse CSV", parseFromCsv()); + *

{@code
+ * // Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
+ * PCollection mutations = ...;
  * // Write mutations.
- * mutations.apply("Write", SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
- * p.run();
- *
+ * mutations.apply(
+ *     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
  * }
*/ @Experimental(Experimental.Kind.SOURCE_SINK) @@ -81,12 +79,14 @@ public class SpannerIO { @VisibleForTesting static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000; - /** - * Creates an instance of {@link Writer}. Use {@link Writer#withBatchSize} to limit the batch - * size. - */ - public static Writer writeTo(String instanceId, String databaseId) { - return new Writer(instanceId, databaseId, SPANNER_MUTATIONS_PER_COMMIT_LIMIT); + /** + * Creates an uninitialized instance of {@link Write}. Before use, the {@link Write} must be + * configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify + * the Cloud Spanner database being written. + */ + @Experimental + public static Write write() { + return new AutoValue_SpannerIO_Write.Builder().build(); } /** @@ -95,55 +95,63 @@ public static Writer writeTo(String instanceId, String databaseId) { * @see SpannerIO */ @Experimental(Experimental.Kind.SOURCE_SINK) - public static class Writer extends PTransform, PDone> { + @AutoValue + public abstract static class Write extends PTransform, PDone> { - private final String instanceId; - private final String databaseId; - private int batchSize; + @Nullable + abstract String getInstanceId(); - Writer(String instanceId, String databaseId, int batchSize) { - this.instanceId = instanceId; - this.databaseId = databaseId; - this.batchSize = batchSize; + @Nullable + abstract String getDatabaseId(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setInstanceId(String instanceId); + + abstract Builder setDatabaseId(String databaseId); + + abstract Write build(); } /** - * Returns a new {@link Writer} with a limit on the number of mutations per batch. - * Defaults to {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}. + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * instance. + * + *

Does not modify this object. */ - public Writer withBatchSize(Integer batchSize) { - return new Writer(instanceId, databaseId, batchSize); + public Write withInstanceId(String instanceId) { + return toBuilder().setInstanceId(instanceId).build(); } - @Override - public PDone expand(PCollection input) { - input.apply("Write mutations to Spanner", ParDo.of( - new SpannerWriterFn(instanceId, databaseId, batchSize))); - - return PDone.in(input.getPipeline()); + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * database. + * + *

Does not modify this object. + */ + public Write withDatabaseId(String databaseId) { + return toBuilder().setDatabaseId(databaseId).build(); } @Override - public void validate(PipelineOptions options) { - checkNotNull(instanceId, "instanceId"); - checkNotNull(databaseId, "databaseId"); - } + public PDone expand(PCollection input) { + input.apply("Write mutations to Spanner", + ParDo.of(new SpannerWriterFn( + getInstanceId(), getDatabaseId(), SPANNER_MUTATIONS_PER_COMMIT_LIMIT))); - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("instanceId", instanceId) - .add("databaseId", databaseId) - .toString(); + return PDone.in(input.getPipeline()); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .addIfNotNull(DisplayData.item("instanceId", instanceId) + .addIfNotNull(DisplayData.item("instanceId", getInstanceId()) .withLabel("Output Instance")) - .addIfNotNull(DisplayData.item("databaseId", databaseId) + .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()) .withLabel("Output Database")); } }