diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 4c3192949139..401cdbef7a37 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -83,8 +83,7 @@ func (n *DataSource) ID() UnitID { // Up initializes this datasource. func (n *DataSource) Up(ctx context.Context) error { - // TODO(https://github.com/apache/beam/issues/23043) - Reenable single iteration or more fully rip this out. - safeToSingleIterate := false + safeToSingleIterate := true switch n.Out.(type) { case *Expand, *Multiplex: // CoGBK Expands aren't safe, as they may re-iterate the GBK stream. diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go index 16bc228944ea..aaa049510f52 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go @@ -296,12 +296,20 @@ func (s *decodeMultiChunkStream) Close() error { // TODO(https://github.com/apache/beam/issues/22901): // Optimize the case where we have length prefixed values // so we can avoid allocating the values in the first place. - for s.next < s.chunk { - err := s.d.DecodeTo(s.r, &s.ret) - if err != nil { - return errors.Wrap(err, "decodeStream value decode failed on close") + + for { + // If we have a stream, we're finished with the available bytes from the reader, + // so we move to close it after this loop. + if s.stream != nil { + break + } + // Drain the whole available iterable to ensure the reader is in the right position. + _, err := s.Read() + if err == io.EOF { + break + } else if err != nil { + return err } - s.next++ } if s.stream != nil { s.stream.Close() diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go index d1482d59d160..f8fc002d9466 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go @@ -397,11 +397,14 @@ func TestDecodeMultiChunkStream(t *testing.T) { if fv, err := ds.Read(); err != io.EOF { t.Errorf("unexpected error on decodeStream.Read after close: %v, %v", fv, err) } - // Check that next was iterated to equal size + // Check that next was iterated to an empty stream. dds := ds.(*decodeMultiChunkStream) - if got, want := dds.next, int64(size); got != want { + if got, want := dds.next, int64(0); got != want { t.Errorf("unexpected configuration after decodeStream.Close: got %v, want %v", got, want) } + if dds.stream != nil { + t.Errorf("got non-nil stream after close: %#v", dds.stream) + } // Check that a 2nd stream will fail: if s, err := drs.Open(); err == nil || s != nil { diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 77063ce18df8..8c27191b35ab 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -233,7 +233,8 @@ func (p *Plan) Down(ctx context.Context) error { func (p *Plan) String() string { var units []string - for _, u := range p.units { + for i := len(p.units) - 1; i >= 0; i-- { + u := p.units[i] units = append(units, fmt.Sprintf("%v: %v", u.ID(), u)) } return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))