Skip to content
27 changes: 26 additions & 1 deletion sdks/go/pkg/beam/testing/teststream/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
//
// See https://beam.apache.org/blog/test-stream/ for more information.
//
// TestStream is supported on the Flink runner.
// TestStream is supported on the Flink runner and currently supports int64,
// float64, and boolean types.
//
// TODO(BEAM-12753): Flink currently displays unexpected behavior with TestStream,
// should not be used until this issue is resolved.
package teststream

import (
Expand Down Expand Up @@ -105,6 +109,8 @@ func (c *Config) AdvanceProcessingTimeToInfinity() {
//
// On the first call, a type will be inferred from the passed in elements, which must be of all the same type.
// Type mismatches on this or subsequent calls will cause AddElements to return an error.
//
// Element types must have built-in coders in Beam.
func (c *Config) AddElements(timestamp int64, elements ...interface{}) error {
t := reflect.TypeOf(elements[0])
if c.elmType == nil {
Expand Down Expand Up @@ -132,6 +138,25 @@ func (c *Config) AddElements(timestamp int64, elements ...interface{}) error {
return nil
}

// AddElementList inserts a slice of elements into the stream at the specified event timestamp. Must be called with
// at least one element.
//
// Calls into AddElements, which panics if an inserted type does not match a previously inserted element type.
//
// Element types must have built-in coders in Beam.
func (c *Config) AddElementList(timestamp int64, elements interface{}) error {
val := reflect.ValueOf(elements)
if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
return fmt.Errorf("input %v must be a slice or array", elements)
}

var inputs []interface{}
for i := 0; i < val.Len(); i++ {
inputs = append(inputs, val.Index(i).Interface())
}
return c.AddElements(timestamp, inputs...)
}

// Create inserts a TestStream primitive into a pipeline, taking a scope and a Config object and
// producing an output PCollection. The TestStream must be the first PTransform in the
// pipeline.
Expand Down
58 changes: 57 additions & 1 deletion sdks/go/pkg/beam/testing/teststream/teststream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package teststream
import (
"bytes"
"reflect"
"strings"
"testing"

"github.com/apache/beam/sdks/go/pkg/beam"
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestAddElements(t *testing.T) {
[][]interface{}{{"test", "other test"}},
},
{
"doubles",
"floats",
[][]interface{}{{1.1, 2.2, 3.3}},
},
}
Expand All @@ -114,3 +115,58 @@ func TestAddElements(t *testing.T) {
}
}
}

func TestAddElementList(t *testing.T) {
tests := []struct {
name string
elementGroups [][]interface{}
}{
{
"bools",
[][]interface{}{{true, false}},
},
{
"multiple bools",
[][]interface{}{{true, false}, {true, false}},
},
{
"strings",
[][]interface{}{{"test", "other test"}},
},
{
"floats",
[][]interface{}{{1.1, 2.2, 3.3}},
},
}
for _, tc := range tests {
con := NewConfig()
for i, elements := range tc.elementGroups {
if err := con.AddElementList(100, elements); err != nil {
t.Fatalf("%v failed to add elements to config, got %v", tc.name, err)
}
for j, event := range con.events[i].GetElementEvent().GetElements() {
dec := beam.NewElementDecoder(reflect.TypeOf(elements[j]))
buf := bytes.NewReader(event.GetEncodedElement())
val, err := dec.Decode(buf)
if err != nil {
t.Errorf("%v, error decoding element, got %v", tc.name, err)
}
if val != elements[j] {
t.Errorf("%v added element mismatch, want %v, got %v", tc.name, elements[j], val)
}
}
}
}
}

func TestAddElementList_Bad(t *testing.T) {
con := NewConfig()
err := con.AddElementList(100, true)
if err == nil {
t.Fatalf("pipeline succeeded when it should have failed")
}
str := err.Error()
if !strings.Contains(str, "must be a slice or array") {
t.Errorf("pipeline failed but got unexpected error message, got %v", err)
}
}
2 changes: 2 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ var portableFilters = []string{
var flinkFilters = []string{
// TODO(BEAM-11500): Flink tests timing out on reads.
"TestXLang_Combine.*",
// TODO(BEAM-12753): Flink test stream fails for non-string/byte slice inputs
"TestTestStream.*Sequence.*",
}

var samzaFilters = []string{
Expand Down
119 changes: 111 additions & 8 deletions sdks/go/test/integration/primitives/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

// TestStreamSequence tests the TestStream primitive by inserting string elements
// then advancing the watermark past the point where they were inserted.
func TestStreamSingleSequence() *beam.Pipeline {
func TestStreamStrings() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
con.AddElements(100, "a", "b", "c")
con.AdvanceWatermark(110)
con.AdvanceWatermarkToInfinity()

col := teststream.Create(s, con)

Expand All @@ -36,18 +36,121 @@ func TestStreamSingleSequence() *beam.Pipeline {
return p
}

// TestStreamTwoSequences tests the TestStream primitive by inserting two sets of
// string elements that arrive on-time into the TestStream
func TestStreamTwoSequences() *beam.Pipeline {
// TestStreamByteSlice tests the TestStream primitive by inserting byte slice elements
// then advancing the watermark to infinity and comparing the output..
func TestStreamByteSliceSequence() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
con.AddElements(100, "a", "b", "c")
b := []byte{91, 92, 93}
con.AddElements(1, b)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
passert.Count(s, col, "teststream byte", 1)
passert.Equals(s, col, append([]byte{3}, b...))
return p
}

// TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements
// then advancing the watermark past the point where they were inserted.
func TestStreamInt64Sequence() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
ele := []int64{91, 92, 93}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()

col := teststream.Create(s, con)

passert.Count(s, col, "teststream int64", 3)
passert.EqualsList(s, col, ele)
return p
}

// TestStreamTwoInt64Sequences tests the TestStream primitive by inserting two sets of
// int64 elements that arrive on-time into the TestStream
func TestStreamTwoInt64Sequences() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
eo := []int64{91, 92, 93}
et := []int64{96, 97, 98}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElementList(120, et)
con.AdvanceWatermark(130)

col := teststream.Create(s, con)

passert.Count(s, col, "teststream int64", 6)
passert.EqualsList(s, col, append(eo, et...))
return p
}

// TestStreamFloat64Sequence tests the TestStream primitive by inserting float64 elements
// then advancing the watermark past the point where they were inserted.
func TestStreamFloat64Sequence() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
ele := []float64{91.1, 92.2, 93.3}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()

col := teststream.Create(s, con)

passert.Count(s, col, "teststream float64", 3)
passert.EqualsList(s, col, ele)
return p
}

// TestStreamTwoFloat64Sequences tests the TestStream primitive by inserting two sets of
// float64 elements that arrive on-time into the TestStream
func TestStreamTwoFloat64Sequences() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
eo := []float64{91.1, 92.2, 93.3}
et := []float64{96.4, 97.5, 98.6}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElementList(120, et)
con.AdvanceWatermark(130)

col := teststream.Create(s, con)

passert.Count(s, col, "teststream float64", 6)
passert.EqualsList(s, col, append(eo, et...))
return p
}

