From 517dc6e16136d06af012bd40800d84d4bf4924f9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 1 Mar 2022 15:15:52 -0500 Subject: [PATCH 01/10] Bundle finalization harness side changes --- sdks/go/pkg/beam/core/runtime/exec/cogbk.go | 17 ++++ sdks/go/pkg/beam/core/runtime/exec/combine.go | 29 ++++-- .../beam/core/runtime/exec/combine_test.go | 9 ++ .../go/pkg/beam/core/runtime/exec/datasink.go | 9 ++ .../pkg/beam/core/runtime/exec/datasource.go | 8 ++ sdks/go/pkg/beam/core/runtime/exec/discard.go | 13 ++- sdks/go/pkg/beam/core/runtime/exec/flatten.go | 9 ++ sdks/go/pkg/beam/core/runtime/exec/fn.go | 52 +++++++++-- .../pkg/beam/core/runtime/exec/multiplex.go | 9 ++ sdks/go/pkg/beam/core/runtime/exec/pardo.go | 54 ++++++++++- .../pkg/beam/core/runtime/exec/pcollection.go | 9 ++ sdks/go/pkg/beam/core/runtime/exec/plan.go | 33 +++++++ .../pkg/beam/core/runtime/exec/reshuffle.go | 17 ++++ sdks/go/pkg/beam/core/runtime/exec/sdf.go | 33 +++++++ .../pkg/beam/core/runtime/exec/sideinput.go | 9 ++ sdks/go/pkg/beam/core/runtime/exec/unit.go | 12 +++ .../pkg/beam/core/runtime/exec/unit_test.go | 33 +++++++ sdks/go/pkg/beam/core/runtime/exec/util.go | 25 ++++- sdks/go/pkg/beam/core/runtime/exec/window.go | 8 ++ .../pkg/beam/core/runtime/harness/harness.go | 91 ++++++++++++++++--- sdks/go/pkg/beam/runners/direct/buffer.go | 16 ++++ sdks/go/pkg/beam/runners/direct/gbk.go | 17 ++++ sdks/go/pkg/beam/runners/direct/impulse.go | 9 ++ 23 files changed, 485 insertions(+), 36 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go index 7107118db7c0..3581f5293c76 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go +++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go @@ -19,6 +19,7 @@ import ( "bytes" "context" "fmt" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -77,6 +78,14 @@ func (n *Inject) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *Inject) FinalizeBundle(ctx context.Context) error { + return n.Out.FinishBundle(ctx) +} + +func (n *Inject) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *Inject) Down(ctx context.Context) error { return nil } @@ -119,6 +128,14 @@ func (n *Expand) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *Expand) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *Expand) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *Expand) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go index 07dbb2f9a84d..d8350cb30fbe 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go @@ -22,6 +22,7 @@ import ( "io" "path" "reflect" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -74,7 +75,7 @@ func (n *Combine) Up(ctx context.Context) error { n.states = metrics.NewPTransformState(n.PID) - if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil, nil); err != nil { return n.fail(err) } @@ -107,7 +108,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte } in := &MainInput{Key: FullValue{Elm: a}} - val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b) + val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, nil, b) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking MergeAccumulators")) } @@ -206,6 +207,14 @@ func (n *Combine) FinishBundle(ctx context.Context) error { return nil } +func (n *Combine) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *Combine) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + // Down runs the ParDo's TeardownFn. func (n *Combine) Down(ctx context.Context) error { if n.status == Down { @@ -213,7 +222,7 @@ func (n *Combine) Down(ctx context.Context) error { } n.status = Down - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -230,7 +239,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e opt = &MainInput{Key: FullValue{Elm: key}} } - val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt) + val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt, nil) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator")) } @@ -273,7 +282,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t } v := n.aiValConvert(value) - val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v) + val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, nil, v) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking AddInput")) } @@ -287,7 +296,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{}, return accum, nil } - val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum) + val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, nil, accum) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput")) } @@ -417,6 +426,14 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error { return n.Combine.FinishBundle(n.Combine.ctx) } +func (n *LiftedCombine) FinalizeBundle(ctx context.Context) error { + return n.Combine.FinalizeBundle(ctx) +} + +func (n *LiftedCombine) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Combine.GetBundleExpirationTime(ctx) +} + // Down tears down the cache. func (n *LiftedCombine) Down(ctx context.Context) error { if err := n.Combine.Down(ctx); err != nil { diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go index 5797fbfa4c02..56f75770e181 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go @@ -23,6 +23,7 @@ import ( "runtime" "strconv" "testing" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -619,6 +620,14 @@ func (n *simpleGBK) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *simpleGBK) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *simpleGBK) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *simpleGBK) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index 36f2a5195ca2..a4f8b4a39255 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "sync/atomic" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -97,6 +98,14 @@ func (n *DataSink) FinishBundle(ctx context.Context) error { return n.w.Close() } +func (m *DataSink) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (m *DataSink) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + // Down is a no-op. func (n *DataSink) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 2e8f25d41d31..16e068d5ae6e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -268,6 +268,14 @@ func (n *DataSource) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *DataSource) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *DataSource) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + // Down resets the source. func (n *DataSource) Down(ctx context.Context) error { n.source = nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/discard.go b/sdks/go/pkg/beam/core/runtime/exec/discard.go index ad1e81f24da4..3ee66dd7048f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/discard.go +++ b/sdks/go/pkg/beam/core/runtime/exec/discard.go @@ -15,7 +15,10 @@ package exec -import "context" +import ( + "context" + "time" +) // Discard silently discard all elements. It is implicitly inserted for any // loose ends in the pipeline. @@ -44,6 +47,14 @@ func (d *Discard) FinishBundle(ctx context.Context) error { return nil } +func (d *Discard) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (d *Discard) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (d *Discard) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/flatten.go b/sdks/go/pkg/beam/core/runtime/exec/flatten.go index 113bcabb9a62..c8b522c415da 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/flatten.go +++ b/sdks/go/pkg/beam/core/runtime/exec/flatten.go @@ -18,6 +18,7 @@ package exec import ( "context" "fmt" + "time" ) // Flatten is a fan-in node. It ensures that Start/FinishBundle are only @@ -66,6 +67,14 @@ func (m *Flatten) FinishBundle(ctx context.Context) error { return m.Out.FinishBundle(ctx) } +func (m *Flatten) FinalizeBundle(ctx context.Context) error { + return m.Out.FinalizeBundle(ctx) +} + +func (m *Flatten) GetBundleExpirationTime(ctx context.Context) time.Time { + return m.Out.GetBundleExpirationTime(ctx) +} + func (m *Flatten) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 456edeba4838..d69eb88919cb 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "reflect" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" @@ -39,23 +40,47 @@ type MainInput struct { RTracker sdf.RTracker } +type bundleFinalizationCallback struct { + callback func() error + validUntil time.Time +} + +// bundleFinalizer holds all the user defined callbacks to be run on bundle finalization. +// Implements typex.BundleFinalization +type bundleFinalizer struct { + callbacks []bundleFinalizationCallback + lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer +} + +// RegisterCallback is used to register callbacks during DoFn execution. +func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) { + callback := bundleFinalizationCallback{ + callback: cb, + validUntil: time.Now().Add(t), + } + bf.callbacks = append(bf.callbacks, callback) + if bf.lastValidCallback.Before(callback.validUntil) { + bf.lastValidCallback = callback.validUntil + } +} + // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.Invoke(ctx, pn, ws, ts, opt, extra...) + return inv.Invoke(ctx, pn, ws, ts, opt, bf, extra...) } // InvokeWithoutEventTime runs the given function at time 0 in the global window. -func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.InvokeWithoutEventTime(ctx, opt, extra...) + return inv.InvokeWithoutEventTime(ctx, opt, bf, extra...) } // invoker is a container struct for hot path invocations of DoFns, to avoid @@ -64,9 +89,9 @@ type invoker struct { fn *funcx.Fn args []interface{} // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. - ctxIdx, pnIdx, wndIdx, etIdx int // specialized input indexes - outEtIdx, outErrIdx int // specialized output indexes - in, out []int // general indexes + ctxIdx, pnIdx, wndIdx, etIdx, bfIdx int // specialized input indexes + outEtIdx, outErrIdx int // specialized output indexes + in, out []int // general indexes ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue. elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types. @@ -99,6 +124,10 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.outErrIdx, ok = fn.Error(); !ok { n.outErrIdx = -1 } + // TODO(@damccorm) - add this back in once BundleFinalization is implemented + // if n.bfIdx, ok = fn.BundleFinalization(); !ok { + // n.bfIdx = -1 + // } n.initCall() @@ -115,13 +144,13 @@ func (n *invoker) Reset() { } // InvokeWithoutEventTime runs the function at time 0 in the global window. -func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) { - return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, extra...) +func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { + return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, bf, extra...) } // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { // (1) Populate contexts // extract these to make things easier to read. args := n.args @@ -143,6 +172,9 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind if n.etIdx >= 0 { args[n.etIdx] = ts } + if n.bfIdx >= 0 { + args[n.bfIdx] = bf + } // (2) Main input from value, if any. i := 0 diff --git a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go index 8edb6c99d9ad..e7a8b7b815ad 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go +++ b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go @@ -18,6 +18,7 @@ package exec import ( "context" "fmt" + "time" ) // Multiplex is a fan-out node. It simply forwards any call to all downstream nodes. @@ -53,6 +54,14 @@ func (m *Multiplex) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, m.Out...) } +func (m *Multiplex) FinalizeBundle(ctx context.Context) error { + return MultiFinalizeBundle(ctx, m.Out...) +} + +func (m *Multiplex) GetBundleExpirationTime(ctx context.Context) time.Time { + return MultiGetBundleExpirationTime(ctx, m.Out...) +} + func (m *Multiplex) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index acd086745ce7..7a196af3ba1a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "path" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" @@ -42,6 +43,7 @@ type ParDo struct { emitters []ReusableEmitter ctx context.Context inv *invoker + bf bundleFinalizer reader StateReader cache *cacheElm @@ -76,6 +78,10 @@ func (n *ParDo) Up(ctx context.Context) error { } n.status = Up n.inv = newInvoker(n.Fn.ProcessElementFn()) + n.bf = bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } n.states = metrics.NewPTransformState(n.PID) @@ -83,7 +89,7 @@ func (n *ParDo) Up(ctx context.Context) error { // Subsequent bundles might run this same node, and the context here would be // incorrectly refering to the older bundleId. setupCtx := metrics.SetPTransformID(ctx, n.PID) - if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil); err != nil { return n.fail(err) } @@ -220,6 +226,46 @@ func (n *ParDo) FinishBundle(_ context.Context) error { return nil } +func (n *ParDo) FinalizeBundle(ctx context.Context) error { + failedIndices := []int{} + for idx, bfc := range n.bf.callbacks { + if time.Now().Before(bfc.validUntil) { + if err := bfc.callback(); err != nil { + failedIndices = append(failedIndices, idx) + } + } + } + + newFinalizer := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + + for _, idx := range failedIndices { + newFinalizer.callbacks = append(newFinalizer.callbacks, n.bf.callbacks[idx]) + if newFinalizer.lastValidCallback.Before(n.bf.callbacks[idx].validUntil) { + newFinalizer.lastValidCallback = n.bf.callbacks[idx].validUntil + } + } + + outErr := MultiFinalizeBundle(ctx, n.Out...) + if len(failedIndices) > 0 { + err := errors.Errorf("Pardo %v failed %v callbacks", n.Fn.Fn.String(), len(failedIndices)) + if outErr != nil { + return errors.Wrap(err, outErr.Error()) + } + } + return outErr +} + +func (n *ParDo) GetBundleExpirationTime(ctx context.Context) time.Time { + outExp := MultiGetBundleExpirationTime(ctx, n.Out...) + if outExp.Before(n.bf.lastValidCallback) { + return n.bf.lastValidCallback + } + return outExp +} + // Down performs best-effort teardown of DoFn resources. (May not run.) func (n *ParDo) Down(ctx context.Context) error { if n.status == Down { @@ -229,7 +275,7 @@ func (n *ParDo) Down(ctx context.Context) error { n.reader = nil n.cache = nil - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -295,7 +341,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex. if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...) + val, err = Invoke(ctx, pn, ws, ts, fn, opt, &n.bf, n.cache.extra...) if err != nil { return nil, err } @@ -313,7 +359,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...) + val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, &n.bf, n.cache.extra...) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go index 3b2e3ab3bf2c..d7878668a85a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go @@ -22,6 +22,7 @@ import ( "math/rand" "sync" "sync/atomic" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" ) @@ -127,6 +128,14 @@ func (p *PCollection) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, p.Out) } +func (p *PCollection) FinalizeBundle(ctx context.Context) error { + return MultiFinalizeBundle(ctx, p.Out) +} + +func (p *PCollection) GetBundleExpirationTime(ctx context.Context) time.Time { + return MultiGetBundleExpirationTime(ctx, p.Out) +} + // Down is a no-op. func (p *PCollection) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 7f89ce37322c..cba88d17611f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -131,6 +132,38 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro return nil } +func (p *Plan) Finalize(ctx context.Context, id string) error { + if p.status != Up { + return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) + } + var err error + p.status = Active + for _, root := range p.roots { + if rootErr := callNoPanic(ctx, root.FinalizeBundle); rootErr != nil { + if err == nil { + err = errors.Wrapf(rootErr, "while executing FinalizeBundle for %v", p) + } else { + err = errors.Wrap(rootErr, err.Error()) + } + } + } + p.status = Up + + return err +} + +func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time { + exp := time.Now() + for _, root := range p.roots { + rootExp := root.GetBundleExpirationTime(ctx) + if exp.Before(rootExp) { + exp = rootExp + } + } + + return exp +} + // Down takes the plan and associated units down. Does not panic. func (p *Plan) Down(ctx context.Context) error { if p.status == Down { diff --git a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go index 3a1900f6dc85..c79ae6e1c8f8 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go +++ b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/rand" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -79,6 +80,14 @@ func (n *ReshuffleInput) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, n.Out) } +func (n *ReshuffleInput) FinalizeBundle(ctx context.Context) error { + return MultiFinalizeBundle(ctx, n.Out) +} + +func (n *ReshuffleInput) GetBundleExpirationTime(ctx context.Context) time.Time { + return MultiGetBundleExpirationTime(ctx, n.Out) +} + // Down is a no-op. func (n *ReshuffleInput) Down(ctx context.Context) error { return nil @@ -158,6 +167,14 @@ func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, n.Out) } +func (n *ReshuffleOutput) FinalizeBundle(ctx context.Context) error { + return MultiFinalizeBundle(ctx, n.Out) +} + +func (n *ReshuffleOutput) GetBundleExpirationTime(ctx context.Context) time.Time { + return MultiGetBundleExpirationTime(ctx, n.Out) +} + // Down is a no-op. func (n *ReshuffleOutput) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index 3b14afd1323b..65d4c1e47206 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "path" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" @@ -89,6 +90,14 @@ func (n *PairWithRestriction) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *PairWithRestriction) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *PairWithRestriction) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + // Down currently does nothing. func (n *PairWithRestriction) Down(_ context.Context) error { return nil @@ -201,6 +210,14 @@ func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *SplitAndSizeRestrictions) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *SplitAndSizeRestrictions) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + // Down currently does nothing. func (n *SplitAndSizeRestrictions) Down(_ context.Context) error { return nil @@ -379,6 +396,14 @@ func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) return n.PDo.FinishBundle(ctx) } +func (n *ProcessSizedElementsAndRestrictions) FinalizeBundle(ctx context.Context) error { + return n.PDo.FinalizeBundle(ctx) +} + +func (n *ProcessSizedElementsAndRestrictions) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.PDo.GetBundleExpirationTime(ctx) +} + // Down calls the ParDo's Down method. func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error { return n.PDo.Down(ctx) @@ -745,6 +770,14 @@ func (n *SdfFallback) FinishBundle(ctx context.Context) error { return n.PDo.FinishBundle(ctx) } +func (n *SdfFallback) FinalizeBundle(ctx context.Context) error { + return n.PDo.FinalizeBundle(ctx) +} + +func (n *SdfFallback) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.PDo.GetBundleExpirationTime(ctx) +} + // Down calls the ParDo's Down method. func (n *SdfFallback) Down(ctx context.Context) error { return n.PDo.Down(ctx) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go index 0d26c315d3b0..4c23e0697823 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "io" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" @@ -205,6 +206,14 @@ func (n *FixedKey) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *FixedKey) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *FixedKey) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *FixedKey) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit.go b/sdks/go/pkg/beam/core/runtime/exec/unit.go index 04635086ab12..1a427ed6f983 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit.go @@ -17,6 +17,7 @@ package exec import ( "context" + "time" ) // UnitID is a unit identifier. Used for debugging. @@ -42,6 +43,17 @@ type Unit interface { // data connections must be closed. FinishBundle(ctx context.Context) error + // FinalizeBundle runs any non-expired user callbacks registered via the + // BundleFinalizer during function execution. + FinalizeBundle(ctx context.Context) error + + // GetBundleExpirationTime gets the earliest time when it is safe to + // expire all of the bundle's finalization callbacks. If it returns a + // time earlier than the current time, that indicates that we are + // completely done with the bundle. If no callbacks are registered for the + // bundle, returns the current time. + GetBundleExpirationTime(ctx context.Context) time.Time + // Down tears down the processing node. It is notably called if the unit // or plan encounters an error and must thus robustly handle cleanup of // unfinished bundles. If a unit itself (as opposed to downstream units) diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go index efb9edcdbce6..a7ada0d59c18 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go @@ -18,6 +18,7 @@ package exec import ( "context" "io" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -69,6 +70,14 @@ func (n *CaptureNode) FinishBundle(ctx context.Context) error { return nil } +func (n *CaptureNode) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (n *CaptureNode) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (n *CaptureNode) Down(ctx context.Context) error { if n.status != Up { return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) @@ -150,6 +159,14 @@ func (n *FixedRoot) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *FixedRoot) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (n *FixedRoot) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (n *FixedRoot) Down(ctx context.Context) error { return nil } @@ -199,6 +216,14 @@ func (n *BenchRoot) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *BenchRoot) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (n *BenchRoot) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (n *BenchRoot) Down(ctx context.Context) error { return nil } @@ -253,6 +278,14 @@ func (n *BlockingNode) FinishBundle(ctx context.Context) error { return err } +func (n *BlockingNode) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (n *BlockingNode) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (n *BlockingNode) Down(ctx context.Context) error { if n.status != Up { return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go index 2996b6dd159b..4270c775e406 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/util.go +++ b/sdks/go/pkg/beam/core/runtime/exec/util.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "runtime/debug" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -72,7 +73,7 @@ func MultiStartBundle(ctx context.Context, id string, data DataContext, list ... return nil } -// MultiFinishBundle calls StartBundle on multiple nodes. Convenience function. +// MultiFinishBundle calls FinishBundle on multiple nodes. Convenience function. func MultiFinishBundle(ctx context.Context, list ...Node) error { for _, n := range list { if err := n.FinishBundle(ctx); err != nil { @@ -82,6 +83,28 @@ func MultiFinishBundle(ctx context.Context, list ...Node) error { return nil } +// MultiFinalizeBundle calls FinalizeBundle on multiple nodes. Convenience function. +func MultiFinalizeBundle(ctx context.Context, list ...Node) error { + for _, n := range list { + if err := n.FinalizeBundle(ctx); err != nil { + return err + } + } + return nil +} + +// MultiGetBundleExpirationTime calls GetBundleExpirationTime on multiple nodes. Convenience function. +func MultiGetBundleExpirationTime(ctx context.Context, list ...Node) time.Time { + exp := time.Now() + for _, n := range list { + exp2 := n.GetBundleExpirationTime(ctx) + if exp.Before(exp2) { + exp = exp2 + } + } + return exp +} + // IDs returns the unit IDs of the given nodes. func IDs(list ...Node) []UnitID { var ret []UnitID diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go b/sdks/go/pkg/beam/core/runtime/exec/window.go index 02640f5dcfad..e929027aaadb 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/window.go +++ b/sdks/go/pkg/beam/core/runtime/exec/window.go @@ -89,6 +89,14 @@ func (w *WindowInto) FinishBundle(ctx context.Context) error { return w.Out.FinishBundle(ctx) } +func (w *WindowInto) FinalizeBundle(ctx context.Context) error { + return w.Out.FinalizeBundle(ctx) +} + +func (w *WindowInto) GetBundleExpirationTime(ctx context.Context) time.Time { + return w.Out.GetBundleExpirationTime(ctx) +} + func (w *WindowInto) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5b0e77883f43..87935dbf6324 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -102,16 +102,17 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { sideCache.Init(cacheSize) ctrl := &control{ - lookupDesc: lookupDesc, - descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor), - plans: make(map[bundleDescriptorID][]*exec.Plan), - active: make(map[instructionID]*exec.Plan), - inactive: newCircleBuffer(), - metStore: make(map[instructionID]*metrics.Store), - failed: make(map[instructionID]error), - data: &DataChannelManager{}, - state: &StateChannelManager{}, - cache: &sideCache, + lookupDesc: lookupDesc, + descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor), + plans: make(map[bundleDescriptorID][]*exec.Plan), + active: make(map[instructionID]*exec.Plan), + awaitingFinalization: make(map[instructionID]awaitingFinalization), + inactive: newCircleBuffer(), + metStore: make(map[instructionID]*metrics.Store), + failed: make(map[instructionID]error), + data: &DataChannelManager{}, + state: &StateChannelManager{}, + cache: &sideCache, } // gRPC requires all readers of a stream be the same goroutine, so this goroutine @@ -222,11 +223,19 @@ func (c *circleBuffer) Contains(instID instructionID) bool { return ok } +type awaitingFinalization struct { + expiration time.Time + plan *exec.Plan + bdID bundleDescriptorID +} + type control struct { lookupDesc func(bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) descriptors map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor // protected by mu // plans that are candidates for execution. plans map[bundleDescriptorID][]*exec.Plan // protected by mu + // plans that are awaiting bundle finalization. + awaitingFinalization map[instructionID]awaitingFinalization //protected by mu // plans that are actively being executed. // a plan can only be in one of these maps at any time. active map[instructionID]*exec.Plan // protected by mu @@ -338,14 +347,36 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.cache.CompleteBundle(tokens...) mons, pylds := monitoring(plan, store) + requiresFinalization := false // Move the plan back to the candidate state c.mu.Lock() // Mark the instruction as failed. if err != nil { c.failed[instID] = err } else { - // Non failure plans can be re-used. - c.plans[bdID] = append(c.plans[bdID], plan) + // Non failure plans should either be moved to the finalized state + // or to plans so they can be re-used. + expiration := plan.GetExpirationTime(ctx, string(instID)) + if time.Now().Before(expiration) { + // TODO(damccorm) - we can be a little smarter about data structures here by + // by storing plans awaiting finalization in a heap. That way when we expire plans + // here its O(1) instead of O(n) (though adding/finalizing will still be O(logn)) + requiresFinalization = true + c.awaitingFinalization[instID] = awaitingFinalization{ + expiration: expiration, + plan: plan, + bdID: bdID, + } + // Move any plans that have exceeded their expiration back into the re-use pool + for id, af := range c.awaitingFinalization { + if time.Now().After(af.expiration) { + c.plans[af.bdID] = append(c.plans[af.bdID], af.plan) + delete(c.awaitingFinalization, id) + } + } + } else { + c.plans[bdID] = append(c.plans[bdID], plan) + } } delete(c.active, instID) if removed, ok := c.inactive.Insert(instID); ok { @@ -362,12 +393,38 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe InstructionId: string(instID), Response: &fnpb.InstructionResponse_ProcessBundle{ ProcessBundle: &fnpb.ProcessBundleResponse{ - MonitoringData: pylds, - MonitoringInfos: mons, + MonitoringData: pylds, + MonitoringInfos: mons, + RequiresFinalization: requiresFinalization, }, }, } + case req.GetFinalizeBundle() != nil: + msg := req.GetFinalizeBundle() + + ref := instructionID(msg.GetInstructionId()) + + af, ok := c.awaitingFinalization[ref] + if !ok { + return fail(ctx, instID, "finalize bundle failed for instruction %v: couldn't find plan in finalizing map", ref) + } + + if time.Now().Before(af.expiration) { + if err := af.plan.Finalize(ctx, string(instID)); err != nil { + return fail(ctx, instID, "finalize bundle failed for instruction %v using plan %v : %v", ref, af.bdID, err) + } + } + c.plans[af.bdID] = append(c.plans[af.bdID], af.plan) + delete(c.awaitingFinalization, ref) + + return &fnpb.InstructionResponse{ + InstructionId: string(instID), + Response: &fnpb.InstructionResponse_FinalizeBundle{ + FinalizeBundle: &fnpb.FinalizeBundleResponse{}, + }, + } + case req.GetProcessBundleProgress() != nil: msg := req.GetProcessBundleProgress() @@ -506,6 +563,12 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe func (c *control) getPlanOrResponse(ctx context.Context, kind string, instID, ref instructionID) (*exec.Plan, *metrics.Store, *fnpb.InstructionResponse) { c.mu.Lock() plan, ok := c.active[ref] + if !ok { + awaitingFinalization, ok := c.awaitingFinalization[ref] + if ok { + plan = awaitingFinalization.plan + } + } err := c.failed[ref] store := c.metStore[ref] defer c.mu.Unlock() diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go index 901cfe929af7..202f8792f0d8 100644 --- a/sdks/go/pkg/beam/runners/direct/buffer.go +++ b/sdks/go/pkg/beam/runners/direct/buffer.go @@ -18,6 +18,7 @@ package direct import ( "context" "fmt" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" @@ -60,6 +61,14 @@ func (n *buffer) FinishBundle(ctx context.Context) error { return n.notify(ctx) } +func (n *buffer) FinalizeBundle(ctx context.Context) error { + return nil +} + +func (n *buffer) GetBundleExpirationTime(ctx context.Context) time.Time { + return time.Now() +} + func (n *buffer) Down(ctx context.Context) error { return nil } @@ -163,7 +172,14 @@ func (w *wait) FinishBundle(ctx context.Context) error { } w.done = true return w.next.FinishBundle(ctx) +} + +func (w *wait) FinalizeBundle(ctx context.Context) error { + return w.next.FinalizeBundle(ctx) +} +func (w *wait) GetBundleExpirationTime(ctx context.Context) time.Time { + return w.next.GetBundleExpirationTime(ctx) } func (w *wait) Down(ctx context.Context) error { diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go index f1edb8129248..f743adbb95d2 100644 --- a/sdks/go/pkg/beam/runners/direct/gbk.go +++ b/sdks/go/pkg/beam/runners/direct/gbk.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sort" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" @@ -125,6 +126,14 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *CoGBK) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *CoGBK) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *CoGBK) mergeWindows() (map[typex.Window]int, error) { sort.Slice(n.wins, func(i int, j int) bool { return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp() @@ -213,6 +222,14 @@ func (n *Inject) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *Inject) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *Inject) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *Inject) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/runners/direct/impulse.go b/sdks/go/pkg/beam/runners/direct/impulse.go index 1d6f78d06200..bf71409d8de3 100644 --- a/sdks/go/pkg/beam/runners/direct/impulse.go +++ b/sdks/go/pkg/beam/runners/direct/impulse.go @@ -18,6 +18,7 @@ package direct import ( "context" "fmt" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" @@ -56,6 +57,14 @@ func (n *Impulse) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } +func (n *Impulse) FinalizeBundle(ctx context.Context) error { + return n.Out.FinalizeBundle(ctx) +} + +func (n *Impulse) GetBundleExpirationTime(ctx context.Context) time.Time { + return n.Out.GetBundleExpirationTime(ctx) +} + func (n *Impulse) Down(ctx context.Context) error { return nil } From 1b9abf211d0132d93c02647af7053774d8920a8d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 1 Mar 2022 16:29:23 -0500 Subject: [PATCH 02/10] Add testing --- sdks/go/pkg/beam/core/runtime/exec/fn.go | 1 + sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 59 +++++- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 3 + .../pkg/beam/core/runtime/exec/pardo_test.go | 190 ++++++++++++++++++ 4 files changed, 246 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index d69eb88919cb..6c1d6dee02d9 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -128,6 +128,7 @@ func newInvoker(fn *funcx.Fn) *invoker { // if n.bfIdx, ok = fn.BundleFinalization(); !ok { // n.bfIdx = -1 // } + n.bfIdx = -1 n.initCall() diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go index b7ab50108296..cd85e5843832 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go @@ -17,6 +17,7 @@ package exec import ( "context" + "errors" "fmt" "reflect" "testing" @@ -178,7 +179,7 @@ func TestInvoke(t *testing.T) { test.ExpectedTime = ts } - val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...) + val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...) if err != nil { t.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) { } } +func TestRegisterCallback(t *testing.T) { + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + testVar := 0 + bf.RegisterCallback(500*time.Minute, func() error { + testVar += 5 + return nil + }) + bf.RegisterCallback(2*time.Minute, func() error { + testVar = 25 + return nil + }) + callbackErr := errors.New("Callback error") + bf.RegisterCallback(2*time.Minute, func() error { + return callbackErr + }) + + // We can't do exact equality since this relies on real time, we'll give it a broad range + if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) { + t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback) + } + if got, want := len(bf.callbacks), 3; got != want { + t.Fatalf("RegisterCallback() called twice, got %v callbacks, want %v", got, want) + } + + if err := bf.callbacks[0].callback(); err != nil { + t.Errorf("RegisterCallback() first callback returned error %v, want nil", err) + } + if got, want := testVar, 5; got != want { + t.Errorf("RegisterCallback() first callback set testvar to %v, want %v", got, want) + } + if err := bf.callbacks[1].callback(); err != nil { + t.Errorf("RegisterCallback() second callback returned error %v, want nil", err) + } + if got, want := testVar, 25; got != want { + t.Errorf("RegisterCallback() second callback set testvar to %v, want %v", got, want) + } + if err := bf.callbacks[2].callback(); err != callbackErr { + t.Errorf("RegisterCallback() second callback returned error %v, want %v", err, callbackErr) + } +} + // Benchmarks // Invoke is implemented as a single use of a cached invoker, so a measure of @@ -314,7 +359,7 @@ func BenchmarkInvoke(b *testing.B) { ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond) b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b *testing.B) { for i := 0; i < b.N; i++ { - _, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...) + _, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...) if err != nil { b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -323,7 +368,7 @@ func BenchmarkInvoke(b *testing.B) { b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b *testing.B) { inv := newInvoker(fn) for i := 0; i < b.N; i++ { - _, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...) + _, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, nil, test.Args...) if err != nil { b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -416,7 +461,7 @@ func BenchmarkInvokeCall(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}) + ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil) n = ret.Elm.(int) } b.Log(n) @@ -427,7 +472,7 @@ func BenchmarkInvokeCallExtra(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n) + ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n) n = ret.Elm.(int) } b.Log(n) @@ -453,7 +498,7 @@ func BenchmarkInvokeFnCall(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}) + ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil) n = ret.Elm.(int) } b.Log(n) @@ -464,7 +509,7 @@ func BenchmarkInvokeFnCallExtra(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n) + ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n) n = ret.Elm.(int) } b.Log(n) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 7a196af3ba1a..ff3b400f80fa 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -247,6 +247,7 @@ func (n *ParDo) FinalizeBundle(ctx context.Context) error { newFinalizer.lastValidCallback = n.bf.callbacks[idx].validUntil } } + n.bf = newFinalizer outErr := MultiFinalizeBundle(ctx, n.Out...) if len(failedIndices) > 0 { @@ -254,6 +255,8 @@ func (n *ParDo) FinalizeBundle(ctx context.Context) error { if outErr != nil { return errors.Wrap(err, outErr.Error()) } + + return err } return outErr } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go index f131f6f8ee70..b1a34e5f5e4e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go @@ -19,12 +19,14 @@ import ( "context" "fmt" "testing" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) func sumFn(n int, a int, b []int, c func(*int) bool, d func() func(*int) bool, e func(int)) int { @@ -98,6 +100,194 @@ func TestParDo(t *testing.T) { } } +func TestParDo_BundleFinalization(t *testing.T) { + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + testVar1 := 0 + testVar2 := 0 + bf.RegisterCallback(500*time.Minute, func() error { + testVar1 += 5 + return nil + }) + bf.RegisterCallback(2*time.Minute, func() error { + testVar2 = 25 + return nil + }) + + fn, err := graph.NewDoFn(sumFn) + if err != nil { + t.Fatalf("invalid function: %v", err) + } + g := graph.New() + nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + + edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) + if err != nil { + t.Fatalf("invalid pardo: %v", err) + } + + out := &CaptureNode{UID: 1} + sum := &CaptureNode{UID: 2} + pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d + }} + + // We can't do exact equality since this relies on real time, we'll give it a broad range + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(600*time.Minute)) || got.Before(time.Now().Add(400*time.Minute)) { + t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 500 minutes from now", got) + } + if err := pardo.FinalizeBundle(context.Background()); err != nil { + t.Fatalf("FinalizeBundle() failed with error %v", err) + } + if got, want := testVar1, 5; got != want { + t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) + } + if got, want := testVar2, 25; got != want { + t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) + } + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now()) { + t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time before current time", got) + } +} + +func TestParDo_BundleFinalization_ExpiredCallback(t *testing.T) { + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + testVar1 := 0 + testVar2 := 0 + bf.RegisterCallback(-500*time.Minute, func() error { + testVar1 += 5 + return nil + }) + bf.RegisterCallback(200*time.Minute, func() error { + testVar2 = 25 + return nil + }) + + fn, err := graph.NewDoFn(sumFn) + if err != nil { + t.Fatalf("invalid function: %v", err) + } + g := graph.New() + nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + + edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) + if err != nil { + t.Fatalf("invalid pardo: %v", err) + } + + out := &CaptureNode{UID: 1} + sum := &CaptureNode{UID: 2} + pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d + }} + + // We can't do exact equality since this relies on real time, we'll give it a broad range + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(300*time.Minute)) || got.Before(time.Now().Add(100*time.Minute)) { + t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 200 minutes from now", got) + } + if err := pardo.FinalizeBundle(context.Background()); err != nil { + t.Fatalf("FinalizeBundle() failed with error %v", err) + } + if got, want := testVar1, 0; got != want { + t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) + } + if got, want := testVar2, 25; got != want { + t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) + } + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now()) { + t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time before current time", got) + } +} + +func TestParDo_BundleFinalization_CallbackErrors(t *testing.T) { + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + testVar1 := 0 + testVar2 := 0 + errored := false + bf.RegisterCallback(500*time.Minute, func() error { + testVar1 += 5 + return nil + }) + bf.RegisterCallback(200*time.Minute, func() error { + if errored { + return nil + } + errored = true + return errors.New("Callback error") + }) + bf.RegisterCallback(2*time.Minute, func() error { + testVar2 = 25 + return nil + }) + + fn, err := graph.NewDoFn(sumFn) + if err != nil { + t.Fatalf("invalid function: %v", err) + } + g := graph.New() + nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) + + edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) + if err != nil { + t.Fatalf("invalid pardo: %v", err) + } + + out := &CaptureNode{UID: 1} + sum := &CaptureNode{UID: 2} + pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c + &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d + }} + + // We can't do exact equality since this relies on real time, we'll give it a broad range + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(600*time.Minute)) || got.Before(time.Now().Add(400*time.Minute)) { + t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 500 minutes from now", got) + } + if err := pardo.FinalizeBundle(context.Background()); err == nil { + t.Errorf("FinalizeBundle() succeeded, expected error") + } + if got, want := testVar1, 5; got != want { + t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) + } + if got, want := testVar2, 25; got != want { + t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) + } + if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(300*time.Minute)) || got.Before(time.Now().Add(100*time.Minute)) { + t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time around 200 minutes from now", got) + } + if err := pardo.FinalizeBundle(context.Background()); err != nil { + t.Fatalf("FinalizeBundle() failed with error %v, expected to succeed the second time", err) + } +} + func emitSumFn(n int, emit func(int)) { emit(n + 1) } From ac4d3c2f571b9eed2ac5b7273459aaa42973fdf1 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 07:09:12 -0500 Subject: [PATCH 03/10] Iterate over pardos directly --- sdks/go/pkg/beam/core/runtime/exec/cogbk.go | 17 ---------- sdks/go/pkg/beam/core/runtime/exec/combine.go | 17 ---------- .../beam/core/runtime/exec/combine_test.go | 9 ----- .../go/pkg/beam/core/runtime/exec/datasink.go | 9 ----- .../pkg/beam/core/runtime/exec/datasource.go | 8 ----- sdks/go/pkg/beam/core/runtime/exec/discard.go | 13 +------- sdks/go/pkg/beam/core/runtime/exec/flatten.go | 9 ----- .../pkg/beam/core/runtime/exec/multiplex.go | 9 ----- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 23 ++++++------- .../pkg/beam/core/runtime/exec/pcollection.go | 9 ----- sdks/go/pkg/beam/core/runtime/exec/plan.go | 18 ++++++---- .../pkg/beam/core/runtime/exec/reshuffle.go | 17 ---------- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 33 ------------------- .../pkg/beam/core/runtime/exec/sideinput.go | 9 ----- sdks/go/pkg/beam/core/runtime/exec/unit.go | 12 ------- .../pkg/beam/core/runtime/exec/unit_test.go | 33 ------------------- sdks/go/pkg/beam/core/runtime/exec/util.go | 23 ------------- sdks/go/pkg/beam/core/runtime/exec/window.go | 8 ----- sdks/go/pkg/beam/runners/direct/buffer.go | 17 ---------- sdks/go/pkg/beam/runners/direct/gbk.go | 17 ---------- sdks/go/pkg/beam/runners/direct/impulse.go | 9 ----- 21 files changed, 23 insertions(+), 296 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go index 3581f5293c76..7107118db7c0 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go +++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go @@ -19,7 +19,6 @@ import ( "bytes" "context" "fmt" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -78,14 +77,6 @@ func (n *Inject) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *Inject) FinalizeBundle(ctx context.Context) error { - return n.Out.FinishBundle(ctx) -} - -func (n *Inject) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *Inject) Down(ctx context.Context) error { return nil } @@ -128,14 +119,6 @@ func (n *Expand) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *Expand) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *Expand) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *Expand) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go index d8350cb30fbe..692f3f464710 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go @@ -22,7 +22,6 @@ import ( "io" "path" "reflect" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -207,14 +206,6 @@ func (n *Combine) FinishBundle(ctx context.Context) error { return nil } -func (n *Combine) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *Combine) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - // Down runs the ParDo's TeardownFn. func (n *Combine) Down(ctx context.Context) error { if n.status == Down { @@ -426,14 +417,6 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error { return n.Combine.FinishBundle(n.Combine.ctx) } -func (n *LiftedCombine) FinalizeBundle(ctx context.Context) error { - return n.Combine.FinalizeBundle(ctx) -} - -func (n *LiftedCombine) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Combine.GetBundleExpirationTime(ctx) -} - // Down tears down the cache. func (n *LiftedCombine) Down(ctx context.Context) error { if err := n.Combine.Down(ctx); err != nil { diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go index 56f75770e181..5797fbfa4c02 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go @@ -23,7 +23,6 @@ import ( "runtime" "strconv" "testing" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -620,14 +619,6 @@ func (n *simpleGBK) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *simpleGBK) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *simpleGBK) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *simpleGBK) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index a4f8b4a39255..36f2a5195ca2 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "sync/atomic" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -98,14 +97,6 @@ func (n *DataSink) FinishBundle(ctx context.Context) error { return n.w.Close() } -func (m *DataSink) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (m *DataSink) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - // Down is a no-op. func (n *DataSink) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 16e068d5ae6e..2e8f25d41d31 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -268,14 +268,6 @@ func (n *DataSource) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *DataSource) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *DataSource) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - // Down resets the source. func (n *DataSource) Down(ctx context.Context) error { n.source = nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/discard.go b/sdks/go/pkg/beam/core/runtime/exec/discard.go index 3ee66dd7048f..ad1e81f24da4 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/discard.go +++ b/sdks/go/pkg/beam/core/runtime/exec/discard.go @@ -15,10 +15,7 @@ package exec -import ( - "context" - "time" -) +import "context" // Discard silently discard all elements. It is implicitly inserted for any // loose ends in the pipeline. @@ -47,14 +44,6 @@ func (d *Discard) FinishBundle(ctx context.Context) error { return nil } -func (d *Discard) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (d *Discard) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (d *Discard) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/flatten.go b/sdks/go/pkg/beam/core/runtime/exec/flatten.go index c8b522c415da..113bcabb9a62 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/flatten.go +++ b/sdks/go/pkg/beam/core/runtime/exec/flatten.go @@ -18,7 +18,6 @@ package exec import ( "context" "fmt" - "time" ) // Flatten is a fan-in node. It ensures that Start/FinishBundle are only @@ -67,14 +66,6 @@ func (m *Flatten) FinishBundle(ctx context.Context) error { return m.Out.FinishBundle(ctx) } -func (m *Flatten) FinalizeBundle(ctx context.Context) error { - return m.Out.FinalizeBundle(ctx) -} - -func (m *Flatten) GetBundleExpirationTime(ctx context.Context) time.Time { - return m.Out.GetBundleExpirationTime(ctx) -} - func (m *Flatten) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go index e7a8b7b815ad..8edb6c99d9ad 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go +++ b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go @@ -18,7 +18,6 @@ package exec import ( "context" "fmt" - "time" ) // Multiplex is a fan-out node. It simply forwards any call to all downstream nodes. @@ -54,14 +53,6 @@ func (m *Multiplex) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, m.Out...) } -func (m *Multiplex) FinalizeBundle(ctx context.Context) error { - return MultiFinalizeBundle(ctx, m.Out...) -} - -func (m *Multiplex) GetBundleExpirationTime(ctx context.Context) time.Time { - return MultiGetBundleExpirationTime(ctx, m.Out...) -} - func (m *Multiplex) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index ff3b400f80fa..38e609a03011 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -226,6 +226,8 @@ func (n *ParDo) FinishBundle(_ context.Context) error { return nil } +// FinalizeBundle runs any non-expired user callbacks registered via the +// BundleFinalizer during function execution. func (n *ParDo) FinalizeBundle(ctx context.Context) error { failedIndices := []int{} for idx, bfc := range n.bf.callbacks { @@ -249,24 +251,19 @@ func (n *ParDo) FinalizeBundle(ctx context.Context) error { } n.bf = newFinalizer - outErr := MultiFinalizeBundle(ctx, n.Out...) if len(failedIndices) > 0 { - err := errors.Errorf("Pardo %v failed %v callbacks", n.Fn.Fn.String(), len(failedIndices)) - if outErr != nil { - return errors.Wrap(err, outErr.Error()) - } - - return err + return errors.Errorf("Pardo %v failed %v callbacks", n.Fn.Fn.String(), len(failedIndices)) } - return outErr + return nil } +// GetBundleExpirationTime gets the earliest time when it is safe to +// expire all of the bundle's finalization callbacks. If it returns a +// time earlier than the current time, that indicates that we are +// completely done with the bundle. If no callbacks are registered for the +// bundle, returns the current time. func (n *ParDo) GetBundleExpirationTime(ctx context.Context) time.Time { - outExp := MultiGetBundleExpirationTime(ctx, n.Out...) - if outExp.Before(n.bf.lastValidCallback) { - return n.bf.lastValidCallback - } - return outExp + return n.bf.lastValidCallback } // Down performs best-effort teardown of DoFn resources. (May not run.) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go index d7878668a85a..3b2e3ab3bf2c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go @@ -22,7 +22,6 @@ import ( "math/rand" "sync" "sync/atomic" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" ) @@ -128,14 +127,6 @@ func (p *PCollection) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, p.Out) } -func (p *PCollection) FinalizeBundle(ctx context.Context) error { - return MultiFinalizeBundle(ctx, p.Out) -} - -func (p *PCollection) GetBundleExpirationTime(ctx context.Context) time.Time { - return MultiGetBundleExpirationTime(ctx, p.Out) -} - // Down is a no-op. func (p *PCollection) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index cba88d17611f..21c086c57049 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -30,10 +30,11 @@ import ( // from a part of a pipeline. A plan can be used to process multiple bundles // serially. type Plan struct { - id string // id of the bundle descriptor for this plan - roots []Root - units []Unit - pcols []*PCollection + id string // id of the bundle descriptor for this plan + roots []Root + units []Unit + pcols []*PCollection + pardos []*ParDo status Status @@ -45,6 +46,7 @@ type Plan struct { func NewPlan(id string, units []Unit) (*Plan, error) { var roots []Root var pcols []*PCollection + var pardos []*ParDo var source *DataSource for _, u := range units { @@ -60,6 +62,9 @@ func NewPlan(id string, units []Unit) (*Plan, error) { if p, ok := u.(*PCollection); ok { pcols = append(pcols, p) } + if p, ok := u.(*ParDo); ok { + pardos = append(pardos, p) + } } if len(roots) == 0 { return nil, errors.Errorf("no root units") @@ -71,6 +76,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { roots: roots, units: units, pcols: pcols, + pardos: pardos, source: source, }, nil } @@ -138,7 +144,7 @@ func (p *Plan) Finalize(ctx context.Context, id string) error { } var err error p.status = Active - for _, root := range p.roots { + for _, root := range p.pardos { if rootErr := callNoPanic(ctx, root.FinalizeBundle); rootErr != nil { if err == nil { err = errors.Wrapf(rootErr, "while executing FinalizeBundle for %v", p) @@ -154,7 +160,7 @@ func (p *Plan) Finalize(ctx context.Context, id string) error { func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time { exp := time.Now() - for _, root := range p.roots { + for _, root := range p.pardos { rootExp := root.GetBundleExpirationTime(ctx) if exp.Before(rootExp) { exp = rootExp diff --git a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go index c79ae6e1c8f8..3a1900f6dc85 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go +++ b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "math/rand" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -80,14 +79,6 @@ func (n *ReshuffleInput) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, n.Out) } -func (n *ReshuffleInput) FinalizeBundle(ctx context.Context) error { - return MultiFinalizeBundle(ctx, n.Out) -} - -func (n *ReshuffleInput) GetBundleExpirationTime(ctx context.Context) time.Time { - return MultiGetBundleExpirationTime(ctx, n.Out) -} - // Down is a no-op. func (n *ReshuffleInput) Down(ctx context.Context) error { return nil @@ -167,14 +158,6 @@ func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error { return MultiFinishBundle(ctx, n.Out) } -func (n *ReshuffleOutput) FinalizeBundle(ctx context.Context) error { - return MultiFinalizeBundle(ctx, n.Out) -} - -func (n *ReshuffleOutput) GetBundleExpirationTime(ctx context.Context) time.Time { - return MultiGetBundleExpirationTime(ctx, n.Out) -} - // Down is a no-op. func (n *ReshuffleOutput) Down(ctx context.Context) error { return nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index 65d4c1e47206..3b14afd1323b 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "path" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" @@ -90,14 +89,6 @@ func (n *PairWithRestriction) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *PairWithRestriction) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *PairWithRestriction) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - // Down currently does nothing. func (n *PairWithRestriction) Down(_ context.Context) error { return nil @@ -210,14 +201,6 @@ func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *SplitAndSizeRestrictions) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *SplitAndSizeRestrictions) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - // Down currently does nothing. func (n *SplitAndSizeRestrictions) Down(_ context.Context) error { return nil @@ -396,14 +379,6 @@ func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) return n.PDo.FinishBundle(ctx) } -func (n *ProcessSizedElementsAndRestrictions) FinalizeBundle(ctx context.Context) error { - return n.PDo.FinalizeBundle(ctx) -} - -func (n *ProcessSizedElementsAndRestrictions) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.PDo.GetBundleExpirationTime(ctx) -} - // Down calls the ParDo's Down method. func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error { return n.PDo.Down(ctx) @@ -770,14 +745,6 @@ func (n *SdfFallback) FinishBundle(ctx context.Context) error { return n.PDo.FinishBundle(ctx) } -func (n *SdfFallback) FinalizeBundle(ctx context.Context) error { - return n.PDo.FinalizeBundle(ctx) -} - -func (n *SdfFallback) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.PDo.GetBundleExpirationTime(ctx) -} - // Down calls the ParDo's Down method. func (n *SdfFallback) Down(ctx context.Context) error { return n.PDo.Down(ctx) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go index 4c23e0697823..0d26c315d3b0 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "io" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" @@ -206,14 +205,6 @@ func (n *FixedKey) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *FixedKey) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *FixedKey) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *FixedKey) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit.go b/sdks/go/pkg/beam/core/runtime/exec/unit.go index 1a427ed6f983..04635086ab12 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit.go @@ -17,7 +17,6 @@ package exec import ( "context" - "time" ) // UnitID is a unit identifier. Used for debugging. @@ -43,17 +42,6 @@ type Unit interface { // data connections must be closed. FinishBundle(ctx context.Context) error - // FinalizeBundle runs any non-expired user callbacks registered via the - // BundleFinalizer during function execution. - FinalizeBundle(ctx context.Context) error - - // GetBundleExpirationTime gets the earliest time when it is safe to - // expire all of the bundle's finalization callbacks. If it returns a - // time earlier than the current time, that indicates that we are - // completely done with the bundle. If no callbacks are registered for the - // bundle, returns the current time. - GetBundleExpirationTime(ctx context.Context) time.Time - // Down tears down the processing node. It is notably called if the unit // or plan encounters an error and must thus robustly handle cleanup of // unfinished bundles. If a unit itself (as opposed to downstream units) diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go index a7ada0d59c18..efb9edcdbce6 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go @@ -18,7 +18,6 @@ package exec import ( "context" "io" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -70,14 +69,6 @@ func (n *CaptureNode) FinishBundle(ctx context.Context) error { return nil } -func (n *CaptureNode) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (n *CaptureNode) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (n *CaptureNode) Down(ctx context.Context) error { if n.status != Up { return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) @@ -159,14 +150,6 @@ func (n *FixedRoot) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *FixedRoot) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (n *FixedRoot) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (n *FixedRoot) Down(ctx context.Context) error { return nil } @@ -216,14 +199,6 @@ func (n *BenchRoot) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *BenchRoot) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (n *BenchRoot) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (n *BenchRoot) Down(ctx context.Context) error { return nil } @@ -278,14 +253,6 @@ func (n *BlockingNode) FinishBundle(ctx context.Context) error { return err } -func (n *BlockingNode) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (n *BlockingNode) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (n *BlockingNode) Down(ctx context.Context) error { if n.status != Up { return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status) diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go index 4270c775e406..019b19070570 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/util.go +++ b/sdks/go/pkg/beam/core/runtime/exec/util.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "runtime/debug" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -83,28 +82,6 @@ func MultiFinishBundle(ctx context.Context, list ...Node) error { return nil } -// MultiFinalizeBundle calls FinalizeBundle on multiple nodes. Convenience function. -func MultiFinalizeBundle(ctx context.Context, list ...Node) error { - for _, n := range list { - if err := n.FinalizeBundle(ctx); err != nil { - return err - } - } - return nil -} - -// MultiGetBundleExpirationTime calls GetBundleExpirationTime on multiple nodes. Convenience function. -func MultiGetBundleExpirationTime(ctx context.Context, list ...Node) time.Time { - exp := time.Now() - for _, n := range list { - exp2 := n.GetBundleExpirationTime(ctx) - if exp.Before(exp2) { - exp = exp2 - } - } - return exp -} - // IDs returns the unit IDs of the given nodes. func IDs(list ...Node) []UnitID { var ret []UnitID diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go b/sdks/go/pkg/beam/core/runtime/exec/window.go index e929027aaadb..02640f5dcfad 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/window.go +++ b/sdks/go/pkg/beam/core/runtime/exec/window.go @@ -89,14 +89,6 @@ func (w *WindowInto) FinishBundle(ctx context.Context) error { return w.Out.FinishBundle(ctx) } -func (w *WindowInto) FinalizeBundle(ctx context.Context) error { - return w.Out.FinalizeBundle(ctx) -} - -func (w *WindowInto) GetBundleExpirationTime(ctx context.Context) time.Time { - return w.Out.GetBundleExpirationTime(ctx) -} - func (w *WindowInto) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go index 202f8792f0d8..8e92ff72d0b5 100644 --- a/sdks/go/pkg/beam/runners/direct/buffer.go +++ b/sdks/go/pkg/beam/runners/direct/buffer.go @@ -18,7 +18,6 @@ package direct import ( "context" "fmt" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" @@ -61,14 +60,6 @@ func (n *buffer) FinishBundle(ctx context.Context) error { return n.notify(ctx) } -func (n *buffer) FinalizeBundle(ctx context.Context) error { - return nil -} - -func (n *buffer) GetBundleExpirationTime(ctx context.Context) time.Time { - return time.Now() -} - func (n *buffer) Down(ctx context.Context) error { return nil } @@ -174,14 +165,6 @@ func (w *wait) FinishBundle(ctx context.Context) error { return w.next.FinishBundle(ctx) } -func (w *wait) FinalizeBundle(ctx context.Context) error { - return w.next.FinalizeBundle(ctx) -} - -func (w *wait) GetBundleExpirationTime(ctx context.Context) time.Time { - return w.next.GetBundleExpirationTime(ctx) -} - func (w *wait) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go index f743adbb95d2..f1edb8129248 100644 --- a/sdks/go/pkg/beam/runners/direct/gbk.go +++ b/sdks/go/pkg/beam/runners/direct/gbk.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "sort" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" @@ -126,14 +125,6 @@ func (n *CoGBK) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *CoGBK) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *CoGBK) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *CoGBK) mergeWindows() (map[typex.Window]int, error) { sort.Slice(n.wins, func(i int, j int) bool { return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp() @@ -222,14 +213,6 @@ func (n *Inject) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *Inject) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *Inject) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *Inject) Down(ctx context.Context) error { return nil } diff --git a/sdks/go/pkg/beam/runners/direct/impulse.go b/sdks/go/pkg/beam/runners/direct/impulse.go index bf71409d8de3..1d6f78d06200 100644 --- a/sdks/go/pkg/beam/runners/direct/impulse.go +++ b/sdks/go/pkg/beam/runners/direct/impulse.go @@ -18,7 +18,6 @@ package direct import ( "context" "fmt" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" @@ -57,14 +56,6 @@ func (n *Impulse) FinishBundle(ctx context.Context) error { return n.Out.FinishBundle(ctx) } -func (n *Impulse) FinalizeBundle(ctx context.Context) error { - return n.Out.FinalizeBundle(ctx) -} - -func (n *Impulse) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.Out.GetBundleExpirationTime(ctx) -} - func (n *Impulse) Down(ctx context.Context) error { return nil } From c780f594237da99e0866d47a2d8d705bad001502 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 08:02:28 -0500 Subject: [PATCH 04/10] Track bundlefinalizer in plan.go not pardo --- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 51 ++---------------- sdks/go/pkg/beam/core/runtime/exec/plan.go | 58 ++++++++++++--------- 2 files changed, 35 insertions(+), 74 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 38e609a03011..aefcd57e0ed3 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "path" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" @@ -43,7 +42,7 @@ type ParDo struct { emitters []ReusableEmitter ctx context.Context inv *invoker - bf bundleFinalizer + bf *bundleFinalizer reader StateReader cache *cacheElm @@ -78,10 +77,6 @@ func (n *ParDo) Up(ctx context.Context) error { } n.status = Up n.inv = newInvoker(n.Fn.ProcessElementFn()) - n.bf = bundleFinalizer{ - callbacks: []bundleFinalizationCallback{}, - lastValidCallback: time.Now(), - } n.states = metrics.NewPTransformState(n.PID) @@ -226,46 +221,6 @@ func (n *ParDo) FinishBundle(_ context.Context) error { return nil } -// FinalizeBundle runs any non-expired user callbacks registered via the -// BundleFinalizer during function execution. -func (n *ParDo) FinalizeBundle(ctx context.Context) error { - failedIndices := []int{} - for idx, bfc := range n.bf.callbacks { - if time.Now().Before(bfc.validUntil) { - if err := bfc.callback(); err != nil { - failedIndices = append(failedIndices, idx) - } - } - } - - newFinalizer := bundleFinalizer{ - callbacks: []bundleFinalizationCallback{}, - lastValidCallback: time.Now(), - } - - for _, idx := range failedIndices { - newFinalizer.callbacks = append(newFinalizer.callbacks, n.bf.callbacks[idx]) - if newFinalizer.lastValidCallback.Before(n.bf.callbacks[idx].validUntil) { - newFinalizer.lastValidCallback = n.bf.callbacks[idx].validUntil - } - } - n.bf = newFinalizer - - if len(failedIndices) > 0 { - return errors.Errorf("Pardo %v failed %v callbacks", n.Fn.Fn.String(), len(failedIndices)) - } - return nil -} - -// GetBundleExpirationTime gets the earliest time when it is safe to -// expire all of the bundle's finalization callbacks. If it returns a -// time earlier than the current time, that indicates that we are -// completely done with the bundle. If no callbacks are registered for the -// bundle, returns the current time. -func (n *ParDo) GetBundleExpirationTime(ctx context.Context) time.Time { - return n.bf.lastValidCallback -} - // Down performs best-effort teardown of DoFn resources. (May not run.) func (n *ParDo) Down(ctx context.Context) error { if n.status == Down { @@ -341,7 +296,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex. if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = Invoke(ctx, pn, ws, ts, fn, opt, &n.bf, n.cache.extra...) + val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.bf, n.cache.extra...) if err != nil { return nil, err } @@ -359,7 +314,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, &n.bf, n.cache.extra...) + val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.cache.extra...) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 21c086c57049..e50961dec20d 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -30,11 +30,11 @@ import ( // from a part of a pipeline. A plan can be used to process multiple bundles // serially. type Plan struct { - id string // id of the bundle descriptor for this plan - roots []Root - units []Unit - pcols []*PCollection - pardos []*ParDo + id string // id of the bundle descriptor for this plan + roots []Root + units []Unit + pcols []*PCollection + bf bundleFinalizer status Status @@ -46,8 +46,11 @@ type Plan struct { func NewPlan(id string, units []Unit) (*Plan, error) { var roots []Root var pcols []*PCollection - var pardos []*ParDo var source *DataSource + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } for _, u := range units { if u == nil { @@ -63,7 +66,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { pcols = append(pcols, p) } if p, ok := u.(*ParDo); ok { - pardos = append(pardos, p) + p.bf = &bf } } if len(roots) == 0 { @@ -76,7 +79,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { roots: roots, units: units, pcols: pcols, - pardos: pardos, + bf: bf, source: source, }, nil } @@ -142,32 +145,35 @@ func (p *Plan) Finalize(ctx context.Context, id string) error { if p.status != Up { return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) } - var err error - p.status = Active - for _, root := range p.pardos { - if rootErr := callNoPanic(ctx, root.FinalizeBundle); rootErr != nil { - if err == nil { - err = errors.Wrapf(rootErr, "while executing FinalizeBundle for %v", p) - } else { - err = errors.Wrap(rootErr, err.Error()) + failedIndices := []int{} + for idx, bfc := range p.bf.callbacks { + if time.Now().Before(bfc.validUntil) { + if err := bfc.callback(); err != nil { + failedIndices = append(failedIndices, idx) } } } - p.status = Up - return err -} + newFinalizer := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } -func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time { - exp := time.Now() - for _, root := range p.pardos { - rootExp := root.GetBundleExpirationTime(ctx) - if exp.Before(rootExp) { - exp = rootExp + for _, idx := range failedIndices { + newFinalizer.callbacks = append(newFinalizer.callbacks, p.bf.callbacks[idx]) + if newFinalizer.lastValidCallback.Before(p.bf.callbacks[idx].validUntil) { + newFinalizer.lastValidCallback = p.bf.callbacks[idx].validUntil } } - return exp + if len(failedIndices) > 0 { + return errors.Errorf("Plan %v failed %v callbacks", p.ID(), len(failedIndices)) + } + return nil +} + +func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time { + return p.bf.lastValidCallback } // Down takes the plan and associated units down. Does not panic. From 64f40fc20bf7a0e1e5feb99226d1fa3aa8b7b355 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 08:06:04 -0500 Subject: [PATCH 05/10] Remove outdated test --- .../pkg/beam/core/runtime/exec/pardo_test.go | 190 ------------------ 1 file changed, 190 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go index b1a34e5f5e4e..f131f6f8ee70 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go @@ -19,14 +19,12 @@ import ( "context" "fmt" "testing" - "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) func sumFn(n int, a int, b []int, c func(*int) bool, d func() func(*int) bool, e func(int)) int { @@ -100,194 +98,6 @@ func TestParDo(t *testing.T) { } } -func TestParDo_BundleFinalization(t *testing.T) { - bf := bundleFinalizer{ - callbacks: []bundleFinalizationCallback{}, - lastValidCallback: time.Now(), - } - testVar1 := 0 - testVar2 := 0 - bf.RegisterCallback(500*time.Minute, func() error { - testVar1 += 5 - return nil - }) - bf.RegisterCallback(2*time.Minute, func() error { - testVar2 = 25 - return nil - }) - - fn, err := graph.NewDoFn(sumFn) - if err != nil { - t.Fatalf("invalid function: %v", err) - } - g := graph.New() - nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - - edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) - if err != nil { - t.Fatalf("invalid pardo: %v", err) - } - - out := &CaptureNode{UID: 1} - sum := &CaptureNode{UID: 2} - pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d - }} - - // We can't do exact equality since this relies on real time, we'll give it a broad range - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(600*time.Minute)) || got.Before(time.Now().Add(400*time.Minute)) { - t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 500 minutes from now", got) - } - if err := pardo.FinalizeBundle(context.Background()); err != nil { - t.Fatalf("FinalizeBundle() failed with error %v", err) - } - if got, want := testVar1, 5; got != want { - t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) - } - if got, want := testVar2, 25; got != want { - t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) - } - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now()) { - t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time before current time", got) - } -} - -func TestParDo_BundleFinalization_ExpiredCallback(t *testing.T) { - bf := bundleFinalizer{ - callbacks: []bundleFinalizationCallback{}, - lastValidCallback: time.Now(), - } - testVar1 := 0 - testVar2 := 0 - bf.RegisterCallback(-500*time.Minute, func() error { - testVar1 += 5 - return nil - }) - bf.RegisterCallback(200*time.Minute, func() error { - testVar2 = 25 - return nil - }) - - fn, err := graph.NewDoFn(sumFn) - if err != nil { - t.Fatalf("invalid function: %v", err) - } - g := graph.New() - nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - - edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) - if err != nil { - t.Fatalf("invalid pardo: %v", err) - } - - out := &CaptureNode{UID: 1} - sum := &CaptureNode{UID: 2} - pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d - }} - - // We can't do exact equality since this relies on real time, we'll give it a broad range - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(300*time.Minute)) || got.Before(time.Now().Add(100*time.Minute)) { - t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 200 minutes from now", got) - } - if err := pardo.FinalizeBundle(context.Background()); err != nil { - t.Fatalf("FinalizeBundle() failed with error %v", err) - } - if got, want := testVar1, 0; got != want { - t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) - } - if got, want := testVar2, 25; got != want { - t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) - } - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now()) { - t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time before current time", got) - } -} - -func TestParDo_BundleFinalization_CallbackErrors(t *testing.T) { - bf := bundleFinalizer{ - callbacks: []bundleFinalizationCallback{}, - lastValidCallback: time.Now(), - } - testVar1 := 0 - testVar2 := 0 - errored := false - bf.RegisterCallback(500*time.Minute, func() error { - testVar1 += 5 - return nil - }) - bf.RegisterCallback(200*time.Minute, func() error { - if errored { - return nil - } - errored = true - return errors.New("Callback error") - }) - bf.RegisterCallback(2*time.Minute, func() error { - testVar2 = 25 - return nil - }) - - fn, err := graph.NewDoFn(sumFn) - if err != nil { - t.Fatalf("invalid function: %v", err) - } - g := graph.New() - nN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - aN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - bN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - cN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - dN := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true) - - edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN, aN, bN, cN, dN}, nil, nil) - if err != nil { - t.Fatalf("invalid pardo: %v", err) - } - - out := &CaptureNode{UID: 1} - sum := &CaptureNode{UID: 2} - pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out, sum}, bf: bf, Side: []SideInputAdapter{ - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(1)}}, // a - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 3, 4)}}, // b - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 6)}}, // c - &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 8, 9)}}, // d - }} - - // We can't do exact equality since this relies on real time, we'll give it a broad range - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(600*time.Minute)) || got.Before(time.Now().Add(400*time.Minute)) { - t.Errorf("Before FinalizeBundle() getBundleExpirationTime=%v, want a time around 500 minutes from now", got) - } - if err := pardo.FinalizeBundle(context.Background()); err == nil { - t.Errorf("FinalizeBundle() succeeded, expected error") - } - if got, want := testVar1, 5; got != want { - t.Errorf("After FinalizeBundle() testVar1=%v, want %v", got, want) - } - if got, want := testVar2, 25; got != want { - t.Errorf("After FinalizeBundle() testVar2=%v, want %v", got, want) - } - if got := pardo.GetBundleExpirationTime(context.Background()); got.After(time.Now().Add(300*time.Minute)) || got.Before(time.Now().Add(100*time.Minute)) { - t.Errorf("After FinalizeBundle() getBundleExpirationTime=%v, want a time around 200 minutes from now", got) - } - if err := pardo.FinalizeBundle(context.Background()); err != nil { - t.Fatalf("FinalizeBundle() failed with error %v, expected to succeed the second time", err) - } -} - func emitSumFn(n int, emit func(int)) { emit(n + 1) } From dc5029d19dd4ceb265060ac7c274823196910b2e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 08:11:32 -0500 Subject: [PATCH 06/10] Fix pointer issue --- sdks/go/pkg/beam/core/runtime/exec/plan.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index e50961dec20d..87e8f74db834 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -34,7 +34,7 @@ type Plan struct { roots []Root units []Unit pcols []*PCollection - bf bundleFinalizer + bf *bundleFinalizer status Status @@ -79,7 +79,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { roots: roots, units: units, pcols: pcols, - bf: bf, + bf: &bf, source: source, }, nil } From dbd0cb139697d65004381d0d5e51c3b88aea2651 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 3 Mar 2022 09:04:14 -0500 Subject: [PATCH 07/10] Update todos to reference jiras --- sdks/go/pkg/beam/core/runtime/exec/fn.go | 2 +- sdks/go/pkg/beam/core/runtime/harness/harness.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 6c1d6dee02d9..5f90bdf1d774 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -124,7 +124,7 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.outErrIdx, ok = fn.Error(); !ok { n.outErrIdx = -1 } - // TODO(@damccorm) - add this back in once BundleFinalization is implemented + // TODO(BEAM-10976) - add this back in once BundleFinalization is implemented // if n.bfIdx, ok = fn.BundleFinalization(); !ok { // n.bfIdx = -1 // } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 87935dbf6324..0d5ee5610af0 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -358,7 +358,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe // or to plans so they can be re-used. expiration := plan.GetExpirationTime(ctx, string(instID)) if time.Now().Before(expiration) { - // TODO(damccorm) - we can be a little smarter about data structures here by + // TODO(BEAM-10976) - we can be a little smarter about data structures here by // by storing plans awaiting finalization in a heap. That way when we expire plans // here its O(1) instead of O(n) (though adding/finalizing will still be O(logn)) requiresFinalization = true From e7dfa56fc9ce2f30ce344bb0c41c19875f7eab40 Mon Sep 17 00:00:00 2001 From: github-actions Date: Mon, 7 Mar 2022 09:52:57 -0500 Subject: [PATCH 08/10] Cleanup from feedback --- sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 19 +++++++++++-------- sdks/go/pkg/beam/core/runtime/exec/plan.go | 7 +++++-- .../pkg/beam/core/runtime/harness/harness.go | 9 +++++---- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go index cd85e5843832..b4db872395c6 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go @@ -220,23 +220,26 @@ func TestRegisterCallback(t *testing.T) { t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback) } if got, want := len(bf.callbacks), 3; got != want { - t.Fatalf("RegisterCallback() called twice, got %v callbacks, want %v", got, want) + t.Fatalf("Callbacks in bundleFinalizer does not match number of calls to RegisterCallback(), got %v callbacks, want %v", got, want) } - if err := bf.callbacks[0].callback(); err != nil { - t.Errorf("RegisterCallback() first callback returned error %v, want nil", err) + callbackIdx := 0 + if err := bf.callbacks[callbackIdx].callback(); err != nil { + t.Errorf("RegisterCallback() callback at index %v returned unexpected error: %v", callbackIdx, err) } if got, want := testVar, 5; got != want { - t.Errorf("RegisterCallback() first callback set testvar to %v, want %v", got, want) + t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want) } - if err := bf.callbacks[1].callback(); err != nil { - t.Errorf("RegisterCallback() second callback returned error %v, want nil", err) + callbackIdx = 1 + if err := bf.callbacks[callbackIdx].callback(); err != nil { + t.Errorf("RegisterCallback() callback at index %v returned error %v, want nil", callbackIdx, err) } if got, want := testVar, 25; got != want { - t.Errorf("RegisterCallback() second callback set testvar to %v, want %v", got, want) + t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want) } + callbackIdx = 2 if err := bf.callbacks[2].callback(); err != callbackErr { - t.Errorf("RegisterCallback() second callback returned error %v, want %v", err, callbackErr) + t.Errorf("RegisterCallback() callback at index %v returned error %v, want %v", callbackIdx, err, callbackErr) } } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 87e8f74db834..cd13d9072516 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -141,7 +141,8 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro return nil } -func (p *Plan) Finalize(ctx context.Context, id string) error { +// Runs any callbacks registered by the bundleFinalizer. Should be run on bundle finalization. +func (p *Plan) Finalize() error { if p.status != Up { return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) } @@ -166,13 +167,15 @@ func (p *Plan) Finalize(ctx context.Context, id string) error { } } + p.bf = &newFinalizer + if len(failedIndices) > 0 { return errors.Errorf("Plan %v failed %v callbacks", p.ID(), len(failedIndices)) } return nil } -func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time { +func (p *Plan) GetExpirationTime() time.Time { return p.bf.lastValidCallback } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 0d5ee5610af0..5b861f15188a 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -356,7 +356,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } else { // Non failure plans should either be moved to the finalized state // or to plans so they can be re-used. - expiration := plan.GetExpirationTime(ctx, string(instID)) + expiration := plan.GetExpirationTime() if time.Now().Before(expiration) { // TODO(BEAM-10976) - we can be a little smarter about data structures here by // by storing plans awaiting finalization in a heap. That way when we expire plans @@ -411,7 +411,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } if time.Now().Before(af.expiration) { - if err := af.plan.Finalize(ctx, string(instID)); err != nil { + if err := af.plan.Finalize(); err != nil { return fail(ctx, instID, "finalize bundle failed for instruction %v using plan %v : %v", ref, af.bdID, err) } } @@ -564,9 +564,10 @@ func (c *control) getPlanOrResponse(ctx context.Context, kind string, instID, re c.mu.Lock() plan, ok := c.active[ref] if !ok { - awaitingFinalization, ok := c.awaitingFinalization[ref] + var af awaitingFinalization + af, ok = c.awaitingFinalization[ref] if ok { - plan = awaitingFinalization.plan + plan = af.plan } } err := c.failed[ref] From 3e4a89d78f8d13f6983a44d821148820288ea552 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 8 Mar 2022 08:35:23 -0500 Subject: [PATCH 09/10] Doc nit Co-authored-by: Daniel Oliveira --- sdks/go/pkg/beam/core/runtime/exec/plan.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index cd13d9072516..4a66907ff28c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -141,7 +141,7 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro return nil } -// Runs any callbacks registered by the bundleFinalizer. Should be run on bundle finalization. +// Finalize runs any callbacks registered by the bundleFinalizer. Should be run on bundle finalization. func (p *Plan) Finalize() error { if p.status != Up { return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) From 36e1450618fb8465832713d98c84bfb0c7360580 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 8 Mar 2022 08:38:04 -0500 Subject: [PATCH 10/10] GetExpirationTime comment --- sdks/go/pkg/beam/core/runtime/exec/plan.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 4a66907ff28c..abda426f639c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -175,6 +175,8 @@ func (p *Plan) Finalize() error { return nil } +// GetExpirationTime returns the last expiration time of any of the callbacks registered by the bundleFinalizer. +// Once we have passed this time, it is safe to move this plan to inactive without missing any valid callbacks. func (p *Plan) GetExpirationTime() time.Time { return p.bf.lastValidCallback }