diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index eb48d4840d65..9611e7f3a9e1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2419,11 +2419,10 @@ class BeamModulePlugin implements Plugin { config.cleanupJobServer.configure{mustRunAfter pythonSqlTask} // Task for running Java testcases in Go SDK. - def scriptOptions = [ - "--test_expansion_addr localhost:${javaPort}", + def pipelineOpts = [ + "--expansion_addr=test:localhost:${javaPort}", ] - scriptOptions.addAll(config.goScriptOptions) - def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", scriptOptions) + def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", config.goScriptOptions, pipelineOpts) goTask.configure { description = "Validates runner for cross-language capability of using Java transforms from Go SDK" dependsOn setupTask diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index c2e7ee8897af..28e8c5482853 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -32,10 +32,13 @@ task dataflowValidatesRunner() { dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner dataflow", - "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner dataflow", + "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -56,10 +59,13 @@ task flinkValidatesRunner { dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner flink", - "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner flink", + "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -78,10 +84,13 @@ task samzaValidatesRunner { dependsOn ":runners:samza:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner samza", - "--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner samza", + "--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -101,10 +110,13 @@ task sparkValidatesRunner { dependsOn ":runners:spark:2:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner spark", - "--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner spark", + "--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -131,9 +143,12 @@ tasks.register("ulrValidatesRunner") { dependsOn ":sdks:python:buildPython" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner portable", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner portable", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -149,7 +164,7 @@ tasks.register("ulrValidatesRunner") { // A method for configuring a cross-language validates runner test task, // intended to be used in calls to createCrossLanguageValidatesRunnerTask. -ext.goIoValidatesRunnerTask = { proj, name, scriptOpts -> +ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts -> return proj.tasks.register(name) { group = "Verification" @@ -165,11 +180,15 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts -> def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--kafka_jar=${kafkaJar}", + "--expansion_jar=io:${ioExpJar}", + "--expansion_jar=schemaio:${schemaIoExpJar}", + "--expansion_jar=debeziumio:${debeziumIoExpJar}", + ] + pipelineOptions.addAll(pipelineOpts) def options = [ - "--io_expansion_jar ${ioExpJar}", - "--schemaio_expansion_jar ${schemaIoExpJar}", - "--debeziumio_expansion_jar ${debeziumIoExpJar}", - "--pipeline_opts \"--kafka_jar=${kafkaJar}\"", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] options.addAll(scriptOpts) logger.info("Running the command: sh -c ./run_validatesrunner_tests.sh ${options.join(' ')}") diff --git a/sdks/go/test/integration/expansions.go b/sdks/go/test/integration/expansions.go index 2c9c54a62658..660dbfc5034f 100644 --- a/sdks/go/test/integration/expansions.go +++ b/sdks/go/test/integration/expansions.go @@ -39,17 +39,16 @@ import ( // recommended to only use ExpansionServices in TestMain to avoid this. // // Example: -// var retCode int -// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers. +// flag.Parse() +// beam.Init() // services := integration.NewExpansionServices() // defer func() { services.Shutdown() }() // addr, err := services.GetAddr("example") // if err != nil { -// retCode = 1 // panic(err) // } // expansionAddr = addr // Save address to a package-level variable used by tests. -// retCode = ptest.MainRet(m) +// ptest.MainRet(m) type ExpansionServices struct { addrs map[string]string jars map[string]string diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go index 6275578900ee..72954f2be066 100644 --- a/sdks/go/test/integration/flags.go +++ b/sdks/go/test/integration/flags.go @@ -26,22 +26,6 @@ import ( // Because any flags used with those commands are used for each package, every // integration test package must import these flags, even if they are not used. var ( - // TestExpansionAddr is the endpoint for the expansion service for test-only - // cross-language transforms. - TestExpansionAddr = flag.String("test_expansion_addr", "", "Address of Expansion Service for test cross-language transforms.") - - // IoExpansionAddr is the endpoint for the expansion service for - // cross-language IO transforms. - IoExpansionAddr = flag.String("io_expansion_addr", "", "Address of Expansion Service for cross-language IOs.") - - // SchemaIoExpansionAddr is the endpoint for the expansion service for - // cross-language SchemaIO-based transforms. - SchemaIoExpansionAddr = flag.String("schemaio_expansion_addr", "", "Address of Expansion Service for cross-language SchemaIO-based IOs.") - - // DebeziumIoExpansionAddr is the endpoint for the expansion service for - // cross-language DebeziumIO transforms. - DebeziumIoExpansionAddr = flag.String("debeziumio_expansion_addr", "", "Address of Expansion Service for cross-language Debezium IOs.") - // BootstrapServers is the address of the bootstrap servers for a Kafka // cluster, used for Kafka IO tests. BootstrapServers = flag.String("bootstrap_servers", "", diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go index 1f519e54d9f5..d347e4436f2e 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go @@ -17,8 +17,11 @@ package debezium import ( "context" + "flag" + "log" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/debeziumio" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" @@ -31,8 +34,10 @@ import ( "github.com/testcontainers/testcontainers-go" ) +var expansionAddr string // Populate with expansion address labelled "debeziumio". + func checkFlags(t *testing.T) { - if *integration.DebeziumIoExpansionAddr == "" { + if expansionAddr == "" { t.Skip("No DebeziumIo expansion address provided.") } } @@ -84,10 +89,22 @@ func TestDebeziumIO_BasicRead(t *testing.T) { "database.include.list=inventory", "include.schema.changes=false", } - read := ReadPipeline(*integration.DebeziumIoExpansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties) + read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties) ptest.RunAndValidate(t, read) } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("debeziumio") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go index 83f5c16d97a1..286e706c3201 100644 --- a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go +++ b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go @@ -17,10 +17,13 @@ package jdbc import ( "context" "database/sql" + "flag" "fmt" + "log" "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" @@ -34,8 +37,10 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) +var expansionAddr string // Populate with expansion address labelled "schemaio". + func checkFlags(t *testing.T) { - if *integration.SchemaIoExpansionAddr == "" { + if expansionAddr == "" { t.Skip("No Schema IO expansion address provided.") } } @@ -101,10 +106,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) { host := "localhost" jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) - write := WritePipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) + write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) ptest.RunAndValidate(t, write) - read := ReadPipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) + read := ReadPipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) ptest.RunAndValidate(t, read) } @@ -121,13 +126,25 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) { host := "localhost" jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) - write := WriteToPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password) + write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password) ptest.RunAndValidate(t, write) - read := ReadFromPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password) + read := ReadFromPostgres(expansionAddr, tableName, jdbcUrl, username, password) ptest.RunAndValidate(t, read) } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("schemaio") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go index 51af59bfe658..e1cdc2e935db 100644 --- a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go +++ b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go @@ -17,10 +17,11 @@ package kafka import ( "flag" + "fmt" "log" - "os" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" @@ -32,6 +33,7 @@ import ( // bootstrapAddr should be set by TestMain once a Kafka cluster has been // started, and is used by each test. var bootstrapAddr string +var expansionAddr string // Populate with expansion address labelled "io". const ( basicTopic = "xlang_kafkaio_basic_test" @@ -56,20 +58,17 @@ func TestKafkaIO_BasicReadWrite(t *testing.T) { } topic := appendUuid(basicTopic) - write := WritePipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs) + write := WritePipeline(expansionAddr, bootstrapAddr, topic, inputs) ptest.RunAndValidate(t, write) - read := ReadPipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs) + read := ReadPipeline(expansionAddr, bootstrapAddr, topic, inputs) ptest.RunAndValidate(t, read) } // TestMain starts up a Kafka cluster from integration.KafkaJar before running // tests through ptest.Main. func TestMain(m *testing.M) { - // Defer os.Exit so it happens after other defers. - var retCode int - defer func() { os.Exit(retCode) }() - flag.Parse() + beam.Init() // Start local Kafka cluster and defer its shutdown. if *integration.BootstrapServers != "" { @@ -77,11 +76,20 @@ func TestMain(m *testing.M) { } else if *integration.KafkaJar != "" { cluster, err := runLocalKafka(*integration.KafkaJar, *integration.KafkaJarTimeout) if err != nil { - log.Fatalf("Kafka cluster failed to start: %v", err) + panic(fmt.Errorf("kafka cluster failed to start: %v", err)) } defer func() { cluster.Shutdown() }() bootstrapAddr = cluster.bootstrapAddr } - retCode = ptest.MainRet(m) + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("io") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/xlang/xlang_test.go b/sdks/go/test/integration/xlang/xlang_test.go index 560bc5c17beb..f1473f199057 100644 --- a/sdks/go/test/integration/xlang/xlang_test.go +++ b/sdks/go/test/integration/xlang/xlang_test.go @@ -16,7 +16,9 @@ package xlang import ( + "flag" "fmt" + "log" "reflect" "sort" "testing" @@ -32,6 +34,8 @@ import ( "github.com/apache/beam/sdks/v2/go/test/integration" ) +var expansionAddr string // Populate with expansion address labelled "test". + func init() { beam.RegisterType(reflect.TypeOf((*IntString)(nil)).Elem()) beam.RegisterType(reflect.TypeOf((*StringInt)(nil)).Elem()) @@ -46,8 +50,8 @@ func init() { } func checkFlags(t *testing.T) { - if *integration.TestExpansionAddr == "" { - t.Skip("No expansion address provided.") + if expansionAddr == "" { + t.Skip("No Test expansion address provided.") } } @@ -121,7 +125,7 @@ func TestXLang_Prefix(t *testing.T) { // Using the cross-language transform strings := beam.Create(s, "a", "b", "c") - prefixed := xlang.Prefix(s, "prefix_", *integration.TestExpansionAddr, strings) + prefixed := xlang.Prefix(s, "prefix_", expansionAddr, strings) passert.Equals(s, prefixed, "prefix_a", "prefix_b", "prefix_c") ptest.RunAndValidate(t, p) @@ -137,7 +141,7 @@ func TestXLang_CoGroupBy(t *testing.T) { // Using the cross-language transform col1 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "1"}, IntString{X: 0, Y: "2"}, IntString{X: 1, Y: "3"})) col2 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "4"}, IntString{X: 1, Y: "5"}, IntString{X: 1, Y: "6"})) - c := xlang.CoGroupByKey(s, *integration.TestExpansionAddr, col1, col2) + c := xlang.CoGroupByKey(s, expansionAddr, col1, col2) sums := beam.ParDo(s, sumCounts, c) formatted := beam.ParDo(s, formatIntStringsFn, sums) passert.Equals(s, formatted, "0:[1 2 4]", "1:[3 5 6]") @@ -155,7 +159,7 @@ func TestXLang_Combine(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, StringInt{X: "a", Y: 1}, StringInt{X: "a", Y: 2}, StringInt{X: "b", Y: 3}) ins := beam.ParDo(s, getStringInt, kvs) - c := xlang.CombinePerKey(s, *integration.TestExpansionAddr, ins) + c := xlang.CombinePerKey(s, expansionAddr, ins) formatted := beam.ParDo(s, formatStringIntFn, c) passert.Equals(s, formatted, "a:3", "b:3") @@ -173,7 +177,7 @@ func TestXLang_CombineGlobally(t *testing.T) { in := beam.CreateList(s, []int64{1, 2, 3}) // Using the cross-language transform - c := xlang.CombineGlobally(s, *integration.TestExpansionAddr, in) + c := xlang.CombineGlobally(s, expansionAddr, in) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "6") @@ -192,7 +196,7 @@ func TestXLang_Flatten(t *testing.T) { col2 := beam.CreateList(s, []int64{4, 5, 6}) // Using the cross-language transform - c := xlang.Flatten(s, *integration.TestExpansionAddr, col1, col2) + c := xlang.Flatten(s, expansionAddr, col1, col2) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "1", "2", "3", "4", "5", "6") @@ -210,7 +214,7 @@ func TestXLang_GroupBy(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, StringInt{X: "0", Y: 1}, StringInt{X: "0", Y: 2}, StringInt{X: "1", Y: 3}) in := beam.ParDo(s, getStringInt, kvs) - out := xlang.GroupByKey(s, *integration.TestExpansionAddr, in) + out := xlang.GroupByKey(s, expansionAddr, in) vals := beam.ParDo(s, collectValues, out) formatted := beam.ParDo(s, formatStringIntsFn, vals) @@ -231,7 +235,7 @@ func TestXLang_Multi(t *testing.T) { side := beam.CreateList(s, []string{"s"}) // Using the cross-language transform - mainOut, sideOut := xlang.Multi(s, *integration.TestExpansionAddr, main1, main2, side) + mainOut, sideOut := xlang.Multi(s, expansionAddr, main1, main2, side) passert.Equals(s, mainOut, "as", "bbs", "xs", "yys", "zzzs") passert.Equals(s, sideOut, "ss") @@ -249,7 +253,7 @@ func TestXLang_Partition(t *testing.T) { col := beam.CreateList(s, []int64{1, 2, 3, 4, 5, 6}) // Using the cross-language transform - out0, out1 := xlang.Partition(s, *integration.TestExpansionAddr, col) + out0, out1 := xlang.Partition(s, expansionAddr, col) formatted0 := beam.ParDo(s, formatIntFn, out0) formatted1 := beam.ParDo(s, formatIntFn, out1) @@ -260,5 +264,17 @@ func TestXLang_Partition(t *testing.T) { } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("test") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + ptest.MainRet(m) }