diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go index f9f98a853fb1..a0efaf274725 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream.go @@ -18,7 +18,11 @@ // // See https://beam.apache.org/blog/test-stream/ for more information. // -// TestStream is supported on the Flink runner. +// TestStream is supported on the Flink runner and currently supports int64, +// float64, and boolean types. +// +// TODO(BEAM-12753): Flink currently displays unexpected behavior with TestStream, +// should not be used until this issue is resolved. package teststream import ( @@ -105,6 +109,8 @@ func (c *Config) AdvanceProcessingTimeToInfinity() { // // On the first call, a type will be inferred from the passed in elements, which must be of all the same type. // Type mismatches on this or subsequent calls will cause AddElements to return an error. +// +// Element types must have built-in coders in Beam. func (c *Config) AddElements(timestamp int64, elements ...interface{}) error { t := reflect.TypeOf(elements[0]) if c.elmType == nil { @@ -132,6 +138,25 @@ func (c *Config) AddElements(timestamp int64, elements ...interface{}) error { return nil } +// AddElementList inserts a slice of elements into the stream at the specified event timestamp. Must be called with +// at least one element. +// +// Calls into AddElements, which panics if an inserted type does not match a previously inserted element type. +// +// Element types must have built-in coders in Beam. +func (c *Config) AddElementList(timestamp int64, elements interface{}) error { + val := reflect.ValueOf(elements) + if val.Kind() != reflect.Slice && val.Kind() != reflect.Array { + return fmt.Errorf("input %v must be a slice or array", elements) + } + + var inputs []interface{} + for i := 0; i < val.Len(); i++ { + inputs = append(inputs, val.Index(i).Interface()) + } + return c.AddElements(timestamp, inputs...) +} + // Create inserts a TestStream primitive into a pipeline, taking a scope and a Config object and // producing an output PCollection. The TestStream must be the first PTransform in the // pipeline. diff --git a/sdks/go/pkg/beam/testing/teststream/teststream_test.go b/sdks/go/pkg/beam/testing/teststream/teststream_test.go index 4de32c24133a..006da8b323d6 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream_test.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream_test.go @@ -18,6 +18,7 @@ package teststream import ( "bytes" "reflect" + "strings" "testing" "github.com/apache/beam/sdks/go/pkg/beam" @@ -90,7 +91,7 @@ func TestAddElements(t *testing.T) { [][]interface{}{{"test", "other test"}}, }, { - "doubles", + "floats", [][]interface{}{{1.1, 2.2, 3.3}}, }, } @@ -114,3 +115,58 @@ func TestAddElements(t *testing.T) { } } } + +func TestAddElementList(t *testing.T) { + tests := []struct { + name string + elementGroups [][]interface{} + }{ + { + "bools", + [][]interface{}{{true, false}}, + }, + { + "multiple bools", + [][]interface{}{{true, false}, {true, false}}, + }, + { + "strings", + [][]interface{}{{"test", "other test"}}, + }, + { + "floats", + [][]interface{}{{1.1, 2.2, 3.3}}, + }, + } + for _, tc := range tests { + con := NewConfig() + for i, elements := range tc.elementGroups { + if err := con.AddElementList(100, elements); err != nil { + t.Fatalf("%v failed to add elements to config, got %v", tc.name, err) + } + for j, event := range con.events[i].GetElementEvent().GetElements() { + dec := beam.NewElementDecoder(reflect.TypeOf(elements[j])) + buf := bytes.NewReader(event.GetEncodedElement()) + val, err := dec.Decode(buf) + if err != nil { + t.Errorf("%v, error decoding element, got %v", tc.name, err) + } + if val != elements[j] { + t.Errorf("%v added element mismatch, want %v, got %v", tc.name, elements[j], val) + } + } + } + } +} + +func TestAddElementList_Bad(t *testing.T) { + con := NewConfig() + err := con.AddElementList(100, true) + if err == nil { + t.Fatalf("pipeline succeeded when it should have failed") + } + str := err.Error() + if !strings.Contains(str, "must be a slice or array") { + t.Errorf("pipeline failed but got unexpected error message, got %v", err) + } +} diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 9067941157aa..5c56a45c19f9 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -72,6 +72,8 @@ var portableFilters = []string{ var flinkFilters = []string{ // TODO(BEAM-11500): Flink tests timing out on reads. "TestXLang_Combine.*", + // TODO(BEAM-12753): Flink test stream fails for non-string/byte slice inputs + "TestTestStream.*Sequence.*", } var samzaFilters = []string{ diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index ab1f43544380..e4879f017ad5 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -23,11 +23,11 @@ import ( // TestStreamSequence tests the TestStream primitive by inserting string elements // then advancing the watermark past the point where they were inserted. -func TestStreamSingleSequence() *beam.Pipeline { +func TestStreamStrings() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() con.AddElements(100, "a", "b", "c") - con.AdvanceWatermark(110) + con.AdvanceWatermarkToInfinity() col := teststream.Create(s, con) @@ -36,18 +36,121 @@ func TestStreamSingleSequence() *beam.Pipeline { return p } -// TestStreamTwoSequences tests the TestStream primitive by inserting two sets of -// string elements that arrive on-time into the TestStream -func TestStreamTwoSequences() *beam.Pipeline { +// TestStreamByteSlice tests the TestStream primitive by inserting byte slice elements +// then advancing the watermark to infinity and comparing the output.. +func TestStreamByteSliceSequence() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() - con.AddElements(100, "a", "b", "c") + b := []byte{91, 92, 93} + con.AddElements(1, b) + con.AdvanceWatermarkToInfinity() + col := teststream.Create(s, con) + passert.Count(s, col, "teststream byte", 1) + passert.Equals(s, col, append([]byte{3}, b...)) + return p +} + +// TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements +// then advancing the watermark past the point where they were inserted. +func TestStreamInt64Sequence() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + ele := []int64{91, 92, 93} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream int64", 3) + passert.EqualsList(s, col, ele) + return p +} + +// TestStreamTwoInt64Sequences tests the TestStream primitive by inserting two sets of +// int64 elements that arrive on-time into the TestStream +func TestStreamTwoInt64Sequences() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + eo := []int64{91, 92, 93} + et := []int64{96, 97, 98} + con.AddElementList(100, eo) + con.AdvanceWatermark(110) + con.AddElementList(120, et) + con.AdvanceWatermark(130) + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream int64", 6) + passert.EqualsList(s, col, append(eo, et...)) + return p +} + +// TestStreamFloat64Sequence tests the TestStream primitive by inserting float64 elements +// then advancing the watermark past the point where they were inserted. +func TestStreamFloat64Sequence() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + ele := []float64{91.1, 92.2, 93.3} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream float64", 3) + passert.EqualsList(s, col, ele) + return p +} + +// TestStreamTwoFloat64Sequences tests the TestStream primitive by inserting two sets of +// float64 elements that arrive on-time into the TestStream +func TestStreamTwoFloat64Sequences() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + eo := []float64{91.1, 92.2, 93.3} + et := []float64{96.4, 97.5, 98.6} + con.AddElementList(100, eo) + con.AdvanceWatermark(110) + con.AddElementList(120, et) + con.AdvanceWatermark(130) + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream float64", 6) + passert.EqualsList(s, col, append(eo, et...)) + return p +} + +// TestStreamBoolSequence tests the TestStream primitive by inserting boolean elements +// then advancing the watermark past the point where they were inserted. +func TestStreamBoolSequence() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + ele := []bool{true, false, true} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream bool", 3) + passert.EqualsList(s, col, ele) + return p +} + +// TestStreamTwoBoolSequences tests the TestStream primitive by inserting two sets of +// boolean elements that arrive on-time into the TestStream +func TestStreamTwoBoolSequences() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + eo := []bool{true, false, true} + et := []bool{false, true, false} + con.AddElementList(100, eo) con.AdvanceWatermark(110) - con.AddElements(120, "d", "e", "f") + con.AddElementList(120, et) con.AdvanceWatermark(130) col := teststream.Create(s, con) - passert.Count(s, col, "teststream strings", 6) + passert.Count(s, col, "teststream bool", 6) + passert.EqualsList(s, col, append(eo, et...)) return p } diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index 3228980d1f62..1eaafad6dc25 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -22,12 +22,42 @@ import ( "github.com/apache/beam/sdks/go/test/integration" ) -func TestTestStreamSingleSequence(t *testing.T) { +func TestTestStreamStrings(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamSingleSequence()) + ptest.RunAndValidate(t, TestStreamStrings()) } -func TestTestStreamTwoSequences(t *testing.T) { +func TestTestStreamByteSliceSequence(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamTwoSequences()) + ptest.RunAndValidate(t, TestStreamByteSliceSequence()) +} + +func TestTestStreamInt64Sequence(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamInt64Sequence()) +} + +func TestTestStreamTwoInt64Sequences(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamTwoInt64Sequences()) +} + +func TestTestStreamFloat64Sequence(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamFloat64Sequence()) +} + +func TestTestStreamTwoFloat64Sequences(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamTwoFloat64Sequences()) +} + +func TestTestStreamBoolSequence(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamBoolSequence()) +} + +func TestTestStreamTwoBoolSequences(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamTwoBoolSequences()) }