-
Notifications
You must be signed in to change notification settings - Fork 4.5k
BEAM-1542 : Added SpannerIO Data Sink for Cloud Spanner #2166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
86ad4e2
564e714
49beb06
90754bb
caa6b58
be81ec7
69b0eb7
ba7e7ee
f583eae
77cfbb6
dc97e1e
0c7cfd4
0f176fc
c6bef41
f4118f7
68854f5
1b367c3
335021f
b1daa91
d8db650
7d1336f
5defd21
084d68e
5c95afb
2b64605
839fe05
500a22a
66ccbcd
202f6e1
71b6295
9656e22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.examples.spanner; | ||
|
|
||
| import com.google.cloud.spanner.Database; | ||
| import com.google.cloud.spanner.DatabaseAdminClient; | ||
| import com.google.cloud.spanner.Mutation; | ||
| import com.google.cloud.spanner.Operation; | ||
| import com.google.cloud.spanner.Spanner; | ||
| import com.google.cloud.spanner.SpannerException; | ||
| import com.google.cloud.spanner.SpannerOptions; | ||
| import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; | ||
| import java.util.Collections; | ||
| import org.apache.beam.sdk.Pipeline; | ||
| import org.apache.beam.sdk.io.TextIO; | ||
| import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; | ||
| import org.apache.beam.sdk.options.Default; | ||
| import org.apache.beam.sdk.options.Description; | ||
| import org.apache.beam.sdk.options.PipelineOptions; | ||
| import org.apache.beam.sdk.options.PipelineOptionsFactory; | ||
| import org.apache.beam.sdk.options.Validation; | ||
| import org.apache.beam.sdk.transforms.DoFn; | ||
| import org.apache.beam.sdk.transforms.ParDo; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
|
|
||
|
|
||
|
|
||
| /** | ||
| * Generalized bulk loader for importing CSV files into Spanner. | ||
| * | ||
| */ | ||
| public class SpannerCSVLoader { | ||
|
|
||
| /** | ||
| * Command options specification. | ||
| */ | ||
| private interface Options extends PipelineOptions { | ||
| @Description("Create a sample database") | ||
| @Default.Boolean(false) | ||
| boolean isCreateDatabase(); | ||
| void setCreateDatabase(boolean createDatabase); | ||
|
|
||
| @Description("File to read from ") | ||
| @Validation.Required | ||
| String getInput(); | ||
| void setInput(String value); | ||
|
|
||
| @Description("Instance ID to write to in Spanner") | ||
| @Validation.Required | ||
| String getInstanceId(); | ||
| void setInstanceId(String value); | ||
|
|
||
| @Description("Database ID to write to in Spanner") | ||
| @Validation.Required | ||
| String getDatabaseId(); | ||
| void setDatabaseId(String value); | ||
|
|
||
| @Description("Table name") | ||
| @Validation.Required | ||
| String getTable(); | ||
| void setTable(String value); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Constructs and executes the processing pipeline based upon command options. | ||
| */ | ||
| public static void main(String[] args) throws Exception { | ||
| Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); | ||
|
|
||
| Pipeline p = Pipeline.create(options); | ||
| PCollection<String> lines = p.apply(TextIO.Read.from(options.getInput())); | ||
| PCollection<Mutation> mutations = lines | ||
| .apply(ParDo.of(new NaiveParseCsvFn(options.getTable()))); | ||
| mutations | ||
| .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId())); | ||
| p.run().waitUntilFinish(); | ||
| } | ||
|
|
||
| public static void createDatabase(Options options) { | ||
| Spanner client = SpannerOptions.getDefaultInstance().getService(); | ||
|
|
||
| DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient(); | ||
| try { | ||
| databaseAdminClient.dropDatabase(options.getInstanceId(), options | ||
| .getDatabaseId()); | ||
| } catch (SpannerException e) { | ||
| // Does not exist, ignore. | ||
| } | ||
| Operation<Database, CreateDatabaseMetadata> op = databaseAdminClient.createDatabase( | ||
| options.getInstanceId(), options | ||
| .getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " (" | ||
| + " Key INT64," | ||
| + " Name STRING," | ||
| + " Email STRING," | ||
| + " Age INT," | ||
| + ") PRIMARY KEY (Key)")); | ||
| op.waitFor(); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * A DoFn that creates a Spanner Mutation for each CSV line. | ||
| */ | ||
| static class NaiveParseCsvFn extends DoFn<String, Mutation> { | ||
| private final String table; | ||
|
|
||
| NaiveParseCsvFn(String table) { | ||
| this.table = table; | ||
| } | ||
|
|
||
| @ProcessElement | ||
| public void processElement(ProcessContext c) { | ||
| String line = c.element(); | ||
| String[] elements = line.split(","); | ||
| if (elements.length != 4) { | ||
| return; | ||
| } | ||
| Mutation mutation = Mutation.newInsertOrUpdateBuilder(table) | ||
| .set("Key").to(Long.valueOf(elements[0])) | ||
| .set("Name").to(elements[1]) | ||
| .set("Email").to(elements[2]) | ||
| .set("Age").to(Integer.valueOf(elements[3])) | ||
| .build(); | ||
| c.output(mutation); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -232,8 +232,8 @@ | |
| <artifactId>joda-time</artifactId> | ||
| </dependency> | ||
|
|
||
| <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly | ||
| declare this dependency to include org.tukaani:xz on the classpath at runtime. --> | ||
| <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. revert? |
||
| declare this dependency to include org.tukaani:xz on the classpath at runtime. --> | ||
| <dependency> | ||
| <groupId>org.tukaani</groupId> | ||
| <artifactId>xz</artifactId> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,11 +81,28 @@ | |
| <artifactId>jackson-databind</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.grpc</groupId> | ||
| <artifactId>grpc-core</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.api.grpc</groupId> | ||
| <artifactId>grpc-google-common-protos</artifactId> | ||
| <version>${grpc-google-common-protos.version}</version> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. no versions down here -- they should all be in top-level dependency management like you did for the |
||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.apis</groupId> | ||
| <artifactId>google-api-services-bigquery</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.api</groupId> | ||
| <artifactId>api-common</artifactId> | ||
| <version>${api-common.version}</version> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto re dependency management |
||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.apis</groupId> | ||
| <artifactId>google-api-services-storage</artifactId> | ||
|
|
@@ -121,11 +138,6 @@ | |
| <artifactId>grpc-auth</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.grpc</groupId> | ||
| <artifactId>grpc-core</artifactId> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. is |
||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>io.grpc</groupId> | ||
| <artifactId>grpc-netty</artifactId> | ||
|
|
@@ -160,6 +172,12 @@ | |
| <artifactId>joda-time</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.cloud</groupId> | ||
| <artifactId>google-cloud-spanner</artifactId> | ||
| <version>${spanner.version}</version> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto re version |
||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.cloud.bigtable</groupId> | ||
| <artifactId>bigtable-protos</artifactId> | ||
|
|
@@ -197,11 +215,6 @@ | |
| <artifactId>google-auth-library-oauth2-http</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>com.google.api.grpc</groupId> | ||
| <artifactId>grpc-google-common-protos</artifactId> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.slf4j</groupId> | ||
| <artifactId>slf4j-api</artifactId> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not put this in the global examples – we're trying to keep this independent of specific proprietary clouds. Instead, we'd probably like to use it as part of an integration test in the
sdks/java/io/google-cloud-platform-java module.