Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,29 @@ class BeamModulePlugin implements Plugin<Project> {
}
}

// A class defining the configuration for CrossLanguageValidatesRunner.
class CrossLanguageValidatesRunnerConfiguration {
// Task name for cross-language validate runner case.
String name = 'validatesCrossLanguageRunner'
// Fully qualified JobServerClass name to use.
String jobServerDriver
// A string representing the jobServer Configuration.
String jobServerConfig
// Number of parallel test runs.
Integer numParallelTests = 1
// Extra options to pass to TestPipeline
String[] pipelineOpts = []
// Categories for tests to run.
Closure testCategories = {
includeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
// Use the following to include / exclude categories:
// includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
}
// Configuration for the classpath when running the test.
Configuration testClasspathConfiguration
}

def isRelease(Project project) {
return project.hasProperty('isRelease')
}
Expand Down Expand Up @@ -1572,16 +1595,13 @@ class BeamModulePlugin implements Plugin<Project> {
*/
project.evaluationDependsOn(":beam-sdks-java-core")
project.evaluationDependsOn(":beam-runners-core-java")
project.evaluationDependsOn(":beam-runners-core-construction-java")
def config = it ? it as PortableValidatesRunnerConfiguration : new PortableValidatesRunnerConfiguration()
def name = config.name
def beamTestPipelineOptions = [
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
"--jobServerDriver=${config.jobServerDriver}",
"--environmentCacheMillis=10000"
]
def expansionPort = startingExpansionPortNumber.getAndDecrement()
config.systemProperties.put("expansionPort", expansionPort)
beamTestPipelineOptions.addAll(config.pipelineOpts)
if (config.environment == PortableValidatesRunnerConfiguration.Environment.EMBEDDED) {
beamTestPipelineOptions += "--defaultEnvironmentType=EMBEDDED"
Expand All @@ -1595,7 +1615,7 @@ class BeamModulePlugin implements Plugin<Project> {
description = "Validates the PortableRunner with JobServer ${config.jobServerDriver}"
systemProperties config.systemProperties
classpath = config.testClasspathConfiguration
testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs)
testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs)
maxParallelForks config.numParallelTests
useJUnit(config.testCategories)
// increase maxHeapSize as this is directly correlated to direct memory,
Expand All @@ -1609,6 +1629,110 @@ class BeamModulePlugin implements Plugin<Project> {

/** ***********************************************************************************************/

// Method to create the crossLanguageValidatesRunnerTask.
// The method takes crossLanguageValidatesRunnerConfiguration as parameter.
project.ext.createCrossLanguageValidatesRunnerTask = {
def config = it ? it as CrossLanguageValidatesRunnerConfiguration : new CrossLanguageValidatesRunnerConfiguration()

project.evaluationDependsOn(":beam-sdks-python")
project.evaluationDependsOn(":beam-sdks-java-test-expansion-service")
project.evaluationDependsOn(":beam-runners-core-construction-java")

// Task for launching expansion services
def envDir = project.project(":beam-sdks-python").envdir
def pythonDir = project.project(":beam-sdks-python").projectDir
def javaPort = startingExpansionPortNumber.getAndDecrement()
def pythonPort = startingExpansionPortNumber.getAndDecrement()
def expansionJar = project.project(':beam-sdks-java-test-expansion-service').buildTestExpansionServiceJar.archivePath
def expansionServiceOpts = [
"group_id": project.name,
"java_expansion_service_jar": expansionJar,
"java_port": javaPort,
"python_virtualenv_dir": envDir,
"python_expansion_service_module": "apache_beam.runners.portability.expansion_service_test",
"python_port": pythonPort
]
def serviceArgs = project.project(':beam-sdks-python').mapToArgString(expansionServiceOpts)
def setupTask = project.tasks.create(name: config.name+"Setup", type: Exec) {
dependsOn ':beam-sdks-java-container:docker'
dependsOn ':beam-sdks-python-container:docker'
dependsOn ':beam-sdks-java-test-expansion-service:buildTestExpansionServiceJar'
dependsOn ":beam-sdks-python:installGcpTest"
// setup test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh start $serviceArgs; $pythonDir/scripts/run_kafka_services.sh start"
}

def mainTask = project.tasks.create(name: config.name) {
group = "Verification"
description = "Validates cross-language capability of runner"
}

def cleanupTask = project.tasks.create(name: config.name+'Cleanup', type: Exec) {
// teardown test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}; $pythonDir/scripts/run_kafka_services.sh stop"
}
setupTask.finalizedBy cleanupTask

// Task for running testcases in Java SDK
def beamJavaTestPipelineOptions = [
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
"--jobServerDriver=${config.jobServerDriver}",
"--environmentCacheMillis=10000"
]
beamJavaTestPipelineOptions.addAll(config.pipelineOpts)
if (config.jobServerConfig) {
beamJavaTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
}
['Java': javaPort, 'Python': pythonPort].each { sdk, port ->
def javaTask = project.tasks.create(name: config.name+"JavaUsing"+sdk, type: Test) {
group = "Verification"
description = "Validates runner for cross-language capability of using ${sdk} transforms from Java SDK"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(beamJavaTestPipelineOptions)
systemProperty "expansionPort", port
classpath = config.testClasspathConfiguration
testClassesDirs = project.files(project.project(":beam-runners-core-construction-java").sourceSets.test.output.classesDirs)
maxParallelForks config.numParallelTests
useJUnit(config.testCategories)
// increase maxHeapSize as this is directly correlated to direct memory,
// see https://issues.apache.org/jira/browse/BEAM-6698
maxHeapSize = '4g'
dependsOn setupTask
}
mainTask.dependsOn javaTask
cleanupTask.mustRunAfter javaTask

// Task for running testcases in Python SDK
def testOpts = [
"--attr=UsesCrossLanguageTransforms"
]
def pipelineOpts = [
"--runner=PortableRunner",
"--experiments=xlang_test",
"--environment_cache_millis=10000"
]
def beamPythonTestPipelineOptions = [
"pipeline_opts": pipelineOpts,
"test_opts": testOpts
]
def cmdArgs = project.project(':beam-sdks-python').mapToArgString(beamPythonTestPipelineOptions)
def pythonTask = project.tasks.create(name: config.name+"PythonUsing"+sdk, type: Exec) {
group = "Verification"
description = "Validates runner for cross-language capability of using ${sdk} transforms from Python SDK"
environment "EXPANSION_PORT", port
environment "EXPANSION_JAR", expansionJar
executable 'sh'
args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
dependsOn setupTask
}
mainTask.dependsOn pythonTask
cleanupTask.mustRunAfter pythonTask
}
}

/** ***********************************************************************************************/

project.ext.applyPythonNature = {

// Define common lifecycle tasks and artifact types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,30 @@ examples:

---

coder:
urn: "beam:coder:string_utf8:v1"
nested: false
examples:
"abc": abc
"ab\0c": "ab\0c"
"\u00c3\u00bf": "\u00ff"
"\u00e5\u0085\u0089\u00e7\u00ba\u00bf": "光线"

---

coder:
urn: "beam:coder:string_utf8:v1"
nested: true
examples:
"\u0003abc": abc
"\u0004ab\0c": "ab\0c"
"\u0002\u00c3\u00bf": "\u00ff"
"\u0006\u00e5\u0085\u0089\u00e7\u00ba\u00bf": "光线"
"\u00c8\u0001 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|":
" 10| 20| 30| 40| 50| 60| 70| 80| 90| 100| 110| 120| 130| 140| 150| 160| 170| 180| 190| 200|"

---

coder:
urn: "beam:coder:varint:v1"
examples:
Expand Down
21 changes: 0 additions & 21 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,3 @@ task runExpansionService (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
args = [project.findProperty("constructionService.port") ?: "8097"]
}

task runTestExpansionService (type: JavaExec) {
main = "org.apache.beam.runners.core.construction.expansion.TestExpansionService"
classpath = sourceSets.test.runtimeClasspath
args = [project.findProperty("constructionService.port") ?: "8097"]
}

task buildTestExpansionServiceJar(type: Jar) {
dependsOn = [shadowJar, shadowTestJar]
appendix = "testExpansionService"
// Use zip64 mode to avoid "Archive contains more than 65535 entries".
zip64 = true
manifest {
attributes(
'Main-Class': 'org.apache.beam.runners.core.construction.expansion.TestExpansionService'
)
}
from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
from sourceSets.main.output
from sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;

/** Coder registrar for AvroGenericCoder. */
@AutoService(CoderTranslatorRegistrar.class)
public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar {
public static final String AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1";

@Override
public Map<Class<? extends Coder>, String> getCoderURNs() {
return ImmutableMap.of(AvroGenericCoder.class, AVRO_GENERIC_CODER_URN);
}

@Override
public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;

/** Coder translator for AvroGenericCoder. */
public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> {
@Override
public List<? extends Coder<?>> getComponents(AvroGenericCoder from) {
return Collections.emptyList();
}

@Override
public byte[] getPayload(AvroGenericCoder from) {
try {
return from.getSchema().toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("failed to encode schema.");
}
}

@Override
public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) {
Schema schema;
try {
schema = new Schema.Parser().parse(new String(payload, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("failed to parse schema.");
}
return AvroGenericCoder.of(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
Expand All @@ -47,6 +48,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS =
ImmutableBiMap.<Class<? extends Coder>, String>builder()
.put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN)
.put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN)
.put(KvCoder.class, ModelCoders.KV_CODER_URN)
.put(VarLongCoder.class, ModelCoders.INT64_CODER_URN)
.put(IntervalWindowCoder.class, ModelCoders.INTERVAL_WINDOW_CODER_URN)
Expand All @@ -64,6 +66,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> BEAM_MODEL_CODERS =
ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends Coder>>builder()
.put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class))
.put(StringUtf8Coder.class, CoderTranslators.atomic(StringUtf8Coder.class))
.put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
.put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class))
.put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private ModelCoders() {}
// Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix
// coders?
public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT);
public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8);

public static final String DOUBLE_CODER_URN = getUrn(StandardCoders.Enum.DOUBLE);

Expand All @@ -57,6 +58,7 @@ private ModelCoders() {}
ImmutableSet.of(
BYTES_CODER_URN,
INT64_CODER_URN,
STRING_UTF8_CODER_URN,
ITERABLE_CODER_URN,
TIMER_CODER_URN,
KV_CODER_URN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
Expand All @@ -60,6 +61,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
Expand Down Expand Up @@ -343,6 +345,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
SdkComponents sdkComponents =
rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride()));
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
String expandedTransformId =
Iterables.getOnlyElement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class CoderTranslationTest {
.add(ByteArrayCoder.of())
.add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
.add(VarLongCoder.of())
.add(StringUtf8Coder.of())
.add(IntervalWindowCoder.of())
.add(IterableCoder.of(ByteArrayCoder.of()))
.add(Timer.Coder.of(ByteArrayCoder.of()))
Expand Down
Loading