From 1f1e4d5e6a890f051127c9fb360bf915a26c051d Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Tue, 22 Mar 2022 19:11:31 -0700 Subject: [PATCH 1/2] [BEAM-13857] Switched Go IT script to using Go flags for expansion services This is a followup to a previous PR (https://github.com/apache/beam/pull/16908) which added the feature but didn't switch existing tests to using it. This switches existing tests to using the feature (along with making sure it's set up properly in a way that works). --- .../beam/gradle/BeamModulePlugin.groovy | 7 +-- sdks/go/test/build.gradle | 57 ++++++++++++------- sdks/go/test/integration/expansions.go | 7 +-- sdks/go/test/integration/flags.go | 16 ------ .../io/xlang/debezium/debezium_test.go | 21 ++++++- .../integration/io/xlang/jdbc/jdbc_test.go | 27 +++++++-- .../integration/io/xlang/kafka/kafka_test.go | 26 +++++---- sdks/go/test/integration/xlang/xlang_test.go | 36 ++++++++---- 8 files changed, 124 insertions(+), 73 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index eb48d4840d65..9611e7f3a9e1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2419,11 +2419,10 @@ class BeamModulePlugin implements Plugin { config.cleanupJobServer.configure{mustRunAfter pythonSqlTask} // Task for running Java testcases in Go SDK. - def scriptOptions = [ - "--test_expansion_addr localhost:${javaPort}", + def pipelineOpts = [ + "--expansion_addr=test:localhost:${javaPort}", ] - scriptOptions.addAll(config.goScriptOptions) - def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", scriptOptions) + def goTask = project.project(":sdks:go:test:").goIoValidatesRunnerTask(project, config.name+"GoUsingJava", config.goScriptOptions, pipelineOpts) goTask.configure { description = "Validates runner for cross-language capability of using Java transforms from Go SDK" dependsOn setupTask diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index c2e7ee8897af..28e8c5482853 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -32,10 +32,13 @@ task dataflowValidatesRunner() { dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner dataflow", - "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner dataflow", + "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -56,10 +59,13 @@ task flinkValidatesRunner { dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner flink", - "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner flink", + "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -78,10 +84,13 @@ task samzaValidatesRunner { dependsOn ":runners:samza:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner samza", - "--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner samza", + "--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -101,10 +110,13 @@ task sparkValidatesRunner { dependsOn ":runners:spark:2:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner spark", - "--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner spark", + "--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -131,9 +143,12 @@ tasks.register("ulrValidatesRunner") { dependsOn ":sdks:python:buildPython" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] def options = [ - "--runner portable", - "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--runner portable", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { executable "sh" @@ -149,7 +164,7 @@ tasks.register("ulrValidatesRunner") { // A method for configuring a cross-language validates runner test task, // intended to be used in calls to createCrossLanguageValidatesRunnerTask. -ext.goIoValidatesRunnerTask = { proj, name, scriptOpts -> +ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts -> return proj.tasks.register(name) { group = "Verification" @@ -165,11 +180,15 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts -> def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--kafka_jar=${kafkaJar}", + "--expansion_jar=io:${ioExpJar}", + "--expansion_jar=schemaio:${schemaIoExpJar}", + "--expansion_jar=debeziumio:${debeziumIoExpJar}", + ] + pipelineOptions.addAll(pipelineOpts) def options = [ - "--io_expansion_jar ${ioExpJar}", - "--schemaio_expansion_jar ${schemaIoExpJar}", - "--debeziumio_expansion_jar ${debeziumIoExpJar}", - "--pipeline_opts \"--kafka_jar=${kafkaJar}\"", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] options.addAll(scriptOpts) logger.info("Running the command: sh -c ./run_validatesrunner_tests.sh ${options.join(' ')}") diff --git a/sdks/go/test/integration/expansions.go b/sdks/go/test/integration/expansions.go index 2c9c54a62658..660dbfc5034f 100644 --- a/sdks/go/test/integration/expansions.go +++ b/sdks/go/test/integration/expansions.go @@ -39,17 +39,16 @@ import ( // recommended to only use ExpansionServices in TestMain to avoid this. // // Example: -// var retCode int -// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers. +// flag.Parse() +// beam.Init() // services := integration.NewExpansionServices() // defer func() { services.Shutdown() }() // addr, err := services.GetAddr("example") // if err != nil { -// retCode = 1 // panic(err) // } // expansionAddr = addr // Save address to a package-level variable used by tests. -// retCode = ptest.MainRet(m) +// ptest.MainRet(m) type ExpansionServices struct { addrs map[string]string jars map[string]string diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go index 6275578900ee..72954f2be066 100644 --- a/sdks/go/test/integration/flags.go +++ b/sdks/go/test/integration/flags.go @@ -26,22 +26,6 @@ import ( // Because any flags used with those commands are used for each package, every // integration test package must import these flags, even if they are not used. var ( - // TestExpansionAddr is the endpoint for the expansion service for test-only - // cross-language transforms. - TestExpansionAddr = flag.String("test_expansion_addr", "", "Address of Expansion Service for test cross-language transforms.") - - // IoExpansionAddr is the endpoint for the expansion service for - // cross-language IO transforms. - IoExpansionAddr = flag.String("io_expansion_addr", "", "Address of Expansion Service for cross-language IOs.") - - // SchemaIoExpansionAddr is the endpoint for the expansion service for - // cross-language SchemaIO-based transforms. - SchemaIoExpansionAddr = flag.String("schemaio_expansion_addr", "", "Address of Expansion Service for cross-language SchemaIO-based IOs.") - - // DebeziumIoExpansionAddr is the endpoint for the expansion service for - // cross-language DebeziumIO transforms. - DebeziumIoExpansionAddr = flag.String("debeziumio_expansion_addr", "", "Address of Expansion Service for cross-language Debezium IOs.") - // BootstrapServers is the address of the bootstrap servers for a Kafka // cluster, used for Kafka IO tests. BootstrapServers = flag.String("bootstrap_servers", "", diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go index 1f519e54d9f5..8083d4f3d237 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go @@ -17,8 +17,10 @@ package debezium import ( "context" + "flag" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/debeziumio" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" @@ -31,8 +33,10 @@ import ( "github.com/testcontainers/testcontainers-go" ) +var expansionAddr string // Populate with expansion address labelled "debeziumio". + func checkFlags(t *testing.T) { - if *integration.DebeziumIoExpansionAddr == "" { + if expansionAddr == "" { t.Skip("No DebeziumIo expansion address provided.") } } @@ -84,10 +88,21 @@ func TestDebeziumIO_BasicRead(t *testing.T) { "database.include.list=inventory", "include.schema.changes=false", } - read := ReadPipeline(*integration.DebeziumIoExpansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties) + read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties) ptest.RunAndValidate(t, read) } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("debeziumio") + if err != nil { + panic(err) + } + expansionAddr = addr + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go index 83f5c16d97a1..acd8394e6403 100644 --- a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go +++ b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go @@ -17,10 +17,12 @@ package jdbc import ( "context" "database/sql" + "flag" "fmt" "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" @@ -34,8 +36,10 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) +var expansionAddr string // Populate with expansion address labelled "schemaio". + func checkFlags(t *testing.T) { - if *integration.SchemaIoExpansionAddr == "" { + if expansionAddr == "" { t.Skip("No Schema IO expansion address provided.") } } @@ -101,10 +105,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) { host := "localhost" jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) - write := WritePipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) + write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) ptest.RunAndValidate(t, write) - read := ReadPipeline(*integration.SchemaIoExpansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) + read := ReadPipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) ptest.RunAndValidate(t, read) } @@ -121,13 +125,24 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) { host := "localhost" jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) - write := WriteToPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password) + write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password) ptest.RunAndValidate(t, write) - read := ReadFromPostgres(*integration.SchemaIoExpansionAddr, tableName, jdbcUrl, username, password) + read := ReadFromPostgres(expansionAddr, tableName, jdbcUrl, username, password) ptest.RunAndValidate(t, read) } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("schemaio") + if err != nil { + panic(err) + } + expansionAddr = addr + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go index 51af59bfe658..632f343ffca6 100644 --- a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go +++ b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go @@ -17,10 +17,10 @@ package kafka import ( "flag" - "log" - "os" + "fmt" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" @@ -32,6 +32,7 @@ import ( // bootstrapAddr should be set by TestMain once a Kafka cluster has been // started, and is used by each test. var bootstrapAddr string +var expansionAddr string // Populate with expansion address labelled "io". const ( basicTopic = "xlang_kafkaio_basic_test" @@ -56,20 +57,17 @@ func TestKafkaIO_BasicReadWrite(t *testing.T) { } topic := appendUuid(basicTopic) - write := WritePipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs) + write := WritePipeline(expansionAddr, bootstrapAddr, topic, inputs) ptest.RunAndValidate(t, write) - read := ReadPipeline(*integration.IoExpansionAddr, bootstrapAddr, topic, inputs) + read := ReadPipeline(expansionAddr, bootstrapAddr, topic, inputs) ptest.RunAndValidate(t, read) } // TestMain starts up a Kafka cluster from integration.KafkaJar before running // tests through ptest.Main. func TestMain(m *testing.M) { - // Defer os.Exit so it happens after other defers. - var retCode int - defer func() { os.Exit(retCode) }() - flag.Parse() + beam.Init() // Start local Kafka cluster and defer its shutdown. if *integration.BootstrapServers != "" { @@ -77,11 +75,19 @@ func TestMain(m *testing.M) { } else if *integration.KafkaJar != "" { cluster, err := runLocalKafka(*integration.KafkaJar, *integration.KafkaJarTimeout) if err != nil { - log.Fatalf("Kafka cluster failed to start: %v", err) + panic(fmt.Errorf("kafka cluster failed to start: %v", err)) } defer func() { cluster.Shutdown() }() bootstrapAddr = cluster.bootstrapAddr } - retCode = ptest.MainRet(m) + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("io") + if err != nil { + panic(err) + } + expansionAddr = addr + + ptest.MainRet(m) } diff --git a/sdks/go/test/integration/xlang/xlang_test.go b/sdks/go/test/integration/xlang/xlang_test.go index 560bc5c17beb..d1a8af399891 100644 --- a/sdks/go/test/integration/xlang/xlang_test.go +++ b/sdks/go/test/integration/xlang/xlang_test.go @@ -16,6 +16,7 @@ package xlang import ( + "flag" "fmt" "reflect" "sort" @@ -32,6 +33,8 @@ import ( "github.com/apache/beam/sdks/v2/go/test/integration" ) +var expansionAddr string // Populate with expansion address labelled "test". + func init() { beam.RegisterType(reflect.TypeOf((*IntString)(nil)).Elem()) beam.RegisterType(reflect.TypeOf((*StringInt)(nil)).Elem()) @@ -46,8 +49,8 @@ func init() { } func checkFlags(t *testing.T) { - if *integration.TestExpansionAddr == "" { - t.Skip("No expansion address provided.") + if expansionAddr == "" { + t.Skip("No Test expansion address provided.") } } @@ -121,7 +124,7 @@ func TestXLang_Prefix(t *testing.T) { // Using the cross-language transform strings := beam.Create(s, "a", "b", "c") - prefixed := xlang.Prefix(s, "prefix_", *integration.TestExpansionAddr, strings) + prefixed := xlang.Prefix(s, "prefix_", expansionAddr, strings) passert.Equals(s, prefixed, "prefix_a", "prefix_b", "prefix_c") ptest.RunAndValidate(t, p) @@ -137,7 +140,7 @@ func TestXLang_CoGroupBy(t *testing.T) { // Using the cross-language transform col1 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "1"}, IntString{X: 0, Y: "2"}, IntString{X: 1, Y: "3"})) col2 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "4"}, IntString{X: 1, Y: "5"}, IntString{X: 1, Y: "6"})) - c := xlang.CoGroupByKey(s, *integration.TestExpansionAddr, col1, col2) + c := xlang.CoGroupByKey(s, expansionAddr, col1, col2) sums := beam.ParDo(s, sumCounts, c) formatted := beam.ParDo(s, formatIntStringsFn, sums) passert.Equals(s, formatted, "0:[1 2 4]", "1:[3 5 6]") @@ -155,7 +158,7 @@ func TestXLang_Combine(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, StringInt{X: "a", Y: 1}, StringInt{X: "a", Y: 2}, StringInt{X: "b", Y: 3}) ins := beam.ParDo(s, getStringInt, kvs) - c := xlang.CombinePerKey(s, *integration.TestExpansionAddr, ins) + c := xlang.CombinePerKey(s, expansionAddr, ins) formatted := beam.ParDo(s, formatStringIntFn, c) passert.Equals(s, formatted, "a:3", "b:3") @@ -173,7 +176,7 @@ func TestXLang_CombineGlobally(t *testing.T) { in := beam.CreateList(s, []int64{1, 2, 3}) // Using the cross-language transform - c := xlang.CombineGlobally(s, *integration.TestExpansionAddr, in) + c := xlang.CombineGlobally(s, expansionAddr, in) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "6") @@ -192,7 +195,7 @@ func TestXLang_Flatten(t *testing.T) { col2 := beam.CreateList(s, []int64{4, 5, 6}) // Using the cross-language transform - c := xlang.Flatten(s, *integration.TestExpansionAddr, col1, col2) + c := xlang.Flatten(s, expansionAddr, col1, col2) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "1", "2", "3", "4", "5", "6") @@ -210,7 +213,7 @@ func TestXLang_GroupBy(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, StringInt{X: "0", Y: 1}, StringInt{X: "0", Y: 2}, StringInt{X: "1", Y: 3}) in := beam.ParDo(s, getStringInt, kvs) - out := xlang.GroupByKey(s, *integration.TestExpansionAddr, in) + out := xlang.GroupByKey(s, expansionAddr, in) vals := beam.ParDo(s, collectValues, out) formatted := beam.ParDo(s, formatStringIntsFn, vals) @@ -231,7 +234,7 @@ func TestXLang_Multi(t *testing.T) { side := beam.CreateList(s, []string{"s"}) // Using the cross-language transform - mainOut, sideOut := xlang.Multi(s, *integration.TestExpansionAddr, main1, main2, side) + mainOut, sideOut := xlang.Multi(s, expansionAddr, main1, main2, side) passert.Equals(s, mainOut, "as", "bbs", "xs", "yys", "zzzs") passert.Equals(s, sideOut, "ss") @@ -249,7 +252,7 @@ func TestXLang_Partition(t *testing.T) { col := beam.CreateList(s, []int64{1, 2, 3, 4, 5, 6}) // Using the cross-language transform - out0, out1 := xlang.Partition(s, *integration.TestExpansionAddr, col) + out0, out1 := xlang.Partition(s, expansionAddr, col) formatted0 := beam.ParDo(s, formatIntFn, out0) formatted1 := beam.ParDo(s, formatIntFn, out1) @@ -260,5 +263,16 @@ func TestXLang_Partition(t *testing.T) { } func TestMain(m *testing.M) { - ptest.Main(m) + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("test") + if err != nil { + panic(err) + } + expansionAddr = addr + + ptest.MainRet(m) } From b5907f61848ab623889342ee38c9bbb644b8b979 Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Tue, 22 Mar 2022 19:32:39 -0700 Subject: [PATCH 2/2] Fixup: Avoid panicking with missing expansion services. --- sdks/go/test/integration/io/xlang/debezium/debezium_test.go | 6 ++++-- sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go | 6 ++++-- sdks/go/test/integration/io/xlang/kafka/kafka_test.go | 6 ++++-- sdks/go/test/integration/xlang/xlang_test.go | 6 ++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go index 8083d4f3d237..d347e4436f2e 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go @@ -18,6 +18,7 @@ package debezium import ( "context" "flag" + "log" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" @@ -100,9 +101,10 @@ func TestMain(m *testing.M) { defer func() { services.Shutdown() }() addr, err := services.GetAddr("debeziumio") if err != nil { - panic(err) + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr } - expansionAddr = addr ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go index acd8394e6403..286e706c3201 100644 --- a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go +++ b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go @@ -19,6 +19,7 @@ import ( "database/sql" "flag" "fmt" + "log" "testing" "time" @@ -140,9 +141,10 @@ func TestMain(m *testing.M) { defer func() { services.Shutdown() }() addr, err := services.GetAddr("schemaio") if err != nil { - panic(err) + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr } - expansionAddr = addr ptest.MainRet(m) } diff --git a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go index 632f343ffca6..e1cdc2e935db 100644 --- a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go +++ b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go @@ -18,6 +18,7 @@ package kafka import ( "flag" "fmt" + "log" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" @@ -85,9 +86,10 @@ func TestMain(m *testing.M) { defer func() { services.Shutdown() }() addr, err := services.GetAddr("io") if err != nil { - panic(err) + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr } - expansionAddr = addr ptest.MainRet(m) } diff --git a/sdks/go/test/integration/xlang/xlang_test.go b/sdks/go/test/integration/xlang/xlang_test.go index d1a8af399891..f1473f199057 100644 --- a/sdks/go/test/integration/xlang/xlang_test.go +++ b/sdks/go/test/integration/xlang/xlang_test.go @@ -18,6 +18,7 @@ package xlang import ( "flag" "fmt" + "log" "reflect" "sort" "testing" @@ -270,9 +271,10 @@ func TestMain(m *testing.M) { defer func() { services.Shutdown() }() addr, err := services.GetAddr("test") if err != nil { - panic(err) + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr } - expansionAddr = addr ptest.MainRet(m) }