Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 0 additions & 53 deletions .test-infra/jenkins/job_PostCommit_TransformService_Direct.groovy

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -402,30 +402,6 @@ class BeamModulePlugin implements Plugin<Project> {
FileCollection classpath
}

// A class defining the configuration for createTransformServiceTask.
static class TransformServiceConfiguration {
// Task name TransformService case.
String name = 'transformService'

List<String> pythonPipelineOptions = []

List<String> javaPipelineOptions = []

// Additional pytest options
List<String> pytestOptions = []
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
// Number of parallel test runs.
Integer numParallelTests = 1
// Whether the pipeline needs --sdk_location option
boolean needsSdkLocation = false

// Collect Python pipeline tests with this marker
String collectMarker
}

def isRelease(Project project) {
return parseBooleanProperty(project, 'isRelease');
}
Expand Down Expand Up @@ -2765,108 +2741,6 @@ class BeamModulePlugin implements Plugin<Project> {

/** ***********************************************************************************************/

// Method to create the createTransformServiceTask.
// The method takes TransformServiceConfiguration as parameter.
project.ext.createTransformServiceTask = {
// This task won't work if the python build file doesn't exist.
if (!project.project(":sdks:python").buildFile.exists()) {
System.err.println 'Python build file not found. Skipping createTransformServiceTask.'
return
}
def config = it ? it as TransformServiceConfiguration : new TransformServiceConfiguration()

project.evaluationDependsOn(":sdks:python")
project.evaluationDependsOn(":runners:core-construction-java")
project.evaluationDependsOn(":sdks:java:extensions:python")
project.evaluationDependsOn(":sdks:java:transform-service:launcher")

def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner")

// Task for launching transform services
def envDir = project.project(":sdks:python").envdir
def pythonDir = project.project(":sdks:python").projectDir
def externalPort = getRandomPort()
def launcherJar = project.project(':sdks:java:transform-service:launcher').shadowJar.archivePath
def transformServiceOpts = [
"transform_service_launcher_jar": launcherJar,
"group_id": project.name,
"external_port": externalPort,
"beam_version": project.version
]
def serviceArgs = project.project(':sdks:python').mapToArgString(transformServiceOpts)
def pythonContainerSuffix = project.project(':sdks:python').pythonVersion.replace('.', '')
def javaContainerSuffix
if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
javaContainerSuffix = 'java8'
} else if (JavaVersion.current() == JavaVersion.VERSION_11) {
javaContainerSuffix = 'java11'
} else if (JavaVersion.current() == JavaVersion.VERSION_17) {
javaContainerSuffix = 'java17'
} else {
String exceptionMessage = "Your Java version is unsupported. You need Java version of 8 or 11 or 17 to get started, but your Java version is: " + JavaVersion.current();
throw new GradleException(exceptionMessage)
}

def setupTask = project.tasks.register(config.name+"Setup", Exec) {
dependsOn ':sdks:java:container:'+javaContainerSuffix+':docker'
dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker'
dependsOn ':sdks:java:transform-service:controller-container:docker'
dependsOn ':sdks:python:expansion-service-container:docker'
dependsOn ':sdks:java:expansion-service:container:docker'
dependsOn ":sdks:python:installGcpTest"
dependsOn project.project(':sdks:java:transform-service:launcher').shadowJar.getPath()

if (usesDataflowRunner) {
dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob"
}

// setup test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_transform_service.sh stop $serviceArgs && $pythonDir/scripts/run_transform_service.sh start $serviceArgs"
}

if (config.needsSdkLocation) {
setupTask.configure {dependsOn ':sdks:python:sdist'}
}

def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
group = "Verification"
description = "Runs Python SDK pipeline tests that use transform service"
dependsOn setupTask
dependsOn config.startJobServer
doLast {
def beamPythonTestPipelineOptions = [
"pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? [
"--sdk_location=${project.ext.sdkLocation}"]
: []),
"test_opts": config.pytestOptions,
"suite": config.name,
"collect": config.collectMarker,
]
def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)

project.exec {
environment "EXPANSION_PORT", externalPort
executable 'sh'
args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
}
}
}

def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
// teardown test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_transform_service.sh stop $serviceArgs"
}
setupTask.configure {finalizedBy cleanupTask}
config.startJobServer.configure {finalizedBy config.cleanupJobServer}

