From bef7a3a90bf8abe0bd7118cd6fabffd9276bf95b Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 11 Aug 2021 20:56:55 +0000 Subject: [PATCH 1/7] [BEAM-11088] TestStream utility and testing improvements --- .../pkg/beam/testing/teststream/teststream.go | 24 ++++- .../testing/teststream/teststream_test.go | 58 +++++++++++- .../test/integration/primitives/teststream.go | 94 +++++++++++++++++-- .../integration/primitives/teststream_test.go | 28 +++++- 4 files changed, 188 insertions(+), 16 deletions(-) diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go index f9f98a853fb1..275f5c9eae83 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream.go @@ -18,7 +18,8 @@ // // See https://beam.apache.org/blog/test-stream/ for more information. // -// TestStream is supported on the Flink runner. +// TestStream is supported on the Flink runner and currently supports int64, +// float 64, and boolean types. package teststream import ( @@ -105,6 +106,8 @@ func (c *Config) AdvanceProcessingTimeToInfinity() { // // On the first call, a type will be inferred from the passed in elements, which must be of all the same type. // Type mismatches on this or subsequent calls will cause AddElements to return an error. +// +// Element types must have built-in coders in Beam. func (c *Config) AddElements(timestamp int64, elements ...interface{}) error { t := reflect.TypeOf(elements[0]) if c.elmType == nil { @@ -132,6 +135,25 @@ func (c *Config) AddElements(timestamp int64, elements ...interface{}) error { return nil } +// AddElementSlice inserts a slice of elements into the stream at the specified event timestamp. Must be called with +// at least one element. +// +// Calls into AddElements, which panics if an inserted type does not match a previously inserted element type. +// +// Element types must have built-in coders in Beam. +func (c *Config) AddElementList(timestamp int64, elements interface{}) error { + val := reflect.ValueOf(elements) + if val.Kind() != reflect.Slice && val.Kind() != reflect.Array { + return fmt.Errorf("input %v must be a slice or array", elements) + } + + var inputs []interface{} + for i := 0; i < val.Len(); i++ { + inputs = append(inputs, val.Index(i).Interface()) + } + return c.AddElements(timestamp, inputs...) +} + // Create inserts a TestStream primitive into a pipeline, taking a scope and a Config object and // producing an output PCollection. The TestStream must be the first PTransform in the // pipeline. diff --git a/sdks/go/pkg/beam/testing/teststream/teststream_test.go b/sdks/go/pkg/beam/testing/teststream/teststream_test.go index 4de32c24133a..006da8b323d6 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream_test.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream_test.go @@ -18,6 +18,7 @@ package teststream import ( "bytes" "reflect" + "strings" "testing" "github.com/apache/beam/sdks/go/pkg/beam" @@ -90,7 +91,7 @@ func TestAddElements(t *testing.T) { [][]interface{}{{"test", "other test"}}, }, { - "doubles", + "floats", [][]interface{}{{1.1, 2.2, 3.3}}, }, } @@ -114,3 +115,58 @@ func TestAddElements(t *testing.T) { } } } + +func TestAddElementList(t *testing.T) { + tests := []struct { + name string + elementGroups [][]interface{} + }{ + { + "bools", + [][]interface{}{{true, false}}, + }, + { + "multiple bools", + [][]interface{}{{true, false}, {true, false}}, + }, + { + "strings", + [][]interface{}{{"test", "other test"}}, + }, + { + "floats", + [][]interface{}{{1.1, 2.2, 3.3}}, + }, + } + for _, tc := range tests { + con := NewConfig() + for i, elements := range tc.elementGroups { + if err := con.AddElementList(100, elements); err != nil { + t.Fatalf("%v failed to add elements to config, got %v", tc.name, err) + } + for j, event := range con.events[i].GetElementEvent().GetElements() { + dec := beam.NewElementDecoder(reflect.TypeOf(elements[j])) + buf := bytes.NewReader(event.GetEncodedElement()) + val, err := dec.Decode(buf) + if err != nil { + t.Errorf("%v, error decoding element, got %v", tc.name, err) + } + if val != elements[j] { + t.Errorf("%v added element mismatch, want %v, got %v", tc.name, elements[j], val) + } + } + } + } +} + +func TestAddElementList_Bad(t *testing.T) { + con := NewConfig() + err := con.AddElementList(100, true) + if err == nil { + t.Fatalf("pipeline succeeded when it should have failed") + } + str := err.Error() + if !strings.Contains(str, "must be a slice or array") { + t.Errorf("pipeline failed but got unexpected error message, got %v", err) + } +} diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index ab1f43544380..5f2cbb551b12 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -21,33 +21,107 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/teststream" ) -// TestStreamSequence tests the TestStream primitive by inserting string elements +// TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements // then advancing the watermark past the point where they were inserted. -func TestStreamSingleSequence() *beam.Pipeline { +func TestStreamInt64Sequence() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() - con.AddElements(100, "a", "b", "c") + ele := []int64{91, 92, 93} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream int64", 3) + passert.EqualsList(s, col, ele) + return p +} + +// TestStreamTwoInt64Sequences tests the TestStream primitive by inserting two sets of +// int64 elements that arrive on-time into the TestStream +func TestStreamTwoInt64Sequences() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + eo := []int64{91, 92, 93} + et := []int64{96, 97, 98} + con.AddElementList(100, eo) + con.AdvanceWatermark(110) + con.AddElementList(120, et) + con.AdvanceWatermark(130) + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream int64", 6) + passert.EqualsList(s, col, append(eo, et...)) + return p +} + +// TestStreamFloat64Sequence tests the TestStream primitive by inserting float64 elements +// then advancing the watermark past the point where they were inserted. +func TestStreamFloat64Sequence() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + ele := []float64{91.1, 92.2, 93.3} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream float64", 3) + passert.EqualsList(s, col, ele) + return p +} + +// TestStreamTwoFloat64Sequences tests the TestStream primitive by inserting two sets of +// float64 elements that arrive on-time into the TestStream +func TestStreamTwoFloat64Sequences() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + eo := []float64{91.1, 92.2, 93.3} + et := []float64{96.4, 97.5, 98.6} + con.AddElementList(100, eo) con.AdvanceWatermark(110) + con.AddElementList(120, et) + con.AdvanceWatermark(130) col := teststream.Create(s, con) - passert.Count(s, col, "teststream strings", 3) + passert.Count(s, col, "teststream float64", 6) + passert.EqualsList(s, col, append(eo, et...)) + return p +} + +// TestStreamBoolSequence tests the TestStream primitive by inserting boolean elements +// then advancing the watermark past the point where they were inserted. +func TestStreamBoolSequence() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + ele := []bool{true, false, true} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + passert.Count(s, col, "teststream bool", 3) + passert.EqualsList(s, col, ele) return p } -// TestStreamTwoSequences tests the TestStream primitive by inserting two sets of -// string elements that arrive on-time into the TestStream -func TestStreamTwoSequences() *beam.Pipeline { +// TestStreamTwoBoolSequences tests the TestStream primitive by inserting two sets of +// boolean elements that arrive on-time into the TestStream +func TestStreamTwoBoolSequences() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() - con.AddElements(100, "a", "b", "c") + eo := []bool{true, false, true} + et := []bool{false, true, false} + con.AddElementList(100, eo) con.AdvanceWatermark(110) - con.AddElements(120, "d", "e", "f") + con.AddElementList(120, et) con.AdvanceWatermark(130) col := teststream.Create(s, con) - passert.Count(s, col, "teststream strings", 6) + passert.Count(s, col, "teststream bool", 6) + passert.EqualsList(s, col, append(eo, et...)) return p } diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index 3228980d1f62..08dc40707801 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -22,12 +22,32 @@ import ( "github.com/apache/beam/sdks/go/test/integration" ) -func TestTestStreamSingleSequence(t *testing.T) { +func TestTestStreamInt64Sequence(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamSingleSequence()) + ptest.RunAndValidate(t, TestStreamInt64Sequence()) } -func TestTestStreamTwoSequences(t *testing.T) { +func TestTestStreamTwoInt64Sequences(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamTwoSequences()) + ptest.RunAndValidate(t, TestStreamTwoInt64Sequences()) +} + +func TestTestStreamFloat64Sequence(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamFloat64Sequence()) +} + +func TestTestStreamTwoFloat64Sequences(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamTwoFloat64Sequences()) +} + +func TestTestStreamBoolSequence(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamBoolSequence()) +} + +func TestTestStreamTwoBoolSequences(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamTwoBoolSequences()) } From 65545f27bcdfa57018750b01d26770cd853914a6 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Thu, 12 Aug 2021 08:53:45 -0400 Subject: [PATCH 2/7] Update sdks/go/pkg/beam/testing/teststream/teststream.go Co-authored-by: Robert Burke --- sdks/go/pkg/beam/testing/teststream/teststream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go index 275f5c9eae83..744abe3cafc3 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream.go @@ -19,7 +19,7 @@ // See https://beam.apache.org/blog/test-stream/ for more information. // // TestStream is supported on the Flink runner and currently supports int64, -// float 64, and boolean types. +// float64, and boolean types. package teststream import ( From c7b5bc34c2c26747de14aa501dc2073a904f456a Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 12 Aug 2021 15:18:28 +0000 Subject: [PATCH 3/7] [BEAM-11088] Add byte slice run to temporarily address Flink memory bug --- .../go/test/integration/primitives/teststream.go | 16 ++++++++++++++++ .../integration/primitives/teststream_test.go | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index 5f2cbb551b12..53061e7c9a00 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -21,6 +21,22 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/teststream" ) +// TestStreamByteSlice runs a byte slice through the runner's TestStream implementation +// to prevent memory errors from being thrown by Flink. +// +// TODO(BEAM-12753) Update this test once the first TestStream call on Flink stops throwing +// memory errors. +func TestStreamByteSlice() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + b := []byte{91, 92, 93} + con.AddElements(1, b) + con.AdvanceWatermarkToInfinity() + col := teststream.Create(s, con) + passert.Count(s, col, "byte primer", 1) + return p +} + // TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements // then advancing the watermark past the point where they were inserted. func TestStreamInt64Sequence() *beam.Pipeline { diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index 08dc40707801..d0eaf1b2d533 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -22,6 +22,11 @@ import ( "github.com/apache/beam/sdks/go/test/integration" ) +func TestTestStreamByteSlice(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamByteSlice()) +} + func TestTestStreamInt64Sequence(t *testing.T) { integration.CheckFilters(t) ptest.RunAndValidate(t, TestStreamInt64Sequence()) From 9d2021d636e7da9c4835a4dc5075a053c913fbc6 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 12 Aug 2021 18:50:20 +0000 Subject: [PATCH 4/7] [BEAM-11088] Disable Flink teststream integration tests, call out unexpected behavior --- sdks/go/pkg/beam/testing/teststream/teststream.go | 3 +++ sdks/go/test/integration/integration.go | 2 ++ sdks/go/test/integration/primitives/teststream.go | 10 ++++------ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go index 744abe3cafc3..1b9c92f4deb1 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream.go @@ -20,6 +20,9 @@ // // TestStream is supported on the Flink runner and currently supports int64, // float64, and boolean types. +// +// TODO(BEAM-12753): Flink currently displays unexpected behavior with TestStream, +// should not be used until this issue is resolved. package teststream import ( diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 9067941157aa..5c56a45c19f9 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -72,6 +72,8 @@ var portableFilters = []string{ var flinkFilters = []string{ // TODO(BEAM-11500): Flink tests timing out on reads. "TestXLang_Combine.*", + // TODO(BEAM-12753): Flink test stream fails for non-string/byte slice inputs + "TestTestStream.*Sequence.*", } var samzaFilters = []string{ diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index 53061e7c9a00..bec56e792236 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -21,11 +21,8 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/teststream" ) -// TestStreamByteSlice runs a byte slice through the runner's TestStream implementation -// to prevent memory errors from being thrown by Flink. -// -// TODO(BEAM-12753) Update this test once the first TestStream call on Flink stops throwing -// memory errors. +// TestStreamByteSlic tests the TestStream primitive by inserting byte slice elements +// then advancing the watermark to infinity and comparing the output.. func TestStreamByteSlice() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() @@ -33,7 +30,8 @@ func TestStreamByteSlice() *beam.Pipeline { con.AddElements(1, b) con.AdvanceWatermarkToInfinity() col := teststream.Create(s, con) - passert.Count(s, col, "byte primer", 1) + passert.Count(s, col, "teststream byte", 1) + passert.Equals(s, col, append([]byte{3}, b...)) return p } From 0a5fb17b931b547dd7f822a05be2b4f47ee119a7 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 12 Aug 2021 18:55:36 +0000 Subject: [PATCH 5/7] [BEAM-11088] Fix typos --- sdks/go/pkg/beam/testing/teststream/teststream.go | 2 +- sdks/go/test/integration/primitives/teststream.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go index 1b9c92f4deb1..a0efaf274725 100644 --- a/sdks/go/pkg/beam/testing/teststream/teststream.go +++ b/sdks/go/pkg/beam/testing/teststream/teststream.go @@ -138,7 +138,7 @@ func (c *Config) AddElements(timestamp int64, elements ...interface{}) error { return nil } -// AddElementSlice inserts a slice of elements into the stream at the specified event timestamp. Must be called with +// AddElementList inserts a slice of elements into the stream at the specified event timestamp. Must be called with // at least one element. // // Calls into AddElements, which panics if an inserted type does not match a previously inserted element type. diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index bec56e792236..ccbe11ff3216 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -21,7 +21,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/teststream" ) -// TestStreamByteSlic tests the TestStream primitive by inserting byte slice elements +// TestStreamByteSlice tests the TestStream primitive by inserting byte slice elements // then advancing the watermark to infinity and comparing the output.. func TestStreamByteSlice() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() From bfcff41f9525fc3cc93f9a881e0c4505383cea08 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 13 Aug 2021 14:14:15 +0000 Subject: [PATCH 6/7] [BEAM-11088] Reinstate string tests, filter out all others --- .../test/integration/primitives/teststream.go | 17 ++++++++++++++++- .../integration/primitives/teststream_test.go | 9 +++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index ccbe11ff3216..e4879f017ad5 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -21,9 +21,24 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/testing/teststream" ) +// TestStreamSequence tests the TestStream primitive by inserting string elements +// then advancing the watermark past the point where they were inserted. +func TestStreamStrings() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + con := teststream.NewConfig() + con.AddElements(100, "a", "b", "c") + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + + passert.Count(s, col, "teststream strings", 3) + + return p +} + // TestStreamByteSlice tests the TestStream primitive by inserting byte slice elements // then advancing the watermark to infinity and comparing the output.. -func TestStreamByteSlice() *beam.Pipeline { +func TestStreamByteSliceSequence() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() con := teststream.NewConfig() b := []byte{91, 92, 93} diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index d0eaf1b2d533..5b3295c4555e 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -22,9 +22,14 @@ import ( "github.com/apache/beam/sdks/go/test/integration" ) -func TestTestStreamByteSlice(t *testing.T) { +func TestTestStreamStrings(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamStrings()) +} + +func TestTestStreamByteSliceSequence(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamByteSlice()) + ptest.RunAndValidate(t, TestStreamByteSliceSequence()) } func TestTestStreamInt64Sequence(t *testing.T) { From b267944fbed5d3a4f61dcecd924fb3cca14518d2 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 13 Aug 2021 14:14:51 +0000 Subject: [PATCH 7/7] fix formatting --- sdks/go/test/integration/primitives/teststream_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index 5b3295c4555e..1eaafad6dc25 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -23,8 +23,8 @@ import ( ) func TestTestStreamStrings(t *testing.T) { - integration.CheckFilters(t) - ptest.RunAndValidate(t, TestStreamStrings()) + integration.CheckFilters(t) + ptest.RunAndValidate(t, TestStreamStrings()) } func TestTestStreamByteSliceSequence(t *testing.T) {