From 52ad21bc59b24272221cf22fd3c9bba322c07117 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 28 Jul 2023 12:09:49 -0700 Subject: [PATCH 1/8] [#23043] Re-enable single iteration for the Go SDK. --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 4c3192949139..5717d53a23af 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -84,7 +84,7 @@ func (n *DataSource) ID() UnitID { // Up initializes this datasource. func (n *DataSource) Up(ctx context.Context) error { // TODO(https://github.com/apache/beam/issues/23043) - Reenable single iteration or more fully rip this out. - safeToSingleIterate := false + safeToSingleIterate := true switch n.Out.(type) { case *Expand, *Multiplex: // CoGBK Expands aren't safe, as they may re-iterate the GBK stream. From 6c35b812afc0470a8e774eb3536aba5e6b477f25 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 28 Jul 2023 15:47:23 -0700 Subject: [PATCH 2/8] more debuging --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 2 +- sdks/go/pkg/beam/core/runtime/exec/plan.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 5717d53a23af..1678377d4f28 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -223,7 +223,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { // Decode key or parallel element. pe, err := cp.Decode(bcr) if err != nil { - return errors.Wrap(err, "source decode failed") + return errors.Wrapf(err, "source decode failed: ws %v, t %v, pn %v, pe %+v", ws, t, pn, pe) } pe.Timestamp = t pe.Windows = ws diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 77063ce18df8..ed48859171cc 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -233,7 +233,8 @@ func (p *Plan) Down(ctx context.Context) error { func (p *Plan) String() string { var units []string - for _, u := range p.units { + for i := len(units) - 1; i >= 0; i-- { + u := p.units[i] units = append(units, fmt.Sprintf("%v: %v", u.ID(), u)) } return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n")) From 71d13d39becf6264bd0b480d857cf199a5ec30a1 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 28 Jul 2023 16:30:59 -0700 Subject: [PATCH 3/8] don't drop plan --- sdks/go/pkg/beam/core/runtime/exec/plan.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index ed48859171cc..8c27191b35ab 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -233,7 +233,7 @@ func (p *Plan) Down(ctx context.Context) error { func (p *Plan) String() string { var units []string - for i := len(units) - 1; i >= 0; i-- { + for i := len(p.units) - 1; i >= 0; i-- { u := p.units[i] units = append(units, fmt.Sprintf("%v: %v", u.ID(), u)) } From 5e164de78bd71c75e1bb67c261f73296ceaf0e3c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 28 Jul 2023 16:35:38 -0700 Subject: [PATCH 4/8] debug text. --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 1678377d4f28..04d1132697da 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -285,6 +285,7 @@ func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *b } if onlyStream { + log.Warnf(ctx, "ONLY STREAM bytes read: %v, iter size: %v", bcr.count, size) // If we know the stream won't be re-iterated, // decode elements on demand instead to reduce memory usage. switch { From 7e6fc785f7717fe664607c9cf3bef3e198dd1b66 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 31 Jul 2023 13:21:10 -0700 Subject: [PATCH 5/8] Fix beam23043 --- .../pkg/beam/core/runtime/exec/datasource.go | 3 ++- .../go/pkg/beam/core/runtime/exec/fullvalue.go | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 04d1132697da..fab9d7a38b78 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -131,7 +131,9 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader bcr := byteCountReader{reader: &r, count: &byteCount} splitPrimaryComplete := map[string]bool{} + count := -1 for { + count++ var err error select { case e, ok := <-elms: @@ -285,7 +287,6 @@ func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *b } if onlyStream { - log.Warnf(ctx, "ONLY STREAM bytes read: %v, iter size: %v", bcr.count, size) // If we know the stream won't be re-iterated, // decode elements on demand instead to reduce memory usage. switch { diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go index 16bc228944ea..7bcc0db5f82e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go @@ -269,6 +269,7 @@ type singleUseMultiChunkReStream struct { // Open returns the Stream from the start of the in-memory ReStream. Returns error if called twice. func (n *singleUseMultiChunkReStream) Open() (Stream, error) { + fmt.Println("CCCC singleUseMultiChunkReStream.Open") if n.r == nil { return nil, errors.New("decodeReStream opened twice") } @@ -296,12 +297,19 @@ func (s *decodeMultiChunkStream) Close() error { // TODO(https://github.com/apache/beam/issues/22901): // Optimize the case where we have length prefixed values // so we can avoid allocating the values in the first place. - for s.next < s.chunk { - err := s.d.DecodeTo(s.r, &s.ret) - if err != nil { - return errors.Wrap(err, "decodeStream value decode failed on close") + + for { + // If we have a stream, we're with the available bytes, we move to close it after this loop. + if s.stream != nil { + break + } + // Drain the whole available iterable to ensure the reader is in the right position. + _, err := s.Read() + if err == io.EOF { + break + } else if err != nil { + return err } - s.next++ } if s.stream != nil { s.stream.Close() From c32907f623340a2f3527df23d23efd03fd14028c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 31 Jul 2023 13:37:40 -0700 Subject: [PATCH 6/8] clean up debugging. --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 5 +---- sdks/go/pkg/beam/core/runtime/exec/fullvalue.go | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index fab9d7a38b78..401cdbef7a37 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -83,7 +83,6 @@ func (n *DataSource) ID() UnitID { // Up initializes this datasource. func (n *DataSource) Up(ctx context.Context) error { - // TODO(https://github.com/apache/beam/issues/23043) - Reenable single iteration or more fully rip this out. safeToSingleIterate := true switch n.Out.(type) { case *Expand, *Multiplex: @@ -131,9 +130,7 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader bcr := byteCountReader{reader: &r, count: &byteCount} splitPrimaryComplete := map[string]bool{} - count := -1 for { - count++ var err error select { case e, ok := <-elms: @@ -225,7 +222,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { // Decode key or parallel element. pe, err := cp.Decode(bcr) if err != nil { - return errors.Wrapf(err, "source decode failed: ws %v, t %v, pn %v, pe %+v", ws, t, pn, pe) + return errors.Wrap(err, "source decode failed") } pe.Timestamp = t pe.Windows = ws diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go index 7bcc0db5f82e..aaa049510f52 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go @@ -269,7 +269,6 @@ type singleUseMultiChunkReStream struct { // Open returns the Stream from the start of the in-memory ReStream. Returns error if called twice. func (n *singleUseMultiChunkReStream) Open() (Stream, error) { - fmt.Println("CCCC singleUseMultiChunkReStream.Open") if n.r == nil { return nil, errors.New("decodeReStream opened twice") } @@ -299,7 +298,8 @@ func (s *decodeMultiChunkStream) Close() error { // so we can avoid allocating the values in the first place. for { - // If we have a stream, we're with the available bytes, we move to close it after this loop. + // If we have a stream, we're finished with the available bytes from the reader, + // so we move to close it after this loop. if s.stream != nil { break } From 7073e7ce07d8b310b1c664338aeffdc42c158564 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 31 Jul 2023 14:41:45 -0700 Subject: [PATCH 7/8] update unit test. --- sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go index d1482d59d160..8fb6dbeff552 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go @@ -397,11 +397,14 @@ func TestDecodeMultiChunkStream(t *testing.T) { if fv, err := ds.Read(); err != io.EOF { t.Errorf("unexpected error on decodeStream.Read after close: %v, %v", fv, err) } - // Check that next was iterated to equal size + // Check that next was iterated to an empty stream. dds := ds.(*decodeMultiChunkStream) - if got, want := dds.next, int64(size); got != want { + if got, want := dds.next, int64(0); got != want { t.Errorf("unexpected configuration after decodeStream.Close: got %v, want %v", got, want) } + if dds.stream != nil { + t.Errorf("got non-nil stream after close: %#v", dds.stream) + } // Check that a 2nd stream will fail: if s, err := drs.Open(); err == nil || s != nil { From 3ddbae0cd221bb2d188d50ccf35b3cbeb382b7b6 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 1 Aug 2023 07:28:42 -0700 Subject: [PATCH 8/8] go fmt --- sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go index 8fb6dbeff552..f8fc002d9466 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue_test.go @@ -403,7 +403,7 @@ func TestDecodeMultiChunkStream(t *testing.T) { t.Errorf("unexpected configuration after decodeStream.Close: got %v, want %v", got, want) } if dds.stream != nil { - t.Errorf("got non-nil stream after close: %#v", dds.stream) + t.Errorf("got non-nil stream after close: %#v", dds.stream) } // Check that a 2nd stream will fail: