Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2419,11 +2419,10 @@ class BeamModulePlugin implements Plugin<Project> {
config.cleanupJobServer.configure{mustRunAfter pythonSqlTask}

// Task for running Java testcases in Go SDK.
def scriptOptions = [
"--test_expansion_addr localhost:${javaPort}",
def pipelineOpts = [
"--expansion_addr=test:localhost:${javaPort}",
]
scriptOptions.addAll(config.goScriptOptions)
def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", scriptOptions)
def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", config.goScriptOptions, pipelineOpts)
goTask.configure {
description = "Validates runner for cross-language capability of using Java transforms from Go SDK"
dependsOn setupTask
Expand Down
57 changes: 38 additions & 19 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ task dataflowValidatesRunner() {
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"

doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner dataflow",
"--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--runner dataflow",
"--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
Expand All @@ -56,10 +59,13 @@ task flinkValidatesRunner {
dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner flink",
"--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--runner flink",
"--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
Expand All @@ -78,10 +84,13 @@ task samzaValidatesRunner {
dependsOn ":runners:samza:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner samza",
"--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--runner samza",
"--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
Expand All @@ -101,10 +110,13 @@ task sparkValidatesRunner {
dependsOn ":runners:spark:2:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner spark",
"--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--runner spark",
"--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
Expand All @@ -131,9 +143,12 @@ tasks.register("ulrValidatesRunner") {
dependsOn ":sdks:python:buildPython"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner portable",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--runner portable",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
Expand All @@ -149,7 +164,7 @@ tasks.register("ulrValidatesRunner") {

// A method for configuring a cross-language validates runner test task,
// intended to be used in calls to createCrossLanguageValidatesRunnerTask.
ext.goIoValidatesRunnerTask = { proj, name, scriptOpts ->
ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
return proj.tasks.register(name) {
group = "Verification"

Expand All @@ -165,11 +180,15 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts ->
def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath
def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--kafka_jar=${kafkaJar}",
"--expansion_jar=io:${ioExpJar}",
"--expansion_jar=schemaio:${schemaIoExpJar}",
"--expansion_jar=debeziumio:${debeziumIoExpJar}",
]
pipelineOptions.addAll(pipelineOpts)
def options = [
"--io_expansion_jar ${ioExpJar}",
"--schemaio_expansion_jar ${schemaIoExpJar}",
"--debeziumio_expansion_jar ${debeziumIoExpJar}",
"--pipeline_opts \"--kafka_jar=${kafkaJar}\"",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
options.addAll(scriptOpts)
logger.info("Running the command: sh -c ./run_validatesrunner_tests.sh ${options.join(' ')}")
Expand Down
7 changes: 3 additions & 4 deletions sdks/go/test/integration/expansions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,16 @@ import (
// recommended to only use ExpansionServices in TestMain to avoid this.
//
// Example:
// var retCode int
// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers.
// flag.Parse()
// beam.Init()
// services := integration.NewExpansionServices()
// defer func() { services.Shutdown() }()
// addr, err := services.GetAddr("example")
// if err != nil {
// retCode = 1
// panic(err)
// }
// expansionAddr = addr // Save address to a package-level variable used by tests.
// retCode = ptest.MainRet(m)
// ptest.MainRet(m)
type ExpansionServices struct {
addrs map[string]string
jars map[string]string
Expand Down
16 changes: 0 additions & 16 deletions sdks/go/test/integration/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ import (
// Because any flags used with those commands are used for each package, every
// integration test package must import these flags, even if they are not used.
var (
// TestExpansionAddr is the endpoint for the expansion service for test-only
// cross-language transforms.
TestExpansionAddr = flag.String("test_expansion_addr", "", "Address of Expansion Service for test cross-language transforms.")

// IoExpansionAddr is the endpoint for the expansion service for
// cross-language IO transforms.
IoExpansionAddr = flag.String("io_expansion_addr", "", "Address of Expansion Service for cross-language IOs.")

// SchemaIoExpansionAddr is the endpoint for the expansion service for
// cross-language SchemaIO-based transforms.
SchemaIoExpansionAddr = flag.String("schemaio_expansion_addr", "", "Address of Expansion Service for cross-language SchemaIO-based IOs.")

// DebeziumIoExpansionAddr is the endpoint for the expansion service for
// cross-language DebeziumIO transforms.
DebeziumIoExpansionAddr = flag.String("debeziumio_expansion_addr", "", "Address of Expansion Service for cross-language Debezium IOs.")

// BootstrapServers is the address of the bootstrap servers for a Kafka
// cluster, used for Kafka IO tests.
BootstrapServers = flag.String("bootstrap_servers", "",
Expand Down
23 changes: 20 additions & 3 deletions sdks/go/test/integration/io/xlang/debezium/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package debezium

import (
"context"
"flag"
"log"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/debeziumio"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
Expand All @@ -31,8 +34,10 @@ import (
"github.com/testcontainers/testcontainers-go"
)

var expansionAddr string // Populate with expansion address labelled "debeziumio".

func checkFlags(t *testing.T) {
if *integration.DebeziumIoExpansionAddr == "" {
if expansionAddr == "" {
t.Skip("No DebeziumIo expansion address provided.")
}
}
Expand Down Expand Up @@ -84,10 +89,22 @@ func TestDebeziumIO_BasicRead(t *testing.T) {
"database.include.list=inventory",
"include.schema.changes=false",
}
read := ReadPipeline(*integration.DebeziumIoExpansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties)
read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties)
ptest.RunAndValidate(t, read)
}

func TestMain(m *testing.M) {
ptest.Main(m)
flag.Parse()
beam.Init()

services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("debeziumio")
if err != nil {
log.Printf("skipping missing expansion service: %v", err)
} else {
expansionAddr = addr
}

ptest.MainRet(m)
}
29 changes: 23 additions & 6 deletions sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package jdbc
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
Expand All @@ -34,8 +37,10 @@ import (
"github.com/testcontainers/testcontainers-go/wait"
)

var expansionAddr string // Populate with expansion address labelled "schemaio".

func checkFlags(t *testing.T) {
if *integration.SchemaIoExpansionAddr == "" {
if expansionAddr == "" {
t.Skip("No Schema IO expansion address provided.")
}
}
Expand Down Expand Up @@ -101,10 +106,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) {
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)

write := WritePipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
ptest.RunAndValidate(t, write)

read := ReadPipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
read := ReadPipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
ptest.RunAndValidate(t, read)
}

Expand All @@ -121,13 +126,25 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) {
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)

write := WriteToPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password)
write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password)
ptest.RunAndValidate(t, write)

read := ReadFromPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password)
read := ReadFromPostgres(expansionAddr, tableName, jdbcUrl, username, password)
ptest.RunAndValidate(t, read)
}

func TestMain(m *testing.M) {
ptest.Main(m)
flag.Parse()
beam.Init()

services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("schemaio")
if err != nil {
log.Printf("skipping missing expansion service: %v", err)
} else {
expansionAddr = addr
}

ptest.MainRet(m)
}
26 changes: 17 additions & 9 deletions sdks/go/test/integration/io/xlang/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ package kafka

import (
"flag"
"fmt"
"log"
"os"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
Expand All @@ -32,6 +33,7 @@ import (
// bootstrapAddr should be set by TestMain once a Kafka cluster has been
// started, and is used by each test.
var bootstrapAddr string
var expansionAddr string // Populate with expansion address labelled "io".

const (
basicTopic = "xlang_kafkaio_basic_test"
Expand All @@ -56,32 +58,38 @@ func TestKafkaIO_BasicReadWrite(t *testing.T) {
}
topic := appendUuid(basicTopic)

write := WritePipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs)
write := WritePipeline(expansionAddr, bootstrapAddr, topic, inputs)
ptest.RunAndValidate(t, write)
read := ReadPipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs)
read := ReadPipeline(expansionAddr, bootstrapAddr, topic, inputs)
ptest.RunAndValidate(t, read)
}

// TestMain starts up a Kafka cluster from integration.KafkaJar before running
// tests through ptest.Main.
func TestMain(m *testing.M) {
// Defer os.Exit so it happens after other defers.
var retCode int
defer func() { os.Exit(retCode) }()

flag.Parse()
beam.Init()

// Start local Kafka cluster and defer its shutdown.
if *integration.BootstrapServers != "" {
bootstrapAddr = *integration.BootstrapServers
} else if *integration.KafkaJar != "" {
cluster, err := runLocalKafka(*integration.KafkaJar, *integration.KafkaJarTimeout)
if err != nil {
log.Fatalf("Kafka cluster failed to start: %v", err)
panic(fmt.Errorf("kafka cluster failed to start: %v", err))
}
defer func() { cluster.Shutdown() }()
bootstrapAddr = cluster.bootstrapAddr
}

retCode = ptest.MainRet(m)
services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("io")
if err != nil {
log.Printf("skipping missing expansion service: %v", err)
} else {
expansionAddr = addr
}

ptest.MainRet(m)
}
Loading