cleanupTask.configure{mustRunAfter pythonTask}
config.cleanupJobServer.configure{mustRunAfter pythonTask}
}

/** ***********************************************************************************************/

project.ext.applyPythonNature = {

// Define common lifecycle tasks and artifact types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -155,15 +154,15 @@ public static synchronized TransformServiceLauncher forProject(
return launchers.get(projectName);
}

private void runDockerComposeCommand(List<String> command) throws IOException {
private void runDockerComposeCommand(String command) throws IOException {
this.runDockerComposeCommand(command, null);
}

private void runDockerComposeCommand(List<String> command, @Nullable File outputOverride)
private void runDockerComposeCommand(String command, @Nullable File outputOverride)
throws IOException {
List<String> shellCommand = new ArrayList<>();
shellCommand.addAll(dockerComposeStartCommandPrefix);
shellCommand.addAll(command);
shellCommand.add(command);
System.out.println("Executing command: " + String.join(" ", command));
ProcessBuilder processBuilder =
new ProcessBuilder(shellCommand).redirectError(ProcessBuilder.Redirect.INHERIT);
Expand All @@ -187,15 +186,15 @@ private void runDockerComposeCommand(List<String> command, @Nullable File output
}

public synchronized void start() throws IOException, TimeoutException {
runDockerComposeCommand(ImmutableList.of("up", "-d"));
runDockerComposeCommand("up");
}

public synchronized void shutdown() throws IOException {
runDockerComposeCommand(ImmutableList.of("down"));
runDockerComposeCommand("down");
}

public synchronized void status() throws IOException {
runDockerComposeCommand(ImmutableList.of("ps"));
runDockerComposeCommand("ps");
}

public synchronized void waitTillUp(int timeout) throws IOException, TimeoutException {
Expand Down Expand Up @@ -226,7 +225,7 @@ public synchronized void waitTillUp(int timeout) throws IOException, TimeoutExce

private synchronized String getStatus() throws IOException {
File outputOverride = File.createTempFile("output_override", null);
runDockerComposeCommand(ImmutableList.of("ps"), outputOverride);
runDockerComposeCommand("ps", outputOverride);

return outputOverride.getAbsolutePath();
}
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@


@pytest.mark.uses_gcp_java_expansion_service
@pytest.mark.uses_transform_service
@unittest.skipUnless(
os.environ.get('EXPANSION_PORT'),
"EXPANSION_PORT environment var is not provided.")
Expand Down Expand Up @@ -143,7 +142,6 @@ def test_read_xlang(self):


@pytest.mark.uses_gcp_java_expansion_service
@pytest.mark.uses_transform_service
@unittest.skipUnless(
os.environ.get('EXPANSION_PORT'),
"EXPANSION_PORT environment var is not provided.")
Expand Down
1 change: 0 additions & 1 deletion sdks/python/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ markers =
uses_java_expansion_service: collect Cross Language Java transforms test runs
uses_python_expansion_service: collect Cross Language Python transforms test runs
uses_io_expansion_service: collect Cross Language transform test runs (with Kafka bootstrap server)
uses_transform_service: collect Cross Language test runs that uses the Transform Service
xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
it_postcommit: collect for post-commit integration test runs
it_postcommit_sickbay: collect for post-commit sickbay integration test run
Expand Down
85 changes: 0 additions & 85 deletions sdks/python/scripts/run_transform_service.sh

This file was deleted.

19 changes: 0 additions & 19 deletions sdks/python/test-suites/direct/xlang/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
}

def gcpProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'

createCrossLanguageValidatesRunnerTask(
startJobServer: setupTask,
cleanupJobServer: cleanupTask,
Expand All @@ -69,20 +67,3 @@ createCrossLanguageValidatesRunnerTask(
"--endpoint localhost:${jobPort}",
],
)

createTransformServiceTask(
startJobServer: setupTask,
cleanupJobServer: cleanupTask,
numParallelTests: 1,
collectMarker: 'uses_transform_service',
pythonPipelineOptions: [
"--runner=TestDirectRunner",
"--project=${gcpProject}",
],
pytestOptions: [
"--capture=no", // print stdout instantly
"--timeout=4500", // timeout of whole command execution
"--color=yes", // console color
"--log-cli-level=INFO" //log level info
]
)