// TestStreamBoolSequence tests the TestStream primitive by inserting boolean elements
// then advancing the watermark past the point where they were inserted.
func TestStreamBoolSequence() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
ele := []bool{true, false, true}
con.AddElementList(100, ele)
con.AdvanceWatermarkToInfinity()

col := teststream.Create(s, con)

passert.Count(s, col, "teststream bool", 3)
passert.EqualsList(s, col, ele)
return p
}

// TestStreamTwoBoolSequences tests the TestStream primitive by inserting two sets of
// boolean elements that arrive on-time into the TestStream
func TestStreamTwoBoolSequences() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
con := teststream.NewConfig()
eo := []bool{true, false, true}
et := []bool{false, true, false}
con.AddElementList(100, eo)
con.AdvanceWatermark(110)
con.AddElements(120, "d", "e", "f")
con.AddElementList(120, et)
con.AdvanceWatermark(130)

col := teststream.Create(s, con)

passert.Count(s, col, "teststream strings", 6)
passert.Count(s, col, "teststream bool", 6)
passert.EqualsList(s, col, append(eo, et...))
return p
}
38 changes: 34 additions & 4 deletions sdks/go/test/integration/primitives/teststream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,42 @@ import (
"github.com/apache/beam/sdks/go/test/integration"
)

func TestTestStreamSingleSequence(t *testing.T) {
func TestTestStreamStrings(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamSingleSequence())
ptest.RunAndValidate(t, TestStreamStrings())
}

func TestTestStreamTwoSequences(t *testing.T) {
func TestTestStreamByteSliceSequence(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamTwoSequences())
ptest.RunAndValidate(t, TestStreamByteSliceSequence())
}

func TestTestStreamInt64Sequence(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamInt64Sequence())
}

func TestTestStreamTwoInt64Sequences(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamTwoInt64Sequences())
}

func TestTestStreamFloat64Sequence(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamFloat64Sequence())
}

func TestTestStreamTwoFloat64Sequences(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamTwoFloat64Sequences())
}

func TestTestStreamBoolSequence(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamBoolSequence())
}

func TestTestStreamTwoBoolSequences(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, TestStreamTwoBoolSequences())
}