diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7a397215c245..bba3d502d100 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -183,8 +183,6 @@ var dataflowFilters = []string{ // Dataflow does not automatically terminate the TestCheckpointing pipeline when // complete. "TestCheckpointing", - // TODO(21761): This test needs to provide GCP project to expansion service. - "TestBigQueryIO_BasicWriteQueryRead", // Dataflow does not drain jobs by itself. "TestDrain", } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index d1acfbb54a7a..e0dd65a4ca80 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -568,11 +568,13 @@ private Map loadRegisteredTransforms() { .build(); } - protected Pipeline createPipeline() { + // configPipelineOptions is set up to be overridden for expansion service subclasses that may want + // to change which pipeline options are read. + protected PipelineOptions configPipelineOptions(PipelineOptions providedOpts) { // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); - PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class); + PortablePipelineOptions specifiedOptions = providedOpts.as(PortablePipelineOptions.class); Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()) .ifPresent(portableOptions::setDefaultEnvironmentType); Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig()) @@ -583,9 +585,14 @@ protected Pipeline createPipeline() { } effectiveOpts .as(ExperimentalOptions.class) - .setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments()); + .setExperiments(providedOpts.as(ExperimentalOptions.class).getExperiments()); effectiveOpts.setRunner(NotRunnableRunner.class); - return Pipeline.create(effectiveOpts); + return effectiveOpts; + } + + protected Pipeline createPipeline() { + PipelineOptions options = configPipelineOptions(pipelineOptions); + return Pipeline.create(options); } @Override @@ -609,15 +616,15 @@ public void close() throws Exception { // Nothing to do because the expansion service is stateless. } - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - - // Register the options class used by the expansion service. - PipelineOptionsFactory.register(ExpansionServiceOptions.class); - - @SuppressWarnings("nullness") - ExpansionService service = new ExpansionService(Arrays.copyOfRange(args, 1, args.length)); + /** + * StartServer accepts an {@link ExpansionService}, including subclasses, and starts up a server. + * This is a utility function for writing main functions in {@link ExpansionService} subclasses. + * + * @param service The {@link ExpansionService} implementation to start as a server. + * @param port The integer port to start the server on. + * @throws Exception Thrown if the server fails to start. + */ + public static void StartServer(ExpansionService service, int port) throws Exception { for (Map.Entry entry : service.getRegisteredTransforms().entrySet()) { System.out.println("\t" + entry.getKey() + ": " + entry.getValue()); @@ -632,6 +639,19 @@ public static void main(String[] args) throws Exception { server.awaitTermination(); } + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + System.out.println("Starting expansion service at localhost:" + port); + + // Register the options class used by the expansion service. + PipelineOptionsFactory.register(ExpansionServiceOptions.class); + + @SuppressWarnings("nullness") + ExpansionService service = new ExpansionService(Arrays.copyOfRange(args, 1, args.length)); + + StartServer(service, port); + } + private static class NotRunnableRunner extends PipelineRunner { public static NotRunnableRunner fromOptions(PipelineOptions opts) { return new NotRunnableRunner(); diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle index c55b50ef4a63..278db3c7dafe 100644 --- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle +++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle @@ -18,7 +18,7 @@ apply plugin: 'org.apache.beam.module' apply plugin: 'application' -mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" +mainClassName = "org.apache.beam.sdk.io.gcp.expansion.service.GcpExpansionService" applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.gcp.expansion.service', @@ -32,16 +32,17 @@ ext.summary = "Expansion service serving GCP Java IOs" dependencies { implementation project(":sdks:java:expansion-service") - permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 implementation project(":sdks:java:io:google-cloud-platform") permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761 implementation project(":sdks:java:extensions:schemaio-expansion-service") permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761 + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:extensions:google-cloud-platform-core") runtimeOnly library.java.slf4j_jdk14 } task runExpansionService (type: JavaExec) { - mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService" + mainClass = "org.apache.beam.sdk.io.gcp.expansion.service.GcpExpansionService" classpath = sourceSets.test.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } diff --git a/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java new file mode 100644 index 000000000000..3a4924937b62 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/GcpExpansionService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.expansion.service; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.beam.sdk.expansion.service.ExpansionService; +import org.apache.beam.sdk.expansion.service.ExpansionServiceOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** An expansion service for GCP IOs. */ +public class GcpExpansionService extends ExpansionService { + public GcpExpansionService(String[] args) { + super(args); + } + + @Override + protected PipelineOptions configPipelineOptions(PipelineOptions providedOpts) { + PipelineOptions opts = super.configPipelineOptions(providedOpts); + GcpOptions gcpOptions = opts.as(GcpOptions.class); + GcpOptions specifiedOptions = providedOpts.as(GcpOptions.class); + Optional.ofNullable(specifiedOptions.getProject()).ifPresent(gcpOptions::setProject); + return opts; + } + + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + System.out.println("Starting expansion service at localhost:" + port); + + // Register the options class used by the expansion service. + PipelineOptionsFactory.register(ExpansionServiceOptions.class); + + @SuppressWarnings("nullness") + ExpansionService service = new GcpExpansionService(Arrays.copyOfRange(args, 1, args.length)); + + StartServer(service, port); + } +} diff --git a/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java new file mode 100644 index 000000000000..be7260cd4772 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/expansion-service/src/main/java/org/apache/beam/sdk/io/gcp/expansion/service/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Classes used to expand transforms for GCP IOs. */ +package org.apache.beam.sdk.io.gcp.expansion.service;