From 84724126657f4d389573c5bc1403730029da3216 Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Mon, 13 Jun 2022 22:56:31 -0700 Subject: [PATCH 1/3] [#21761] Add GCP Project as pipeline opt read by expansion service The expansion service now reads the Project pipeline option so that during expansion it can perform GCP operations on the correct project. Also un-sickbay a test that was blocked on this issue. --- sdks/go/test/integration/integration.go | 2 -- sdks/java/expansion-service/build.gradle | 1 + .../apache/beam/sdk/expansion/service/ExpansionService.java | 3 +++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7a397215c245..bba3d502d100 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -183,8 +183,6 @@ var dataflowFilters = []string{ // Dataflow does not automatically terminate the TestCheckpointing pipeline when // complete. "TestCheckpointing", - // TODO(21761): This test needs to provide GCP project to expansion service. - "TestBigQueryIO_BasicWriteQueryRead", // Dataflow does not drain jobs by itself. "TestDrain", } diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 49336faf2e4a..6f0e5dfafcb1 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":runners:core-construction-java") implementation project(path: ":runners:java-fn-execution") + implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") implementation project(path: ":sdks:java:fn-execution") permitUnusedDeclared project(path: ":sdks:java:fn-execution") implementation library.java.jackson_annotations diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index d1acfbb54a7a..22de29db34ab 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -571,6 +572,8 @@ private Map loadRegisteredTransforms() { protected Pipeline createPipeline() { // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); + GcpOptions gcpOptions = effectiveOpts.as(GcpOptions.class); + Optional.ofNullable(gcpOptions.getProject()).ifPresent(gcpOptions::setProject); PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class); Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()) From 4307622d5e32a93a9d7645c51a1635ebd1ce5710 Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Tue, 14 Jun 2022 18:37:51 -0700 Subject: [PATCH 2/3] [#21761] Create expansion service subclass for GCP config. This makes some changes to ExpansionService.java to allow performing additional configuration in subclasses. --- sdks/java/expansion-service/build.gradle | 1 - .../expansion/service/ExpansionService.java | 49 +++++++++++------ .../expansion-service/build.gradle | 5 +- .../service/GcpExpansionService.java | 55 +++++++++++++++++++ .../gcp/expansion/service/package-info.java | 20 +++++++ 5 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java create mode 100644 sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 6f0e5dfafcb1..49336faf2e4a 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -40,7 +40,6 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":runners:core-construction-java") implementation project(path: ":runners:java-fn-execution") - implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") implementation project(path: ":sdks:java:fn-execution") permitUnusedDeclared project(path: ":sdks:java:fn-execution") implementation library.java.jackson_annotations diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 22de29db34ab..e0dd65a4ca80 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -569,13 +568,13 @@ private Map loadRegisteredTransforms() { .build(); } - protected Pipeline createPipeline() { + // configPipelineOptions is set up to be overridden for expansion service subclasses that may want + // to change which pipeline options are read. + protected PipelineOptions configPipelineOptions(PipelineOptions providedOpts) { // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); - GcpOptions gcpOptions = effectiveOpts.as(GcpOptions.class); - Optional.ofNullable(gcpOptions.getProject()).ifPresent(gcpOptions::setProject); PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); - PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class); + PortablePipelineOptions specifiedOptions = providedOpts.as(PortablePipelineOptions.class); Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()) .ifPresent(portableOptions::setDefaultEnvironmentType); Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig()) @@ -586,9 +585,14 @@ protected Pipeline createPipeline() { } effectiveOpts .as(ExperimentalOptions.class) - .setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments()); + .setExperiments(providedOpts.as(ExperimentalOptions.class).getExperiments()); effectiveOpts.setRunner(NotRunnableRunner.class); - return Pipeline.create(effectiveOpts); + return effectiveOpts; + } + + protected Pipeline createPipeline() { + PipelineOptions options = configPipelineOptions(pipelineOptions); + return Pipeline.create(options); } @Override @@ -612,15 +616,15 @@ public void close() throws Exception { // Nothing to do because the expansion service is stateless. } - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - - // Register the options class used by the expansion service. - PipelineOptionsFactory.register(ExpansionServiceOptions.class); - - @SuppressWarnings("nullness") - ExpansionService service = new ExpansionService(Arrays.copyOfRange(args, 1, args.length)); + /** + * StartServer accepts an {@link ExpansionService}, including subclasses, and starts up a server. + * This is a utility function for writing main functions in {@link ExpansionService} subclasses. + * + * @param service The {@link ExpansionService} implementation to start as a server. + * @param port The integer port to start the server on. + * @throws Exception Thrown if the server fails to start. + */ + public static void StartServer(ExpansionService service, int port) throws Exception { for (Map.Entry entry : service.getRegisteredTransforms().entrySet()) { System.out.println("\t" + entry.getKey() + ": " + entry.getValue()); @@ -635,6 +639,19 @@ public static void main(String[] args) throws Exception { server.awaitTermination(); } + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + System.out.println("Starting expansion service at localhost:" + port); + + // Register the options class used by the expansion service. + PipelineOptionsFactory.register(ExpansionServiceOptions.class); + + @SuppressWarnings("nullness") + ExpansionService service = new ExpansionService(Arrays.copyOfRange(args, 1, args.length)); + + StartServer(service, port); + } + private static class NotRunnableRunner extends PipelineRunner { public static NotRunnableRunner fromOptions(PipelineOptions opts) { return new NotRunnableRunner(); diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle index c55b50ef4a63..00a3497380b4 100644 --- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle +++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle @@ -32,16 +32,17 @@ ext.summary = "Expansion service serving GCP Java IOs" dependencies { implementation project(":sdks:java:expansion-service") - permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 implementation project(":sdks:java:io:google-cloud-platform") permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761 implementation project(":sdks:java:extensions:schemaio-expansion-service") permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761 + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:extensions:google-cloud-platform-core") runtimeOnly library.java.slf4j_jdk14 } task runExpansionService (type: JavaExec) { - mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService" + mainClass = "org.apache.beam.sdk.io.gcp.expansion.service.GcpExpansionService" classpath = sourceSets.test.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } diff --git a/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java new file mode 100644 index 000000000000..3a4924937b62 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.expansion.service; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.beam.sdk.expansion.service.ExpansionService; +import org.apache.beam.sdk.expansion.service.ExpansionServiceOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** An expansion service for GCP IOs. */ +public class GcpExpansionService extends ExpansionService { + public GcpExpansionService(String[] args) { + super(args); + } + + @Override + protected PipelineOptions configPipelineOptions(PipelineOptions providedOpts) { + PipelineOptions opts = super.configPipelineOptions(providedOpts); + GcpOptions gcpOptions = opts.as(GcpOptions.class); + GcpOptions specifiedOptions = providedOpts.as(GcpOptions.class); + Optional.ofNullable(specifiedOptions.getProject()).ifPresent(gcpOptions::setProject); + return opts; + } + + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + System.out.println("Starting expansion service at localhost:" + port); + + // Register the options class used by the expansion service. + PipelineOptionsFactory.register(ExpansionServiceOptions.class); + + @SuppressWarnings("nullness") + ExpansionService service = new GcpExpansionService(Arrays.copyOfRange(args, 1, args.length)); + + StartServer(service, port); + } +} diff --git a/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java new file mode 100644 index 000000000000..be7260cd4772 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Classes used to expand transforms for GCP IOs. */ +package org.apache.beam.sdk.io.gcp.expansion.service; From 2497e6023b37c7a8bf6e7d2feddcce7a0680b5f0 Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Thu, 16 Jun 2022 17:39:14 -0700 Subject: [PATCH 3/3] Fixup: Missed updating main class name --- .../io/google-cloud-platform/expansion-service/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle index 00a3497380b4..278db3c7dafe 100644 --- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle +++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle @@ -18,7 +18,7 @@ apply plugin: 'org.apache.beam.module' apply plugin: 'application' -mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" +mainClassName = "org.apache.beam.sdk.io.gcp.expansion.service.GcpExpansionService" applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.gcp.expansion.service',