@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS


// This job runs end-to-end cross language GCP IO tests with DataflowRunner.
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')


// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)


// Publish all test results to Jenkins
publishers {
archiveJunit('**/pytest*.xml')
}


// Gradle goals for this job.
steps {
CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS.each { pythonVersion ->
shell("echo \"Running cross language GCP IO tests with Python ${pythonVersion} on DataflowRunner.\"")
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(":sdks:python:test-suites:dataflow:py${pythonVersion.replace('.', '')}:gcpCrossLanguagePythonUsingJava")
commonJobProperties.setGradleSwitches(delegate)
}
}
}
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS


// This job runs end-to-end cross language GCP IO tests with DirectRunner.
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Direct runner.')


// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)


// Publish all test results to Jenkins
publishers {
archiveJunit('**/pytest*.xml')
}


// Gradle goals for this job.
steps {
CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS.each { pythonVersion ->
shell("echo \"Running cross language GCP IO tests with Python ${pythonVersion} on DirectRunner.\"")
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(":sdks:python:test-suites:direct:py${pythonVersion.replace('.', '')}:gcpCrossLanguagePythonUsingJava")
commonJobProperties.setGradleSwitches(delegate)
}
}
}
}
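
Both of the new jobs above collect only the Python suites tagged with the `uses_gcp_java_expansion_service` pytest marker before handing them to the Gradle tasks defined later in BeamModulePlugin. A hedged sketch of what such a test might look like; the module, test name, and body are hypothetical, only the marker name comes from this PR:

```python
# Hypothetical test sketch; only the pytest marker name is taken from this PR.
import pytest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


@pytest.mark.uses_gcp_java_expansion_service
def test_java_backed_gcp_io_roundtrip():
  # TestPipeline picks up the --test-pipeline-options string that
  # scripts/run_integration_test.sh forwards from the Gradle task.
  with TestPipeline(is_integration_test=True) as p:
    pcoll = p | beam.Create(['a', 'b'])
    # ... apply a cross-language GCP transform backed by the Java expansion service ...
    assert_that(pcoll, equal_to(['a', 'b']))
```

Tests without the marker are simply not collected, which keeps both jobs scoped to the cross-language GCP IO paths.
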
CHANGES.md (1 addition, 0 deletions)
@@ -60,6 +60,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).
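
A minimal usage sketch for the new Python path, assuming the `STORAGE_WRITE_API` option on `WriteToBigQuery`'s `method` parameter; the project, dataset, table, and schema below are placeholders:

```python
# Sketch only: table reference and schema are placeholders.
import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'id': 1, 'name': 'widget'}])
      | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',
          schema='id:INTEGER,name:STRING',
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API))
```

Selecting this method routes the write through the Java BigQuery connector via an expansion service, which is what the Gradle and Jenkins wiring in this PR exercises on the Direct and Dataflow runners.
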

## New Features / Improvements

@@ -318,6 +318,48 @@ class BeamModulePlugin implements Plugin<Project> {
}
}

// A class defining the common properties in a given suite of cross-language tests
// Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object
static class CrossLanguageTaskCommon {
// Used as the task name for cross-language
String name
// The expansion service's project path (required)
String expansionProjectPath
// Collect Python pipeline tests with this marker
String collectMarker
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
}

// A class defining the configuration for CrossLanguageUsingJavaExpansion.
static class CrossLanguageUsingJavaExpansionConfiguration {
// Task name for cross-language tests using Java expansion.
String name = 'crossLanguageUsingJavaExpansion'
// Python pipeline options to use.
List<String> pythonPipelineOptions = [
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_cache_millis=10000",
"--experiments=beam_fn_api",
]
// Additional pytest options
List<String> pytestOptions = []
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
// Number of parallel test runs.
Integer numParallelTests = 1
// Whether the pipeline needs --sdk_location option
boolean needsSdkLocation = false
// Project path for the expansion service to start up
String expansionProjectPath
// Collect Python pipeline tests with this marker
String collectMarker
}

// A class defining the configuration for CrossLanguageValidatesRunner.
static class CrossLanguageValidatesRunnerConfiguration {
// Task name for cross-language validate runner case.
@@ -2375,6 +2417,108 @@
}
}

/** ***********************************************************************************************/
// Method to create the createCrossLanguageUsingJavaExpansionTask.
// The method takes a CrossLanguageUsingJavaExpansionConfiguration as a parameter.
// This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
project.ext.createCrossLanguageUsingJavaExpansionTask = {
// This task won't work if the python build file doesn't exist.
if (!project.project(":sdks:python").buildFile.exists()) {
System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
return
}
def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()

project.evaluationDependsOn(":sdks:python")
project.evaluationDependsOn(config.expansionProjectPath)
project.evaluationDependsOn(":runners:core-construction-java")
project.evaluationDependsOn(":sdks:java:extensions:python")

// Setting up args to launch the expansion service
def envDir = project.project(":sdks:python").envdir
def pythonDir = project.project(":sdks:python").projectDir
def javaExpansionPort = -1 // will be populated in setupTask
def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
def expansionServiceOpts = [
"group_id": project.name,
"java_expansion_service_jar": expansionJar,
"java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
]
def javaContainerSuffix
if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
javaContainerSuffix = 'java8'
} else if (JavaVersion.current() == JavaVersion.VERSION_11) {
javaContainerSuffix = 'java11'
} else if (JavaVersion.current() == JavaVersion.VERSION_17) {
javaContainerSuffix = 'java17'
} else {
String exceptionMessage = "Your Java version is unsupported. You need Java version of 8 or 11 or 17 to get started, but your Java version is: " + JavaVersion.current();
throw new GradleException(exceptionMessage)
}

// 1. Builds the chosen expansion service jar and launches it
def setupTask = project.tasks.register(config.name+"Setup") {
dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
dependsOn ":sdks:python:installGcpTest"
doLast {
project.exec {
// Prepare a port to use for the expansion service
javaExpansionPort = getRandomPort()
expansionServiceOpts.put("java_port", javaExpansionPort)
// setup test env
def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
}
}
}

// 2. Sets up, collects, and runs Python pipeline tests
def sdkLocationOpt = []
if (config.needsSdkLocation) {
setupTask.configure {dependsOn ':sdks:python:sdist'}
sdkLocationOpt = [
"--sdk_location=${pythonDir}/build/apache-beam.tar.gz"
]
}
def beamPythonTestPipelineOptions = [
"pipeline_opts": config.pythonPipelineOptions + sdkLocationOpt,
"test_opts": config.pytestOptions,
"suite": config.name,
"collect": config.collectMarker,
]
def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)
def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
group = "Verification"
description = "Runs Python SDK pipeline tests that use a Java expansion service"
dependsOn setupTask
dependsOn config.startJobServer
doLast {
project.exec {
environment "EXPANSION_JAR", expansionJar
environment "EXPANSION_PORT", javaExpansionPort
executable 'sh'
args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
}
}
}

// 3. Shuts down the expansion service
def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
// teardown test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}"
}

setupTask.configure {finalizedBy cleanupTask}
config.startJobServer.configure {finalizedBy config.cleanupJobServer}

cleanupTask.configure{mustRunAfter pythonTask}
config.cleanupJobServer.configure{mustRunAfter pythonTask}
}

/** ***********************************************************************************************/

// Method to create the crossLanguageValidatesRunnerTask.
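
The `${name}Setup` task above starts the chosen expansion service on a random port, and the `${name}PythonUsingJava` task exports that port as `EXPANSION_PORT` (and the shaded jar as `EXPANSION_JAR`) before calling `run_integration_test.sh`. A hedged sketch of how a Python test might consume those variables; the helper and its fallback are assumed conventions, not code from this PR:

```python
# Sketch: EXPANSION_PORT and EXPANSION_JAR are the variables exported by the
# Gradle task above; the helper itself is an assumed convention.
import os

from apache_beam.transforms.external import JavaJarExpansionService


def gcp_expansion_service():
  port = os.environ.get('EXPANSION_PORT')
  if port:
    # Reuse the expansion service already started by the Setup task.
    return 'localhost:' + port
  # Otherwise launch the shaded expansion-service jar directly.
  return JavaJarExpansionService(os.environ['EXPANSION_JAR'])
```

A cross-language transform in the test can then be constructed with `expansion_service=gcp_expansion_service()` rather than starting its own service per test.
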
@@ -332,14 +332,16 @@ public static Schema of(Field... fields) {
/** Returns an identical Schema with sorted fields. */
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
- // {fields, uuid, encodingPositions, options}
+ // {fields, uuid, options}
+ // Note: encoding positions are not copied over because generally they should align with the
+ // ordering of field indices. Otherwise, problems may occur when encoding/decoding Rows of
+ // this schema.
Schema sortedSchema =
this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.collect(Schema.toSchema())
.withOptions(getOptions());
sortedSchema.setUUID(getUUID());
- sortedSchema.setEncodingPositions(getEncodingPositions());

return sortedSchema;
}
@@ -221,7 +221,6 @@ public void testSorted() {
.addStringField("d")
.build()
.withOptions(testOptions);
- sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions());

assertEquals(true, unorderedSchema.equivalent(unorderedSchemaAfterSorting));
assertEquals(
@@ -70,10 +70,10 @@ public class ExpansionServiceSchemaTransformProviderTest {

private static final Schema TEST_SCHEMATRANSFORM_CONFIG_SCHEMA =
Schema.of(
- Field.of("str1", FieldType.STRING),
- Field.of("str2", FieldType.STRING),
Field.of("int1", FieldType.INT32),
- Field.of("int2", FieldType.INT32));
+ Field.of("int2", FieldType.INT32),
+ Field.of("str1", FieldType.STRING),
+ Field.of("str2", FieldType.STRING));

private ExpansionService expansionService = new ExpansionService();

@@ -381,10 +381,10 @@ public void testSchemaTransformExpansion() {
.values());
Row configRow =
Row.withSchema(TEST_SCHEMATRANSFORM_CONFIG_SCHEMA)
- .withFieldValue("str1", "aaa")
- .withFieldValue("str2", "bbb")
.withFieldValue("int1", 111)
.withFieldValue("int2", 222)
+ .withFieldValue("str1", "aaa")
+ .withFieldValue("str2", "bbb")
.build();

ByteStringOutputStream outputStream = new ByteStringOutputStream();
@@ -440,10 +440,10 @@ public void testSchemaTransformExpansionMultiInputMultiOutput() {

Row configRow =
Row.withSchema(TEST_SCHEMATRANSFORM_CONFIG_SCHEMA)
- .withFieldValue("str1", "aaa")
- .withFieldValue("str2", "bbb")
.withFieldValue("int1", 111)
.withFieldValue("int2", 222)
+ .withFieldValue("str1", "aaa")
+ .withFieldValue("str2", "bbb")
.build();

ByteStringOutputStream outputStream = new ByteStringOutputStream();
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
@@ -118,7 +118,7 @@ public class BeamRowToStorageApiProto {
.put(
SqlTypes.DATETIME.getIdentifier(),
(logicalType, value) ->
- CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+ CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value))
.put(
SqlTypes.TIMESTAMP.getIdentifier(),
(logicalType, value) -> (ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value)))
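
Switching from `encodePacked64DatetimeSeconds` to `encodePacked64DatetimeMicros` changes the granularity at which `DATETIME` values are packed before being handed to the Storage Write API: a seconds-level packing has no room for the sub-second component, while the micros-level packing keeps it. A small plain-Python illustration of the difference (no Beam API involved):

```python
# Illustration only: the same civil DATETIME at the two packing granularities.
import datetime

dt = datetime.datetime(2023, 2, 16, 12, 0, 0, 123456)

# Seconds granularity cannot carry the .123456 microsecond component:
print(dt.replace(microsecond=0).isoformat(sep=' '))  # 2023-02-16 12:00:00

# Microsecond granularity preserves it:
print(dt.isoformat(sep=' '))                         # 2023-02-16 12:00:00.123456
```
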
@@ -512,7 +512,7 @@ public static TableRow convertGenericRecordToTableRow(
return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
}

- /** Convert a BigQuery TableRow to a Beam Row. */
+ /** Convert a Beam Row to a BigQuery TableRow. */
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
@@ -686,7 +686,14 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
if (JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
- return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
+ try {
+   // Handle if datetime value is in micros ie. 123456789
+   Long value = Long.parseLong(jsonBQString);
+   return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value);
+ } catch (NumberFormatException e) {
+   // Handle as a String, ie. "2023-02-16 12:00:00"
+   return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
+ }
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.TIME.getIdentifier())) {