diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 066c64cbad3d..76aff6e524c3 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -293,6 +293,29 @@ class BeamModulePlugin implements Plugin<Project> { } } + // A class defining the configuration for CrossLanguageValidatesRunner. + class CrossLanguageValidatesRunnerConfiguration { + // Task name for the cross-language validates runner case. + String name = 'validatesCrossLanguageRunner' + // Fully qualified JobServerDriver class name to use. + String jobServerDriver + // A string representing the jobServer configuration. + String jobServerConfig + // Number of parallel test runs. + Integer numParallelTests = 1 + // Extra options to pass to TestPipeline. + String[] pipelineOpts = [] + // Categories for tests to run. + Closure testCategories = { + includeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' + // Use the following to include / exclude categories: + // includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + // excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' + } + // Configuration for the classpath when running the test. + Configuration testClasspathConfiguration + } + def isRelease(Project project) { return project.hasProperty('isRelease') } @@ -1572,7 +1595,6 @@ class BeamModulePlugin implements Plugin<Project> { */ project.evaluationDependsOn(":beam-sdks-java-core") project.evaluationDependsOn(":beam-runners-core-java") - project.evaluationDependsOn(":beam-runners-core-construction-java") def config = it ? it as PortableValidatesRunnerConfiguration : new PortableValidatesRunnerConfiguration() def name = config.name def beamTestPipelineOptions = [ @@ -1580,8 +1602,6 @@ "--jobServerDriver=${config.jobServerDriver}", "--environmentCacheMillis=10000" ] - def expansionPort = startingExpansionPortNumber.getAndDecrement() - config.systemProperties.put("expansionPort", expansionPort) beamTestPipelineOptions.addAll(config.pipelineOpts) if (config.environment == PortableValidatesRunnerConfiguration.Environment.EMBEDDED) { beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED" @@ -1595,7 +1615,7 @@ description = "Validates the PortableRunner with JobServer ${config.jobServerDriver}" systemProperties config.systemProperties classpath = config.testClasspathConfiguration - testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs) + testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs) maxParallelForks config.numParallelTests useJUnit(config.testCategories) // increase maxHeapSize as this is directly correlated to direct memory, @@ -1609,6 +1629,110 @@ } /** ***********************************************************************************************/ + // Method to create the crossLanguageValidatesRunner task. + // The method takes a CrossLanguageValidatesRunnerConfiguration as its parameter.
+ project.ext.createCrossLanguageValidatesRunnerTask = { + def config = it ? it as CrossLanguageValidatesRunnerConfiguration : new CrossLanguageValidatesRunnerConfiguration() + + project.evaluationDependsOn(":beam-sdks-python") + project.evaluationDependsOn(":beam-sdks-java-test-expansion-service") + project.evaluationDependsOn(":beam-runners-core-construction-java") + + // Task for launching expansion services + def envDir = project.project(":beam-sdks-python").envdir + def pythonDir = project.project(":beam-sdks-python").projectDir + def javaPort = startingExpansionPortNumber.getAndDecrement() + def pythonPort = startingExpansionPortNumber.getAndDecrement() + def expansionJar = project.project(':beam-sdks-java-test-expansion-service').buildTestExpansionServiceJar.archivePath + def expansionServiceOpts = [ + "group_id": project.name, + "java_expansion_service_jar": expansionJar, + "java_port": javaPort, + "python_virtualenv_dir": envDir, + "python_expansion_service_module": "apache_beam.runners.portability.expansion_service_test", + "python_port": pythonPort + ] + def serviceArgs = project.project(':beam-sdks-python').mapToArgString(expansionServiceOpts) + def setupTask = project.tasks.create(name: config.name+"Setup", type: Exec) { + dependsOn ':beam-sdks-java-container:docker' + dependsOn ':beam-sdks-python-container:docker' + dependsOn ':beam-sdks-java-test-expansion-service:buildTestExpansionServiceJar' + dependsOn ":beam-sdks-python:installGcpTest" + // setup test env + executable 'sh' + args '-c', "$pythonDir/scripts/run_expansion_services.sh start $serviceArgs; $pythonDir/scripts/run_kafka_services.sh start" + } + + def mainTask = project.tasks.create(name: config.name) { + group = "Verification" + description = "Validates cross-language capability of runner" + } + + def cleanupTask = project.tasks.create(name: config.name+'Cleanup', type: Exec) { + // teardown test env + executable 'sh' + args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}; $pythonDir/scripts/run_kafka_services.sh stop" + } + setupTask.finalizedBy cleanupTask + + // Task for running testcases in Java SDK + def beamJavaTestPipelineOptions = [ + "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner", + "--jobServerDriver=${config.jobServerDriver}", + "--environmentCacheMillis=10000" + ] + beamJavaTestPipelineOptions.addAll(config.pipelineOpts) + if (config.jobServerConfig) { + beamJavaTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}") + } + ['Java': javaPort, 'Python': pythonPort].each { sdk, port -> + def javaTask = project.tasks.create(name: config.name+"JavaUsing"+sdk, type: Test) { + group = "Verification" + description = "Validates runner for cross-language capability of using ${sdk} transforms from Java SDK" + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(beamJavaTestPipelineOptions) + systemProperty "expansionPort", port + classpath = config.testClasspathConfiguration + testClassesDirs = project.files(project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs) + maxParallelForks config.numParallelTests + useJUnit(config.testCategories) + // increase maxHeapSize as this is directly correlated to direct memory, + // see https://issues.apache.org/jira/browse/BEAM-6698 + maxHeapSize = '4g' + dependsOn setupTask + } + mainTask.dependsOn javaTask + cleanupTask.mustRunAfter javaTask + + // Task for running testcases in Python SDK + def testOpts = [ + "--attr=UsesCrossLanguageTransforms" + ] + def 
pipelineOpts = [ + "--runner=PortableRunner", + "--experiments=xlang_test", + "--environment_cache_millis=10000" + ] + def beamPythonTestPipelineOptions = [ + "pipeline_opts": pipelineOpts, + "test_opts": testOpts + ] + def cmdArgs = project.project(':beam-sdks-python').mapToArgString(beamPythonTestPipelineOptions) + def pythonTask = project.tasks.create(name: config.name+"PythonUsing"+sdk, type: Exec) { + group = "Verification" + description = "Validates runner for cross-language capability of using ${sdk} transforms from Python SDK" + environment "EXPANSION_PORT", port + environment "EXPANSION_JAR", expansionJar + executable 'sh' + args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" + dependsOn setupTask + } + mainTask.dependsOn pythonTask + cleanupTask.mustRunAfter pythonTask + } + } + + /** ***********************************************************************************************/ + project.ext.applyPythonNature = { // Define common lifecycle tasks and artifact types diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 2629cb58e461..d53b93312393 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -57,6 +57,30 @@ examples: --- +coder: + urn: "beam:coder:string_utf8:v1" +nested: false +examples: + "abc": abc + "ab\0c": "ab\0c" + "\u00c3\u00bf": "\u00ff" + "\u00e5\u0085\u0089\u00e7\u00ba\u00bf": "光线" + +--- + +coder: + urn: "beam:coder:string_utf8:v1" +nested: true +examples: + "\u0003abc": abc + "\u0004ab\0c": "ab\0c" + "\u0002\u00c3\u00bf": "\u00ff" + "\u0006\u00e5\u0085\u0089\u00e7\u00ba\u00bf": "光线" + "\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|": + " 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|" + +--- + coder: urn: "beam:coder:varint:v1" examples: diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle index d1b182533a92..c1a07d75ae13 100644 --- a/runners/core-construction-java/build.gradle +++ b/runners/core-construction-java/build.gradle @@ -56,24 +56,3 @@ task runExpansionService (type: JavaExec) { classpath = sourceSets.main.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } - -task runTestExpansionService (type: JavaExec) { - main = "org.apache.beam.runners.core.construction.expansion.TestExpansionService" - classpath = sourceSets.test.runtimeClasspath - args = [project.findProperty("constructionService.port") ?: "8097"] -} - -task buildTestExpansionServiceJar(type: Jar) { - dependsOn = [shadowJar, shadowTestJar] - appendix = "testExpansionService" - // Use zip64 mode to avoid "Archive contains more than 65535 entries". - zip64 = true - manifest { - attributes( - 'Main-Class': 'org.apache.beam.runners.core.construction.expansion.TestExpansionService' - ) - } - from { configurations.testRuntime.collect { it.isDirectory() ? 
it : zipTree(it) }} - from sourceSets.main.output - from sourceSets.test.output -} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java new file mode 100644 index 000000000000..a103a6a5e838 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.coders.AvroGenericCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; + +/** Coder registrar for AvroGenericCoder. */ +@AutoService(CoderTranslatorRegistrar.class) +public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar { + public static final String AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1"; + + @Override + public Map<Class<? extends Coder>, String> getCoderURNs() { + return ImmutableMap.of(AvroGenericCoder.class, AVRO_GENERIC_CODER_URN); + } + + @Override + public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() { + return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator()); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java new file mode 100644 index 000000000000..0f916a8e9705 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.core.construction; + +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.beam.sdk.coders.AvroGenericCoder; +import org.apache.beam.sdk.coders.Coder; + +/** Coder translator for AvroGenericCoder. */ +public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> { + @Override + public List<? extends Coder<?>> getComponents(AvroGenericCoder from) { + return Collections.emptyList(); + } + + @Override + public byte[] getPayload(AvroGenericCoder from) { + try { + return from.getSchema().toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("failed to encode schema."); + } + } + + @Override + public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) { + Schema schema; + try { + schema = new Schema.Parser().parse(new String(payload, "UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("failed to parse schema."); + } + return AvroGenericCoder.of(schema); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index 8e5d30c205be..66698fda474d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; @@ -47,6 +48,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS = ImmutableBiMap.<Class<? extends Coder>, String>builder() .put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN) + .put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN) .put(KvCoder.class, ModelCoders.KV_CODER_URN) .put(VarLongCoder.class, ModelCoders.INT64_CODER_URN) .put(IntervalWindowCoder.class, ModelCoders.INTERVAL_WINDOW_CODER_URN) @@ -64,6 +66,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> BEAM_MODEL_CODERS = ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends Coder>>builder() .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class)) + .put(StringUtf8Coder.class, CoderTranslators.atomic(StringUtf8Coder.class)) .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class)) .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class)) .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class)) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java index 720bd6cea8d3..b925af26585c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java @@ -36,6 +36,7 @@
private ModelCoders() {} // Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix // coders? public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT); + public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8); public static final String DOUBLE_CODER_URN = getUrn(StandardCoders.Enum.DOUBLE); @@ -57,6 +58,7 @@ private ModelCoders() {} ImmutableSet.of( BYTES_CODER_URN, INT64_CODER_URN, + STRING_UTF8_CODER_URN, ITERABLE_CODER_URN, TIMER_CODER_URN, KV_CODER_URN, diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index 50fd4d61309c..7cfc5aca8470 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.SdkComponents; @@ -60,6 +61,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Converter; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.slf4j.Logger; @@ -343,6 +345,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() { SdkComponents sdkComponents = rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace()); sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT); + pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents); String expandedTransformId = Iterables.getOnlyElement( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java index 0ec75884bddc..a19555bdbaaa 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -63,6 +63,7 @@ public class CoderTranslationTest { .add(ByteArrayCoder.of()) .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) .add(VarLongCoder.of()) + .add(StringUtf8Coder.of()) .add(IntervalWindowCoder.of()) .add(IterableCoder.of(ByteArrayCoder.of())) .add(Timer.Coder.of(ByteArrayCoder.of())) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 3e129aa9cfcf..251ea12466f8 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -85,6 +86,7 @@ public class CommonCoderTest { private static final Map<String, Class<?>> coders = ImmutableMap.<String, Class<?>>builder() .put(getUrn(StandardCoders.Enum.BYTES), ByteCoder.class) + .put(getUrn(StandardCoders.Enum.STRING_UTF8), StringUtf8Coder.class) .put(getUrn(StandardCoders.Enum.KV), KvCoder.class) .put(getUrn(StandardCoders.Enum.VARINT), VarLongCoder.class) .put(getUrn(StandardCoders.Enum.INTERVAL_WINDOW), IntervalWindowCoder.class) @@ -220,6 +222,8 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder coder) { String s = coderSpec.getUrn(); if (s.equals(getUrn(StandardCoders.Enum.BYTES))) { return ((String) value).getBytes(StandardCharsets.ISO_8859_1); + } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) { + return value; } else if (s.equals(getUrn(StandardCoders.Enum.KV))) { Coder keyCoder = ((KvCoder) coder).getKeyCoder(); Coder valueCoder = ((KvCoder) coder).getValueCoder(); @@ -287,6 +291,8 @@ private static Coder<?> instantiateCoder(CommonCoder coder) { String s = coder.getUrn(); if (s.equals(getUrn(StandardCoders.Enum.BYTES))) { return ByteArrayCoder.of(); + } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) { + return StringUtf8Coder.of(); } else if (s.equals(getUrn(StandardCoders.Enum.KV))) { return KvCoder.of(components.get(0), components.get(1)); } else if (s.equals(getUrn(StandardCoders.Enum.VARINT))) { @@ -328,7 +334,8 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object String s = coder.getUrn(); if (s.equals(getUrn(StandardCoders.Enum.BYTES))) { assertThat(expectedValue, equalTo(actualValue)); - + } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) { + assertEquals(expectedValue, actualValue); } else if (s.equals(getUrn(StandardCoders.Enum.KV))) { assertThat(actualValue, instanceOf(KV.class)); verifyDecodedValue( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java index 1e35bd4eb566..cb0f78a58f4f 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java @@ -32,11 +32,11 @@ import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; import
org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ConnectivityState; import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel; @@ -61,14 +61,12 @@ public class ExternalTest implements Serializable { private static final String TEST_URN_LE = "le"; private static final String TEST_URN_MULTI = "multi"; - private static String pythonServerCommand; private static Integer expansionPort; private static String localExpansionAddr; private static Server localExpansionServer; @BeforeClass public static void setUp() throws IOException { - pythonServerCommand = System.getProperty("pythonTestExpansionCommand"); expansionPort = Integer.valueOf(System.getProperty("expansionPort")); int localExpansionPort = expansionPort + 100; localExpansionAddr = String.format("localhost:%s", localExpansionPort); @@ -86,26 +84,27 @@ public static void tearDown() { @Test @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class}) public void expandSingleTest() { - PCollection<Integer> col = + PCollection<String> col = testPipeline - .apply(Create.of(1, 2, 3)) + .apply(Create.of("1", "2", "3")) .apply(External.of(TEST_URN_SIMPLE, new byte[] {}, localExpansionAddr)); - PAssert.that(col).containsInAnyOrder(2, 3, 4); + PAssert.that(col).containsInAnyOrder("Simple(1)", "Simple(2)", "Simple(3)"); testPipeline.run(); } @Test @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class}) public void expandMultipleTest() { - PCollection<Integer> pcol = + PCollection<String> pcol = testPipeline - .apply(Create.of(1, 2, 3)) - .apply("add one", External.of(TEST_URN_SIMPLE, new byte[] {}, localExpansionAddr)) + .apply(Create.of(1, 2, 3, 4, 5, 6)) .apply( "filter <=3", - External.of(TEST_URN_LE, "3".getBytes(StandardCharsets.UTF_8), localExpansionAddr)); + External.of(TEST_URN_LE, "3".getBytes(StandardCharsets.UTF_8), localExpansionAddr)) + .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString)) + .apply("put simple", External.of(TEST_URN_SIMPLE, new byte[] {}, localExpansionAddr)); - PAssert.that(pcol).containsInAnyOrder(2, 3); + PAssert.that(pcol).containsInAnyOrder("Simple(1)", "Simple(2)", "Simple(3)"); testPipeline.run(); } @@ -123,20 +122,10 @@ public void expandMultiOutputTest() { testPipeline.run(); } - private Process runCommandline(String command) { - ProcessBuilder builder = new ProcessBuilder("sh", "-c", command); - try { - return builder.start(); - } catch (IOException e) { - throw new AssertionError("process launch failed."); - } - } - @Test @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class}) public void expandPythonTest() { String target = String.format("localhost:%s", expansionPort); - Process p = runCommandline(String.format("%s -p %s", pythonServerCommand, expansionPort)); try { ManagedChannel channel = ManagedChannelBuilder.forTarget(target).build(); ConnectivityState state = channel.getState(true); @@ -150,17 +139,17 @@ testPipeline .apply(Create.of("1", "2", "2", "3", "3", "3")) .apply( - "toBytes", - MapElements.into(new TypeDescriptor<byte[]>() {}).via(String::getBytes)) - .apply(External.of("count_per_element_bytes", new byte[] {}, target)) - .apply("toString", MapElements.into(TypeDescriptors.strings()).via(String::new)); + External.<String, KV<String, Long>>of( + "beam:transforms:xlang:count", new byte[] {}, target)) + .apply( + "toString", + MapElements.into(TypeDescriptors.strings()) + .via(x -> String.format("%s->%s", x.getKey(), x.getValue()))); PAssert.that(pCol).containsInAnyOrder("1->1", "2->2", "3->3"); testPipeline.run(); } catch
(InterruptedException e) { throw new RuntimeException("interrupted."); - } finally { - p.destroyForcibly(); } } @@ -175,7 +164,9 @@ public static class TestTransforms public Map<String, ExpansionService.TransformProvider> knownTransforms() { return ImmutableMap.of( TEST_URN_SIMPLE, - spec -> MapElements.into(TypeDescriptors.integers()).via((Integer x) -> x + 1), + spec -> + MapElements.into(TypeDescriptors.strings()) + .via((String x) -> String.format("Simple(%s)", x)), TEST_URN_LE, spec -> Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())), TEST_URN_MULTI, diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java deleted file mode 100644 index 3ec867ac87c9..000000000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/TestExpansionService.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction.expansion; - -import com.google.auto.service.AutoService; -import java.util.Map; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; - -/** - * An {@link org.apache.beam.runners.core.construction.expansion.ExpansionService} useful for tests. - */ -public class TestExpansionService { - - private static final String TEST_COUNT_URN = "pytest:beam:transforms:count"; - private static final String TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than"; - - /** Registers a single test transformation. */ - @AutoService(ExpansionService.ExpansionServiceRegistrar.class) - public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar { - @Override - public Map<String, ExpansionService.TransformProvider> knownTransforms() { - return ImmutableMap.of( - TEST_COUNT_URN, spec -> Count.perElement(), - TEST_FILTER_URN, - spec -> - Filter.lessThanEq( - // TODO(BEAM-6587): Use strings directly rather than longs.
- (long) spec.getPayload().toStringUtf8().charAt(0))); - } - } - - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build(); - server.start(); - server.awaitTermination(); - } -} diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index f040b6a3f9c6..adc257065903 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -159,23 +159,13 @@ task validatesPortableRunner() { dependsOn validatesPortableRunnerStreaming } -project.ext.validatesCrossLanguageTransforms = - createPortableValidatesRunnerTask( - name: "validatesCrossLanguageTransforms", - jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", - jobServerConfig: "--clean-artifacts-per-job,--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0", - testClasspathConfiguration: configurations.validatesPortableRunner, - numParallelTests: 1, - pipelineOpts: [ - // Limit resource consumption via parallelism - "--parallelism=2", - "--shutdownSourcesOnFinalWatermark", - ], - testCategories: { - // Only include cross-language transform tests. Avoid to retest everything on Docker environment. - includeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' - }, - ) -project.evaluationDependsOn ':beam-sdks-python' -validatesCrossLanguageTransforms.dependsOn ':beam-sdks-python:setupVirtualenv' -validatesCrossLanguageTransforms.systemProperty "pythonTestExpansionCommand", ". ${project(':beam-sdks-python').envdir}/bin/activate && pip install -e ${project(':beam-sdks-python').projectDir}[test] && python -m apache_beam.runners.portability.expansion_service_test" +project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTask( + jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", + jobServerConfig: "--clean-artifacts-per-job,--job-host=localhost,--job-port=0,--artifact-port=0", + testClasspathConfiguration: configurations.validatesPortableRunner, + numParallelTests: 1, + pipelineOpts: [ + "--parallelism=2", + "--shutdownSourcesOnFinalWatermark", + ] +) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index d4728c19aba9..8a02ce6ae064 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -773,8 +773,7 @@ public void setForWindow(InputT input, BoundedWindow window) { // make sure this fires after any window.maxTimestamp() timers Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(1); // needs to match the encoding in prepareStateBackend for state request handler - final ByteBuffer key = - FlinkKeyUtils.encodeKey(((KV) input).getKey(), keyCoder, Coder.Context.NESTED); + final ByteBuffer key = FlinkKeyUtils.encodeKey(((KV) input).getKey(), keyCoder); // Ensure the state backend is not concurrently accessed by the state requests try { stateBackendLock.lock(); diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index d61c87fe2742..de46d63a0bc8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -432,7 +432,7 @@ public void testEnsureStateCleanupWithKeyedInputCleanupTimer() { cleanupTimer.setForWindow(KV.of("key", "string"), window); Mockito.verify(stateBackendLock).lock(); - ByteBuffer key = FlinkKeyUtils.encodeKey("key", keyCoder, Coder.Context.NESTED); + ByteBuffer key = FlinkKeyUtils.encodeKey("key", keyCoder); Mockito.verify(keyedStateBackend).setCurrentKey(key); assertThat( inMemoryTimerInternals.getNextTimer(TimeDomain.EVENT_TIME), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java index 823617e176d4..94b6e29946e3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java @@ -492,10 +492,10 @@ public StateInternals stateInternals() { } } - private KV<byte[], org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes( + private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes( String key, long timestampOffset) throws CoderException { return KV.of( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), + key, org.apache.beam.runners.core.construction.Timer.of( BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset), CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED))); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 108f88a4e707..1d87dd680d64 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -290,9 +290,9 @@ public void process(ProcessContext ctxt) { assertThat( windowedValues, containsInAnyOrder( - WindowedValue.valueInGlobalWindow(kvBytes("foo", 4)), - WindowedValue.valueInGlobalWindow(kvBytes("foo", 3)), - WindowedValue.valueInGlobalWindow(kvBytes("foo", 3)))); + WindowedValue.valueInGlobalWindow(byteValueOf("foo", 4)), + WindowedValue.valueInGlobalWindow(byteValueOf("foo", 3)), + WindowedValue.valueInGlobalWindow(byteValueOf("foo", 3)))); } } @@ -376,8 +376,8 @@ public void process(ProcessContext ctxt) throws Exception { assertThat( windowedValues, containsInAnyOrder( - WindowedValue.valueInGlobalWindow(kvBytes("Y", "Y")), - WindowedValue.valueInGlobalWindow(kvBytes("Z", "Z")))); + WindowedValue.valueInGlobalWindow(KV.of("Y", "Y")), + WindowedValue.valueInGlobalWindow(KV.of("Z", "Z")))); } } @@ -448,11 +448,7 @@ public void processElement(ProcessContext context) { RemoteOutputReceiver.of(targetCoder.getValue(), outputContents::add)); } - Iterable<byte[]> sideInputData = -
Arrays.asList( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"), - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"), - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C")); + Iterable<String> sideInputData = Arrays.asList("A", "B", "C"); StateRequestHandler stateRequestHandler = StateRequestHandlers.forSideInputHandlerFactory( descriptor.getSideInputSpecs(), @@ -482,24 +478,20 @@ public Coder resultCoder() { try (ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, progressHandler)) { Iterables.getOnlyElement(bundle.getInputReceivers().values()) - .accept( - WindowedValue.valueInGlobalWindow( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X"))); + .accept(WindowedValue.valueInGlobalWindow("X")); Iterables.getOnlyElement(bundle.getInputReceivers().values()) - .accept( - WindowedValue.valueInGlobalWindow( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y"))); + .accept(WindowedValue.valueInGlobalWindow("Y")); } for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) { assertThat( windowedValues, containsInAnyOrder( - WindowedValue.valueInGlobalWindow(kvBytes("X", "A")), - WindowedValue.valueInGlobalWindow(kvBytes("X", "B")), - WindowedValue.valueInGlobalWindow(kvBytes("X", "C")), - WindowedValue.valueInGlobalWindow(kvBytes("Y", "A")), - WindowedValue.valueInGlobalWindow(kvBytes("Y", "B")), - WindowedValue.valueInGlobalWindow(kvBytes("Y", "C")))); + WindowedValue.valueInGlobalWindow(KV.of("X", "A")), + WindowedValue.valueInGlobalWindow(KV.of("X", "B")), + WindowedValue.valueInGlobalWindow(KV.of("X", "C")), + WindowedValue.valueInGlobalWindow(KV.of("Y", "A")), + WindowedValue.valueInGlobalWindow(KV.of("Y", "B")), + WindowedValue.valueInGlobalWindow(KV.of("Y", "C")))); } } @@ -595,11 +587,7 @@ public void process(ProcessContext ctxt) { RemoteOutputReceiver.of(targetCoder.getValue(), outputContents::add)); } - Iterable<byte[]> sideInputData = - Arrays.asList( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"), - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"), - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C")); + Iterable<String> sideInputData = Arrays.asList("A", "B", "C"); StateRequestHandler stateRequestHandler = StateRequestHandlers.forSideInputHandlerFactory( @@ -879,15 +867,15 @@ public void clear(K key, W window) { processor.newBundle( outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) { Iterables.getOnlyElement(bundle.getInputReceivers().values()) - .accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y"))); + .accept(WindowedValue.valueInGlobalWindow(KV.of("X", "Y"))); } for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) { assertThat( windowedValues, containsInAnyOrder( - WindowedValue.valueInGlobalWindow(kvBytes("X", "A")), - WindowedValue.valueInGlobalWindow(kvBytes("X", "B")), - WindowedValue.valueInGlobalWindow(kvBytes("X", "C")))); + WindowedValue.valueInGlobalWindow(KV.of("X", "A")), + WindowedValue.valueInGlobalWindow(KV.of("X", "B")), + WindowedValue.valueInGlobalWindow(KV.of("X", "C")))); } assertThat( userStateData.get(stateId), @@ -1025,7 +1013,7 @@ public void processingTimer( bundle .getInputReceivers() .get(stage.getInputPCollection().getId()) - .accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "X"))); + .accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X"))); bundle .getInputReceivers() .get(eventTimeInputPCollectionId) @@ -1043,9 +1031,9 @@ public void processingTimer( assertThat( outputValues.get(mainOutputTarget), containsInAnyOrder( -
WindowedValue.valueInGlobalWindow(kvBytes("mainX", "")), - WindowedValue.valueInGlobalWindow(kvBytes("event", "")), - WindowedValue.valueInGlobalWindow(kvBytes("processing", "")))); + WindowedValue.valueInGlobalWindow(KV.of("mainX", "")), + WindowedValue.valueInGlobalWindow(KV.of("event", "")), + WindowedValue.valueInGlobalWindow(KV.of("processing", "")))); assertThat( timerStructuralValues(outputValues.get(eventTimeOutputTarget)), containsInAnyOrder( @@ -1154,26 +1142,18 @@ public void process(ProcessContext c) { assertThat( outputValues, containsInAnyOrder( - WindowedValue.valueInGlobalWindow(kvBytes("stream1X", "")), - WindowedValue.valueInGlobalWindow(kvBytes("stream2X", "")))); + WindowedValue.valueInGlobalWindow(KV.of("stream1X", "")), + WindowedValue.valueInGlobalWindow(KV.of("stream2X", "")))); } - private KV kvBytes(String key, long value) throws CoderException { - return KV.of( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), - CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value)); - } - - private KV kvBytes(String key, String value) throws CoderException { - return KV.of( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value)); + private KV byteValueOf(String key, long value) throws CoderException { + return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value)); } - private KV> timerBytes( + private KV> timerBytes( String key, long timestampOffset) throws CoderException { return KV.of( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), + key, org.apache.beam.runners.core.construction.Timer.of( BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset), CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED))); @@ -1182,7 +1162,7 @@ private KV> time private Object timerStructuralValue(Object timer) { return WindowedValue.FullWindowedValueCoder.of( KvCoder.of( - ByteArrayCoder.of(), + StringUtf8Coder.of(), org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())), GlobalWindow.Coder.INSTANCE) .structuralValue(timer); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java new file mode 100644 index 000000000000..838429a4f1ea --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +/** GenericRecord Avro Coder for simple dict type cross-language data transfer. 
*/ +public class AvroGenericCoder extends AvroCoder<GenericRecord> { + protected AvroGenericCoder(Schema schema) { + super(GenericRecord.class, schema); + } + + public static AvroGenericCoder of(Schema schema) { + return new AvroGenericCoder(schema); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroGenericCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroGenericCoderTest.java new file mode 100644 index 000000000000..40b1e4a07caf --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroGenericCoderTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test case for {@link AvroGenericCoder}.
*/ +@RunWith(JUnit4.class) +public class AvroGenericCoderTest { + private static final Schema SCHEMA = + SchemaBuilder.record("test") + .namespace("org.apache.beam") + .fields() + .name("name") + .type() + .stringType() + .noDefault() + .name("age") + .type() + .intType() + .intDefault(0) + .name("hometown") + .type() + .nullable() + .stringType() + .noDefault() + .endRecord(); + + private static final Coder<GenericRecord> TEST_CODER = AvroGenericCoder.of(SCHEMA); + + private static final List<GenericRecord> TEST_VALUES = + Arrays.asList( + new GenericRecordBuilder(SCHEMA) + .set("name", "Jon Snow") + .set("age", 23) + .set("hometown", "キングズ・ランディング") + .build(), + new GenericRecordBuilder(SCHEMA) + .set("name", "Daenerys targaryen") + .set("age", 23) + .set("hometown", null) + .build(), + new GenericRecordBuilder(SCHEMA) + .set("name", "Sansa Stark") + .set("hometown", "윈터펠") + .build(), + new GenericRecordBuilder(SCHEMA) + .set("name", "Tyrion Lannister") + .set("hometown", null) + .build()); + + @Test + public void testDecodeEncodeEqual() throws Exception { + for (GenericRecord value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat( + TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(GenericRecord.class))); + } +} diff --git a/sdks/java/testing/expansion-service/build.gradle b/sdks/java/testing/expansion-service/build.gradle new file mode 100644 index 000000000000..5c399cc3dc26 --- /dev/null +++ b/sdks/java/testing/expansion-service/build.gradle @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { id 'org.apache.beam.module' } +applyJavaNature() + +description = "Apache Beam :: SDKs :: Java :: Test Expansion Service" +ext.summary = """Testing Expansion Service used for executing cross-language transform tests.""" + + +dependencies { + shadowTest project(path: ":beam-runners-core-construction-java", configuration: "shadow") + shadowTest project(path: ":beam-sdks-java-io-parquet", configuration: "shadow") + shadowTest project(path: ":beam-sdks-java-io-kafka", configuration: "shadow") + shadowTest project(path: ":beam-sdks-java-core", configuration: "shadow") +} + +task runTestExpansionService (type: JavaExec) { + main = "org.apache.beam.sdk.expansion.TestExpansionService" + classpath = sourceSets.test.runtimeClasspath + args = [project.findProperty("constructionService.port") ?: "8097"] +} + +task buildTestExpansionServiceJar(type: ShadowJar) { + configurations = [project.configurations.shadowTest] + appendix = "testExpansionService" + // Use zip64 mode to avoid "Archive contains more than 65535 entries".
+ zip64 = true + mergeServiceFiles() + manifest { + attributes( + 'Main-Class': 'org.apache.beam.sdk.expansion.TestExpansionService' + ) + } + from { project.configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }} + from sourceSets.main.output + from sourceSets.test.output +} diff --git a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/expansion/TestExpansionService.java b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/expansion/TestExpansionService.java new file mode 100644 index 000000000000..6856c09e59cd --- /dev/null +++ b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/expansion/TestExpansionService.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.expansion; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.runners.core.construction.expansion.ExpansionService; +import org.apache.beam.sdk.coders.AvroGenericCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +/** + * An {@link org.apache.beam.runners.core.construction.expansion.ExpansionService} useful for tests. + */ +public class TestExpansionService { + + private static final String TEST_COUNT_URN = "beam:transforms:xlang:count"; + private static final String TEST_FILTER_URN = "beam:transforms:xlang:filter_less_than_eq"; + private static final String TEST_PARQUET_READ_URN = "beam:transforms:xlang:parquet_read"; + private static final String TEST_PARQUET_WRITE_URN = "beam:transforms:xlang:parquet_write"; + private static final String TEST_KAFKA_WRITE_URN = "beam:transforms:xlang:kafka_write"; + private static final String TEST_KAFKA_READ_URN = "beam:transforms:xlang:kafka_read"; + + /** Registers a single test transformation. 
*/ + @AutoService(ExpansionService.ExpansionServiceRegistrar.class) + public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar { + String rawSchema = + "{ \"type\": \"record\", \"name\": \"testrecord\", \"fields\": " + + "[ {\"name\": \"name\", \"type\": \"string\"} ]}"; + + @Override + public Map<String, ExpansionService.TransformProvider> knownTransforms() { + Schema schema = new Schema.Parser().parse(rawSchema); + ImmutableMap.Builder<String, ExpansionService.TransformProvider> builder = + ImmutableMap.builder(); + builder.put(TEST_COUNT_URN, spec -> Count.perElement()); + builder.put( + TEST_FILTER_URN, + spec -> + Filter.lessThanEq( + // TODO(BEAM-6587): Use strings directly rather than longs. + (long) spec.getPayload().toStringUtf8().charAt(0))); + builder.put( + TEST_PARQUET_READ_URN, + spec -> + new PTransform<PCollection<String>, PCollection<GenericRecord>>() { + @Override + public PCollection<GenericRecord> expand(PCollection<String> input) { + return input + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply(ParquetIO.readFiles(schema)) + .setCoder(AvroGenericCoder.of(schema)); + } + }); + builder.put( + TEST_PARQUET_WRITE_URN, + spec -> + new PTransform<PCollection<GenericRecord>, PCollection<String>>() { + @Override + public PCollection<String> expand(PCollection<GenericRecord> input) { + return input + .apply( + FileIO.<GenericRecord>write() + .via(ParquetIO.sink(schema)) + .to(spec.getPayload().toStringUtf8())) + .getPerDestinationOutputFilenames() + .apply(Values.create()); + } + }); + builder.put( + TEST_KAFKA_WRITE_URN, + spec -> + new PTransform<PCollection<String>, PDone>() { + @Override + public PDone expand(PCollection<String> input) { + return input.apply( + KafkaIO.<Void, String>write() + .withBootstrapServers(spec.getPayload().toStringUtf8()) + .withTopic("beam-kafkaio-test") + .withValueSerializer(StringSerializer.class) + .values()); + } + }); + builder.put( + TEST_KAFKA_READ_URN, + spec -> + new PTransform<PBegin, PCollection<String>>() { + @Override + public PCollection<String> expand(PBegin input) { + return input + .apply( + KafkaIO.<String, String>read() + .withBootstrapServers(spec.getPayload().toStringUtf8()) + .withTopic("beam-kafkaio-test") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializerAndCoder( + StringDeserializer.class, StringUtf8Coder.of()) + .withReadCommitted() + .withMaxNumRecords(3) + .updateConsumerProperties( + ImmutableMap.of( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + .withoutMetadata()) + .apply(Values.create()); + } + }); + return builder.build(); + } + } + + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + System.out.println("Starting expansion service at localhost:" + port); + Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build(); + server.start(); + server.awaitTermination(); + } +} diff --git a/sdks/python/apache_beam/coders/avro_generic_coder.py b/sdks/python/apache_beam/coders/avro_generic_coder.py new file mode 100644 index 000000000000..1ce8b08b1ee3 --- /dev/null +++ b/sdks/python/apache_beam/coders/avro_generic_coder.py @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""avro generic record coder for simple dict-type x-language data transfer.""" + +from __future__ import absolute_import + +import json +from io import BytesIO + +from fastavro import parse_schema +from fastavro import schemaless_reader +from fastavro import schemaless_writer + +from apache_beam.coders.coder_impl import SimpleCoderImpl +from apache_beam.coders.coders import Coder +from apache_beam.coders.coders import FastCoder + +AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1" + +__all__ = ['AvroGenericCoder', 'AvroGenericRecord'] + + +class AvroGenericCoder(FastCoder): + """A coder used for AvroGenericRecord values.""" + + def __init__(self, schema): + self.schema = schema + + def _create_impl(self): + return AvroGenericCoderImpl(self.schema) + + def is_deterministic(self): + # TODO: need to confirm if it's deterministic + return False + + def __eq__(self, other): + return (type(self) == type(other) + and self.schema == other.schema) + + def __hash__(self): + return hash(self.schema) + + def to_type_hint(self): + return AvroGenericRecord + + def to_runner_api_parameter(self, context): + return AVRO_GENERIC_CODER_URN, self.schema, () + + @Coder.register_urn(AVRO_GENERIC_CODER_URN, bytes) + def from_runner_api_parameter(payload, unused_components, unused_context): + return AvroGenericCoder(payload) + + +class AvroGenericCoderImpl(SimpleCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + + def __init__(self, schema): + self.parsed_schema = parse_schema(json.loads(schema)) + + def encode(self, value): + assert issubclass(type(value), AvroGenericRecord) + with BytesIO() as buf: + schemaless_writer(buf, self.parsed_schema, value.record) + return buf.getvalue() + + def decode(self, encoded): + with BytesIO(encoded) as buf: + return AvroGenericRecord(schemaless_reader(buf, self.parsed_schema)) + + +class AvroGenericRecord(object): + """Simple wrapper class for dictionary records.""" + + def __init__(self, value): + self.record = value + + def __eq__(self, other): + return ( + issubclass(type(other), AvroGenericRecord) and + self.record == other.record + ) + + def __hash__(self): + return hash(self.record) diff --git a/sdks/python/apache_beam/coders/avro_generic_coder_test.py b/sdks/python/apache_beam/coders/avro_generic_coder_test.py new file mode 100644 index 000000000000..f183c854bf7f --- /dev/null +++ b/sdks/python/apache_beam/coders/avro_generic_coder_test.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import + +import logging +import unittest + +from apache_beam.coders.avro_generic_coder import AvroGenericCoder +from apache_beam.coders.avro_generic_coder import AvroGenericRecord +from apache_beam.coders.typecoders import registry as coders_registry + + +class AvroGenericTestCoder(AvroGenericCoder): + SCHEMA = """ + { + "type": "record", "name": "testrecord", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] + } + """ + + def __init__(self): + super(AvroGenericTestCoder, self).__init__(self.SCHEMA) + + +class AvroGenericTestRecord(AvroGenericRecord): + pass + + +coders_registry.register_coder(AvroGenericTestRecord, AvroGenericTestCoder) + + +class CodersTest(unittest.TestCase): + + def test_avro_generic_record_coder(self): + real_coder = coders_registry.get_coder(AvroGenericTestRecord) + expected_coder = AvroGenericTestCoder() + self.assertEqual( + real_coder.encode( + AvroGenericTestRecord({"name": "Daenerys targaryen", "age": 23})), + expected_coder.encode( + AvroGenericTestRecord({"name": "Daenerys targaryen", "age": 23})) + ) + self.assertEqual( + AvroGenericTestRecord({"name": "Jon Snow", "age": 23}), + real_coder.decode( + real_coder.encode( + AvroGenericTestRecord({"name": "Jon Snow", "age": 23})) + ) + ) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/examples/wordcount_xlang.py b/sdks/python/apache_beam/examples/wordcount_xlang.py index f4071f861c0d..ea707d0d0c44 100644 --- a/sdks/python/apache_beam/examples/wordcount_xlang.py +++ b/sdks/python/apache_beam/examples/wordcount_xlang.py @@ -70,7 +70,7 @@ def run(p, input_file, output_file): | 'split' >> (beam.ParDo(WordExtractingDoFn()) .with_output_types(bytes)) | 'count' >> beam.ExternalTransform( - 'pytest:beam:transforms:count', None, EXPANSION_SERVICE_ADDR)) + 'beam:transforms:xlang:count', None, EXPANSION_SERVICE_ADDR)) # Format the counts into a PCollection of strings. def format_result(word_count): diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py index 8de68abd46cd..a03260c5cf8e 100644 --- a/sdks/python/apache_beam/io/external/generate_sequence.py +++ b/sdks/python/apache_beam/io/external/generate_sequence.py @@ -48,11 +48,12 @@ class GenerateSequence(ptransform.PTransform): this source. At the moment only the Flink Runner supports this. 
""" + URN = 'beam:external:java:generate_sequence:v1' + def __init__(self, start, stop=None, elements_per_period=None, max_read_time=None, expansion_service='localhost:8097'): super(GenerateSequence, self).__init__() - self._urn = 'beam:external:java:generate_sequence:v1' self.start = start self.stop = stop self.elements_per_period = elements_per_period @@ -87,6 +88,6 @@ def expand(self, pbegin): payload = ExternalConfigurationPayload(configuration=args) return pbegin.apply( ExternalTransform( - self._urn, + self.URN, payload.SerializeToString(), self.expansion_service)) diff --git a/sdks/python/apache_beam/io/external/generate_sequence_test.py b/sdks/python/apache_beam/io/external/generate_sequence_test.py new file mode 100644 index 000000000000..9e91a26d8d5a --- /dev/null +++ b/sdks/python/apache_beam/io/external/generate_sequence_test.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for cross-language generate sequence.""" + +from __future__ import absolute_import +from __future__ import print_function + +import logging +import os +import re +import unittest + +from nose.plugins.attrib import attr + +from apache_beam.io.external.generate_sequence import GenerateSequence +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + + +@attr('UsesCrossLanguageTransforms') +class XlangGenerateSequenceTest(unittest.TestCase): + def test_generate_sequence(self): + test_pipeline = TestPipeline() + port = os.environ.get('EXPANSION_PORT') + if not port: + print("EXPANSION_PORT environment var is not provided. skipping.") + return + address = 'localhost:%s' % port + + try: + with test_pipeline as p: + res = ( + p + | GenerateSequence(start=1, stop=10, + expansion_service=address) + ) + + assert_that(res, equal_to([i for i in range(1, 10)])) + except RuntimeError as e: + if re.search(GenerateSequence.URN, str(e)): + print("looks like URN not implemented in expansion service, skipping.") + else: + raise e + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_test.py new file mode 100644 index 000000000000..e8c60ccb2dd3 --- /dev/null +++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_test.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for cross-language parquet io read/write.""" + +from __future__ import absolute_import +from __future__ import print_function + +import logging +import os +import re +import unittest + +from nose.plugins.attrib import attr +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +KAFKA_WRITE_URN = "beam:transforms:xlang:kafka_write" +KAFKA_READ_URN = "beam:transforms:xlang:kafka_read" + + +@attr('UsesCrossLanguageTransforms') +class XlangKafkaIOTest(unittest.TestCase): + def test_write_and_read(self): + expansion_jar = os.environ.get('EXPANSION_JAR') + if not expansion_jar: + print("EXPANSION_JAR environment variable is not set.") + return + + read_pipeline = TestPipeline(blocking=False) + read_pipeline.get_pipeline_options().view_as( + DebugOptions).experiments.append('jar_packages='+expansion_jar) + + port = read_pipeline.get_option("expansion_port") + if not port: + print("--expansion_port is not provided. skipping.") + return + address = 'localhost:%s' % port + + try: + read = read_pipeline \ + | beam.ExternalTransform( + KAFKA_READ_URN, + b'localhost:9092', address) + + assert_that(read, equal_to(['abc', 'def', 'ghi'])) + + read_result = read_pipeline.run(test_runner_api=False) + + write_pipeline = TestPipeline() + write_pipeline.get_pipeline_options().view_as( + DebugOptions).experiments.append('jar_packages='+expansion_jar) + write_pipeline.not_use_test_runner_api = True + + with write_pipeline as p: + _ = p \ + | beam.Create(['abc', 'def', 'ghi']).with_output_types(unicode) \ + | beam.ExternalTransform( + KAFKA_WRITE_URN, + b'localhost:9092', address) + + read_result.wait_until_finish() + + except RuntimeError as e: + if re.search( + '{}|{}'.format(KAFKA_WRITE_URN, KAFKA_READ_URN), str(e)): + print("looks like URN not implemented in expansion service, skipping.") + else: + raise e + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py new file mode 100644 index 000000000000..6893164846a5 --- /dev/null +++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for cross-language parquet io read/write.""" + +from __future__ import absolute_import +from __future__ import print_function + +import logging +import os +import re +import unittest + +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam import coders +from apache_beam.coders.avro_generic_coder import AvroGenericCoder +from apache_beam.coders.avro_generic_coder import AvroGenericRecord +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +PARQUET_WRITE_URN = "beam:transforms:xlang:parquet_write" +PARQUET_READ_URN = "beam:transforms:xlang:parquet_read" + + +@attr('UsesCrossLanguageTransforms') +class XlangParquetIOTest(unittest.TestCase): + def test_write_and_read(self): + test_pipeline = TestPipeline() + expansion_jar = os.environ.get('EXPANSION_JAR') + if not expansion_jar: + print("EXPANSION_JAR environment variable is not set.") + return + test_pipeline.get_pipeline_options().view_as( + DebugOptions).experiments.append('jar_packages='+expansion_jar) + port = os.environ.get('EXPANSION_PORT') + if not port: + print("EXPANSION_PORT environment var is not provided. skipping.") + return + address = 'localhost:%s' % port + test_pipeline.not_use_test_runner_api = True + + try: + with test_pipeline as p: + res = p \ + | beam.Create([ + AvroGenericRecord({"name": "abc"}), + AvroGenericRecord({"name": "def"}), + AvroGenericRecord({"name": "ghi"})]) \ + | beam.ExternalTransform( + PARQUET_WRITE_URN, + b'/tmp/test.parquet', address) \ + | beam.ExternalTransform( + PARQUET_READ_URN, None, address) \ + | beam.Map(lambda x: '%s' % x.record['name']) + + assert_that(res, equal_to(['abc', 'def', 'ghi'])) + except RuntimeError as e: + if re.search( + '{}|{}'.format(PARQUET_WRITE_URN, PARQUET_READ_URN), str(e)): + print("looks like URN not implemented in expansion service, skipping.") + else: + raise e + + +class AvroGenericTestCoder(AvroGenericCoder): + SCHEMA = """ + { + "type": "record", "name": "testrecord", + "fields": [ {"name": "name", "type": "string"} ] + } + """ + + def __init__(self): + super(AvroGenericTestCoder, self).__init__(self.SCHEMA) + + +coders.registry.register_coder(AvroGenericRecord, AvroGenericTestCoder) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py index 9e93a6475b08..e14769efa76e 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py @@ -26,6 +26,10 @@ import apache_beam as beam import apache_beam.transforms.combiners as combine +# load Avro generic coder for test +# pylint: disable=unused-import +from apache_beam.coders.avro_generic_coder import AvroGenericCoder +# pylint: enable=unused-import from apache_beam.pipeline import PipelineOptions from 
apache_beam.portability.api import beam_expansion_api_pb2_grpc from apache_beam.runners.portability import expansion_service @@ -35,22 +39,40 @@ # external transform test cases. See external_test.py for details. -@ptransform.PTransform.register_urn('count_per_element_bytes', None) -class KV2BytesTransform(ptransform.PTransform): +@ptransform.PTransform.register_urn('beam:transforms:xlang:count', None) +class CountPerElementTransform(ptransform.PTransform): def expand(self, pcoll): return ( - pcoll - | combine.Count.PerElement() - | beam.Map( - lambda x: '{}->{}'.format(x[0], x[1])).with_output_types(bytes) + pcoll | combine.Count.PerElement() ) def to_runner_api_parameter(self, unused_context): - return 'kv_to_bytes', None + return 'beam:transforms:xlang:count', None @staticmethod def from_runner_api_parameter(unused_parameter, unused_context): - return KV2BytesTransform() + return CountPerElementTransform() + + +@ptransform.PTransform.register_urn( + 'beam:transforms:xlang:filter_less_than_eq', bytes) +class FilterLessThanTransform(ptransform.PTransform): + def __init__(self, payload): + self._payload = payload + + def expand(self, pcoll): + return ( + pcoll | beam.Filter( + lambda elem, target: elem <= target, int(ord(self._payload[0]))) + ) + + def to_runner_api_parameter(self, unused_context): + return ( + 'beam:transforms:xlang:filter_less_than', self._payload.encode('utf8')) + + @staticmethod + def from_runner_api_parameter(payload, unused_context): + return FilterLessThanTransform(payload.decode('utf8')) @ptransform.PTransform.register_urn('simple', None) @@ -133,6 +155,11 @@ def from_runner_api_parameter(level, unused_context): server = None +def cleanup(unused_signum, unused_frame): + logging.info('Shutting down expansion service.') + server.stop(None) + + def main(unused_argv): parser = argparse.ArgumentParser() parser.add_argument('-p', '--port', @@ -148,19 +175,12 @@ def main(unused_argv): server.start() logging.info('Listening for expansion requests at %d', options.port) + signal.signal(signal.SIGTERM, cleanup) + signal.signal(signal.SIGINT, cleanup) # blocking main thread forever. signal.pause() -def cleanup(unused_signum, unused_frame): - logging.info('Shutting down expansion service.') - server.stop(None) - - -signal.signal(signal.SIGTERM, cleanup) -signal.signal(signal.SIGINT, cleanup) - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) main(sys.argv) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 41440149d948..569b03c08239 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -58,6 +58,7 @@ from apache_beam.internal import pickler from apache_beam.internal.http_client import get_new_http from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import WorkerOptions # TODO(angoenka): Remove reference to dataflow internal names @@ -196,6 +197,15 @@ def stage_job_resources(self, setup_options.extra_packages, staging_location, temp_dir=temp_dir)) + # Handle jar packages that should be staged for Java SDK Harness. 
+ jar_packages = options.view_as( + DebugOptions).lookup_experiment('jar_packages') + if jar_packages is not None: + resources.extend( + self._stage_jar_packages( + jar_packages.split(':'), staging_location, + temp_dir=temp_dir)) + # Pickle the main session if requested. # We will create the pickled main session locally and then copy it to the # staging location because the staging location is a remote path and the @@ -308,6 +318,56 @@ def _download_file(from_url, to_path): def _is_remote_path(path): return path.find('://') != -1 + def _stage_jar_packages(self, jar_packages, staging_location, temp_dir): + """Stages a list of local jar packages for Java SDK Harness. + + :param jar_packages: Ordered list of local paths to jar packages to be + staged. Only packages on localfile system and GCS are supported. + :param staging_location: Staging location for the packages. + :param temp_dir: Temporary folder where the resource building can happen. + :return: A list of file names (no paths) for the resource staged. All the + files are assumed to be staged in staging_location. + :raises: + RuntimeError: If files specified are not found or do not have expected + name patterns. + """ + resources = [] + staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) + local_packages = [] + for package in jar_packages: + if not os.path.basename(package).endswith('.jar'): + raise RuntimeError( + 'The --experiment=\'jar_packages=\' option expects a full path ' + 'ending with ".jar" instead of %s' % package) + + if not os.path.isfile(package): + if Stager._is_remote_path(package): + # Download remote package. + logging.info('Downloading jar package: %s locally before staging', + package) + _, last_component = FileSystems.split(package) + local_file_path = FileSystems.join(staging_temp_dir, last_component) + Stager._download_file(package, local_file_path) + else: + raise RuntimeError( + 'The file %s cannot be found. It was specified in the ' + '--experiment=\'jar_packages=\' command line option.' % package) + else: + local_packages.append(package) + + local_packages.extend([ + FileSystems.join(staging_temp_dir, f) + for f in os.listdir(staging_temp_dir) + ]) + + for package in local_packages: + basename = os.path.basename(package) + staged_path = FileSystems.join(staging_location, basename) + self.stage_artifact(package, staged_path) + resources.append(basename) + + return resources + def _stage_extra_packages(self, extra_packages, staging_location, temp_dir): """Stages a list of local extra packages. diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index d73366d807d3..2593be17fc9e 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -28,6 +28,7 @@ import mock from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.runners.dataflow.internal import names @@ -60,6 +61,26 @@ def create_temp_file(self, path, contents): f.write(contents) return f.name + # We can not rely on actual remote file systems paths hence making + # '/tmp/remote/' a new remote path. 
+ def is_remote_path(self, path): + return path.startswith('/tmp/remote/') + + remote_copied_files = [] + + def file_copy(self, from_path, to_path): + if self.is_remote_path(from_path): + self.remote_copied_files.append(from_path) + _, from_name = os.path.split(from_path) + if os.path.isdir(to_path): + to_path = os.path.join(to_path, from_name) + self.create_temp_file(to_path, 'nothing') + logging.info('Fake copied remote file: %s to %s', from_path, to_path) + elif self.is_remote_path(to_path): + logging.info('Faking upload_file(%s, %s)', from_path, to_path) + else: + shutil.copyfile(from_path, to_path) + def populate_requirements_cache(self, requirements_file, cache_dir): _ = requirements_file self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing') @@ -422,14 +443,9 @@ def test_sdk_location_remote_wheel_file(self, *unused_mocks): self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - # We can not rely on actual remote file systems paths hence making - # '/tmp/remote/' a new remote path. - def is_remote_path(path): - return path.startswith('/tmp/remote/') - with mock.patch( 'apache_beam.runners.portability.stager_test' - '.stager.Stager._is_remote_path', staticmethod(is_remote_path)): + '.stager.Stager._is_remote_path', staticmethod(self.is_remote_path)): self.assertEqual([sdk_filename], self.stager.stage_job_resources( options, staging_location=staging_dir)[1]) @@ -477,32 +493,14 @@ def test_with_extra_packages(self): os.path.join(source_dir, 'whl.whl'), '/tmp/remote/remote_file.tar.gz' ] - remote_copied_files = [] - - # We can not rely on actual remote file systems paths hence making - # '/tmp/remote/' a new remote path. - def is_remote_path(path): - return path.startswith('/tmp/remote/') - - def file_copy(from_path, to_path): - if is_remote_path(from_path): - remote_copied_files.append(from_path) - _, from_name = os.path.split(from_path) - if os.path.isdir(to_path): - to_path = os.path.join(to_path, from_name) - self.create_temp_file(to_path, 'nothing') - logging.info('Fake copied remote file: %s to %s', from_path, to_path) - elif is_remote_path(to_path): - logging.info('Faking upload_file(%s, %s)', from_path, to_path) - else: - shutil.copyfile(from_path, to_path) + self.remote_copied_files = [] with mock.patch( 'apache_beam.runners.portability.stager_test' - '.stager.Stager._download_file', staticmethod(file_copy)): + '.stager.Stager._download_file', staticmethod(self.file_copy)): with mock.patch( 'apache_beam.runners.portability.stager_test' - '.stager.Stager._is_remote_path', staticmethod(is_remote_path)): + '.stager.Stager._is_remote_path', staticmethod(self.is_remote_path)): self.assertEqual([ 'abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'remote_file.tar.gz', stager.EXTRA_PACKAGES_FILE @@ -513,7 +511,8 @@ def file_copy(from_path, to_path): 'abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n', 'whl.whl\n', 'remote_file.tar.gz\n' ], f.readlines()) - self.assertEqual(['/tmp/remote/remote_file.tar.gz'], remote_copied_files) + self.assertEqual( + ['/tmp/remote/remote_file.tar.gz'], self.remote_copied_files) def test_with_extra_packages_missing_files(self): staging_dir = self.make_temp_dir() @@ -546,6 +545,70 @@ def test_with_extra_packages_invalid_file_name(self): '".tar", ".tar.gz", ".whl" or ".zip" ' 'instead of %s' % os.path.join(source_dir, 'abc.tgz')) + def test_with_jar_packages_missing_files(self): + staging_dir = self.make_temp_dir() + with self.assertRaises(RuntimeError) as cm: + + options = PipelineOptions() + 
self.update_options(options) + options.view_as(DebugOptions).experiments = [ + 'jar_packages=nosuchfile.jar' + ] + self.stager.stage_job_resources(options, staging_location=staging_dir) + self.assertEqual( + cm.exception.args[0], + 'The file %s cannot be found. It was specified in the ' + '--experiment=\'jar_packages=\' command line option.' % + 'nosuchfile.jar') + + def test_with_jar_packages_invalid_file_name(self): + staging_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + self.create_temp_file(os.path.join(source_dir, 'abc.tgz'), 'nothing') + with self.assertRaises(RuntimeError) as cm: + options = PipelineOptions() + self.update_options(options) + options.view_as(DebugOptions).experiments = [ + 'jar_packages='+os.path.join(source_dir, 'abc.tgz') + ] + self.stager.stage_job_resources(options, staging_location=staging_dir) + self.assertEqual( + cm.exception.args[0], + 'The --experiment=\'jar_packages=\' option expects a full path ending ' + 'with ".jar" instead of %s' % os.path.join(source_dir, 'abc.tgz')) + + def test_with_jar_packages(self): + staging_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + self.create_temp_file(os.path.join(source_dir, 'abc.jar'), 'nothing') + self.create_temp_file(os.path.join(source_dir, 'xyz.jar'), 'nothing') + self.create_temp_file(os.path.join(source_dir, 'ijk.jar'), 'nothing') + + options = PipelineOptions() + self.update_options(options) + options.view_as(DebugOptions).experiments = [ + 'jar_packages=%s:%s:%s:%s' % ( + os.path.join(source_dir, 'abc.jar'), + os.path.join(source_dir, 'xyz.jar'), + os.path.join(source_dir, 'ijk.jar'), + '/tmp/remote/remote.jar' + ) + ] + + self.remote_copied_files = [] + + with mock.patch( + 'apache_beam.runners.portability.stager_test' + '.stager.Stager._download_file', staticmethod(self.file_copy)): + with mock.patch( + 'apache_beam.runners.portability.stager_test' + '.stager.Stager._is_remote_path', staticmethod(self.is_remote_path)): + self.assertEqual([ + 'abc.jar', 'xyz.jar', 'ijk.jar', 'remote.jar' + ], self.stager.stage_job_resources( + options, staging_location=staging_dir)[1]) + self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files) + class TestStager(stager.Stager): diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 3748bd4b9906..1c569c8ab55e 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -136,6 +136,7 @@ def main(unused_argv): service_descriptor = endpoints_pb2.ApiServiceDescriptor() text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'], service_descriptor) + _load_avro_generic_coder(sdk_pipeline_options) # TODO(robertwb): Support credentials. 
assert not service_descriptor.oauth2_client_credentials_grant.url SdkHarness( @@ -197,6 +198,19 @@ def _get_worker_count(pipeline_options): return 12 +def _load_avro_generic_coder(pipeline_options): + experiments = pipeline_options.view_as(DebugOptions).experiments + + experiments = experiments if experiments else [] + + for experiment in experiments: + # There should only be 1 match so returning from the loop + if re.match(r'xlang_test', experiment): + # pylint: disable=unused-variable + from apache_beam.coders.avro_generic_coder import AvroGenericCoder + return + + def _load_main_session(semi_persistent_directory): """Loads a pickled main session from the path specified.""" if semi_persistent_directory: diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index f177e7127185..4417f0ea48fb 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -112,6 +112,9 @@ def run(self, test_runner_api=True): return result + def get_pipeline_options(self): + return self._options + def _parse_test_option_args(self, argv): """Parse value of command line argument: --test-pipeline-options to get pipeline options. diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index e8345a1651c1..600b3ede16b0 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -129,10 +129,13 @@ class PerElement(ptransform.PTransform): def expand(self, pcoll): paired_with_void_type = KV[pcoll.element_type, Any] - return (pcoll - | ('%s:PairWithVoid' % self.label >> core.Map(lambda x: (x, None)) - .with_output_types(paired_with_void_type)) - | core.CombinePerKey(CountCombineFn())) + output_type = KV[pcoll.element_type, int] + return ( + pcoll + | ('%s:PairWithVoid' % self.label >> core.Map(lambda x: (x, None)) + .with_output_types(paired_with_void_type)) + | core.CombinePerKey(CountCombineFn()).with_output_types(output_type) + ) @with_input_types(Any) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 5495ccc23bdf..c38a2abf78e0 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -20,21 +20,23 @@ from __future__ import absolute_import import argparse +import os import subprocess import sys import unittest import grpc from mock import patch +from nose.plugins.attrib import attr from past.builtins import unicode import apache_beam as beam from apache_beam import Pipeline from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions -from apache_beam.portability import python_urns from apache_beam.runners.portability import expansion_service from apache_beam.runners.portability.expansion_service_test import FibTransform +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -47,6 +49,7 @@ # pylint: enable=wrong-import-order, wrong-import-position +@attr('UsesCrossLanguageTransforms') class ExternalTransformTest(unittest.TestCase): # This will be overwritten if set via a flag. 
@@ -55,26 +58,33 @@ class ExternalTransformTest(unittest.TestCase): class _RunWithExpansion(object): - def __init__(self, port, expansion_service_jar): - self._port = port - self._expansion_service_jar = expansion_service_jar + def __init__(self): + self._server = None def __enter__(self): - if not ExternalTransformTest.expansion_service_jar: - raise unittest.SkipTest('No expansion service jar provided.') + if not (ExternalTransformTest.expansion_service_jar or + ExternalTransformTest.expansion_service_port): + raise unittest.SkipTest('No expansion service jar or port provided.') + + ExternalTransformTest.expansion_service_port = ( + ExternalTransformTest.expansion_service_port or 8091) + + jar = ExternalTransformTest.expansion_service_jar + port = ExternalTransformTest.expansion_service_port # Start the java server and wait for it to be ready. - self._server = subprocess.Popen( - ['java', '-jar', self._expansion_service_jar, str(self._port)]) + if jar: + self._server = subprocess.Popen(['java', '-jar', jar, str(port)]) - port = ExternalTransformTest.expansion_service_port or 8091 address = 'localhost:%s' % str(port) with grpc.insecure_channel(address) as channel: grpc.channel_ready_future(channel).result() def __exit__(self, type, value, traceback): - self._server.kill() + if self._server: + self._server.kill() + self._server = None def test_pipeline_generation(self): pipeline = beam.Pipeline() @@ -137,17 +147,10 @@ def test_nested(self): assert_that(p | FibTransform(6), equal_to([8])) def test_java_expansion_portable_runner(self): - pipeline_options = PipelineOptions( - ['--runner=PortableRunner', - '--experiments=beam_fn_api', - '--environment_type=%s' % python_urns.EMBEDDED_PYTHON, - '--job_endpoint=embed']) - - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module level). - pipeline_options.view_as(SetupOptions).save_main_session = True + ExternalTransformTest.expansion_service_port = os.environ.get( + 'EXPANSION_PORT') - ExternalTransformTest.run_pipeline_with_portable_runner(pipeline_options) + ExternalTransformTest.run_pipeline_with_portable_runner(None) @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed') def test_java_expansion_dataflow(self): @@ -156,21 +159,17 @@ def test_java_expansion_dataflow(self): with patch.object( apiclient.DataflowApplicationClient, 'create_job') as mock_create_job: - port = ExternalTransformTest.expansion_service_port or 8091 - with self._RunWithExpansion(port, self.expansion_service_jar): + with self._RunWithExpansion(): pipeline_options = PipelineOptions( ['--runner=DataflowRunner', '--project=dummyproject', '--experiments=beam_fn_api', '--temp_location=gs://dummybucket/']) - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module - # level). - pipeline_options.view_as(SetupOptions).save_main_session = True - # Run a simple count-filtered-letters pipeline. 
- self.run_pipeline(pipeline_options, port, False) + self.run_pipeline( + pipeline_options, ExternalTransformTest.expansion_service_port, + False) mock_args = mock_create_job.call_args_list assert mock_args @@ -181,24 +180,27 @@ def test_java_expansion_dataflow(self): @staticmethod def run_pipeline_with_portable_runner(pipeline_options): - - port = ExternalTransformTest.expansion_service_port or 8091 - - with ExternalTransformTest._RunWithExpansion( - port, ExternalTransformTest.expansion_service_jar): + with ExternalTransformTest._RunWithExpansion(): # Run a simple count-filtered-letters pipeline. - ExternalTransformTest.run_pipeline(pipeline_options, port, True) + ExternalTransformTest.run_pipeline( + pipeline_options, ExternalTransformTest.expansion_service_port, True) @staticmethod def run_pipeline( pipeline_options, expansion_service_port, wait_until_finish=True): # The actual definitions of these transforms is in # org.apache.beam.runners.core.construction.TestExpansionService. - TEST_COUNT_URN = "pytest:beam:transforms:count" - TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than" + TEST_COUNT_URN = "beam:transforms:xlang:count" + TEST_FILTER_URN = "beam:transforms:xlang:filter_less_than_eq" # Run a simple count-filtered-letters pipeline. - p = beam.Pipeline(options=pipeline_options) + p = TestPipeline(options=pipeline_options) + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module + # level). + p.get_pipeline_options().view_as(SetupOptions).save_main_session = True + address = 'localhost:%s' % str(expansion_service_port) res = ( p diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 8c6f1c07cc99..10a5382c3aa5 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -397,10 +397,10 @@ project.task('crossLanguagePythonJavaFlink') { dependsOn ':beam-runners-flink_2.11-job-server-container:docker' dependsOn ':beam-sdks-python-container:docker' dependsOn ':beam-sdks-java-container:docker' - dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar' + dependsOn ':beam-sdks-java-test-expansion-service:buildTestExpansionServiceJar' doLast { - def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath + def testServiceExpansionJar = project(":beam-sdks-java-test-expansion-service:").buildTestExpansionServiceJar.archivePath def options = [ "--runner=PortableRunner", "--experiments=worker_threads=100", @@ -422,10 +422,10 @@ project.task('crossLanguagePortableWordCount') { dependsOn ':beam-runners-flink_2.11-job-server-container:docker' dependsOn ':beam-sdks-python-container:docker' dependsOn ':beam-sdks-java-container:docker' - dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar' + dependsOn ':beam-sdks-java-test-expansion-service:buildTestExpansionServiceJar' doLast { - def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath + def testServiceExpansionJar = project(":beam-sdks-java-test-expansion-service:").buildTestExpansionServiceJar.archivePath def options = [ "--input=/etc/profile", "--output=/tmp/py-wordcount-portable", diff --git a/sdks/python/scripts/run_expansion_services.sh b/sdks/python/scripts/run_expansion_services.sh new file mode 100755 index 000000000000..0cf18c6e58ea --- /dev/null +++ b/sdks/python/scripts/run_expansion_services.sh @@ -0,0 +1,136 @@ +#!/bin/bash +# +# Licensed to 
the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +read -r -d '' USAGE </dev/null 2>&1 +if [[ $? -eq 0 ]]; then + exec 200>$lock + if ! flock -n 200; then + echo "script already running." + exit 0 + fi +fi + +case $STARTSTOP in + start) + if [ -f "$pid" ]; then + echo "services already running." + exit 0 + fi + + echo "Launching Java expansion service @ $JAVA_PORT" + java -jar $JAVA_EXPANSION_SERVICE_JAR $JAVA_PORT >$TEMP_DIR/$FILE_BASE-java.log 2>&1 /dev/null 2>&1; then + echo $mypid >> $pid + else + echo "Can't start Java expansion service." + fi + + echo "Launching Python expansion service @ $PYTHON_PORT" + sh -c ". $PYTHON_VIRTUALENV_DIR/bin/activate && python -m $PYTHON_EXPANSION_SERVICE_MODULE -p $PYTHON_PORT" >$TEMP_DIR/$FILE_BASE-python.log 2>&1 /dev/null 2>&1; then + echo $mypid >> $pid + else + echo "Can't start Python expansion service." + fi + ;; + stop) + if [ -f "$pid" ]; then + while read stop_pid; do + if kill -0 $stop_pid >/dev/null 2>&1; then + echo "Stopping expansion service pid: $stop_pid." + kill $stop_pid + else + echo "Skipping invalid pid: $stop_pid." + fi + done < $pid + rm $pid + fi + ;; +esac +flock -u 200 diff --git a/sdks/python/scripts/run_kafka_services.sh b/sdks/python/scripts/run_kafka_services.sh new file mode 100755 index 000000000000..a8147cabd7cc --- /dev/null +++ b/sdks/python/scripts/run_kafka_services.sh @@ -0,0 +1,74 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +read -r -d '' USAGE <> config/server.properties + fi + echo "Starting local Zookeeper server." + bin/zookeeper-server-start.sh -daemon config/zookeeper.properties + sleep 3 + echo "Starting local Kafka server." + bin/kafka-server-start.sh -daemon config/server.properties + sleep 3 + echo "Cleanup beam-kafkaio-test topic." + bin/kafka-topics.sh --zookeeper localhost:2181 --topic beam-kafkaio-test --delete --if-exists + ;; + stop) + cd $TEMP_DIR/kafka_$SCALA_VERSION-$KAFKA_VERSION + echo "Stopping local Kafka server." + bin/kafka-server-stop.sh + sleep 3 + echo "Stopping local Zookeeper server." 
+ bin/zookeeper-server-stop.sh + ;; +esac diff --git a/settings.gradle b/settings.gradle index f1fa3dca48f8..27962ae42379 100644 --- a/settings.gradle +++ b/settings.gradle @@ -203,6 +203,8 @@ include "beam-sdks-java-maven-archetypes-starter" project(":beam-sdks-java-maven-archetypes-starter").dir = file("sdks/java/maven-archetypes/starter") include "beam-sdks-java-nexmark" project(":beam-sdks-java-nexmark").dir = file("sdks/java/testing/nexmark") +include "beam-sdks-java-test-expansion-service" +project(":beam-sdks-java-test-expansion-service").dir = file("sdks/java/testing/expansion-service") include "beam-sdks-python" project(":beam-sdks-python").dir = file("sdks/python") include "beam-sdks-python-container"