diff --git a/sdks/go/examples/timer_wordcap/wordcap.go b/sdks/go/examples/timer_wordcap/wordcap.go new file mode 100644 index 000000000000..4d1faac50396 --- /dev/null +++ b/sdks/go/examples/timer_wordcap/wordcap.go @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// timer_wordcap is a toy streaming pipeline that demonstrates the use of State and Timers. +// Periodic Impulse is used as a streaming source that produces sequence of elements upto 5 minutes +// from the start of the pipeline every 5 seconds. These elements are keyed and fed to the Stateful DoFn +// where state and timers are set and cleared. Since this pipeline uses a Periodic Impulse, +// the pipeline is terminated automatically after it is done producing elements for 5 minutes. +package main + +import ( + "bytes" + "context" + "encoding/binary" + "flag" + "fmt" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" +) + +type Stateful struct { + ElementBag state.Bag[string] + TimerTime state.Value[int64] + MinTime state.Combining[int64, int64, int64] + + OutputState timers.ProcessingTime +} + +func NewStateful() *Stateful { + return &Stateful{ + ElementBag: state.MakeBagState[string]("elementBag"), + TimerTime: state.MakeValueState[int64]("timerTime"), + MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 { + if a < b { + return a + } + return b + }), + + OutputState: timers.InProcessingTime("outputState"), + } +} + +func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) { + switch timerKey { + case "outputState": + log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key) + s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag("1")) + switch timerTag { + case "1": + s.OutputState.Clear(tp) + log.Infof(ctx, "Timer with tag 1 fired on outputState stateful DoFn.") + emit(timerKey, timerTag) + } + } +} + +func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error { + s.ElementBag.Add(sp, word) + s.MinTime.Add(sp, int64(ts)) + + toFire, ok, err := s.TimerTime.Read(sp) + if err != nil { + return err + } + if !ok { + toFire = int64(mtime.Now().Add(1 * time.Minute)) + } + minTime, _, err := s.MinTime.Read(sp) + if err != nil { + return err + } + + s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)), timers.WithTag(word)) + s.TimerTime.Write(sp, toFire) + + return nil +} + +func init() { + register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{}) + register.Emitter2[string, string]() + register.Emitter2[beam.EventTime, int64]() +} + +func main() { + flag.Parse() + beam.Init() + + ctx := context.Background() + + p := beam.NewPipeline() + s := p.Root() + + out := periodic.Impulse(s, time.Now(), time.Now().Add(5*time.Minute), 5*time.Second, true) + + intOut := beam.ParDo(s, func(b []byte) int64 { + var val int64 + buf := bytes.NewReader(b) + binary.Read(buf, binary.BigEndian, &val) + return val + }, out) + + str := beam.ParDo(s, func(b int64) string { + return fmt.Sprintf("%03d", b) + }, intOut) + + keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) { + return "test", s + }, str) + + timed := beam.ParDo(s, NewStateful(), keyed) + debug.Printf(s, "post stateful: %v", timed) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go index b579cb56d521..23c2dd0e8b2f 100644 --- a/sdks/go/pkg/beam/core/funcx/fn.go +++ b/sdks/go/pkg/beam/core/funcx/fn.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -85,6 +86,8 @@ const ( FnWatermarkEstimator FnParamKind = 0x1000 // FnStateProvider indicates a function input parameter that implements state.Provider FnStateProvider FnParamKind = 0x2000 + // FnTimerProvider indicates a function input parameter that implements timer.Provider + FnTimerProvider FnParamKind = 0x4000 ) func (k FnParamKind) String() string { @@ -117,6 +120,8 @@ func (k FnParamKind) String() string { return "WatermarkEstimator" case FnStateProvider: return "StateProvider" + case FnTimerProvider: + return "TimerProvider" default: return fmt.Sprintf("%v", int(k)) } @@ -305,6 +310,17 @@ func (u *Fn) StateProvider() (pos int, exists bool) { return -1, false } +// TimerProvider returns (index, true) iff the function expects a +// parameter that implements timers.Provider. +func (u *Fn) TimerProvider() (pos int, exists bool) { + for i, p := range u.Param { + if p.Kind == FnTimerProvider { + return i, true + } + } + return -1, false +} + // WatermarkEstimator returns (index, true) iff the function expects a // parameter that implements sdf.WatermarkEstimator. func (u *Fn) WatermarkEstimator() (pos int, exists bool) { @@ -392,6 +408,8 @@ func New(fn reflectx.Func) (*Fn, error) { kind = FnBundleFinalization case t == state.ProviderType: kind = FnStateProvider + case t == timers.ProviderType: + kind = FnTimerProvider case t == reflectx.Type: kind = FnType case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()): @@ -482,7 +500,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam { } // The order of present parameters and return values must be as follows: -// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) +// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, FnTimerProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) // // where ? indicates 0 or 1, and * indicates any number. // and a SideInput is one of FnValue or FnIter or FnReIter @@ -517,6 +535,7 @@ var ( errRTrackerPrecedence = errors.New("may only have a single sdf.RTracker parameter and it must precede the main input parameter") errBundleFinalizationPrecedence = errors.New("may only have a single BundleFinalization parameter and it must precede the main input parameter") errStateProviderPrecedence = errors.New("may only have a single state.Provider parameter and it must precede the main input parameter") + errTimerProviderPrecedence = errors.New("may only have a single timer.Provider parameter and it must precede the main input parameter") errInputPrecedence = errors.New("inputs parameters must precede emit function parameters") ) @@ -535,6 +554,7 @@ const ( psRTracker psBundleFinalization psStateProvider + psTimerProvider ) func nextParamState(cur paramState, transition FnParamKind) (paramState, error) { @@ -559,6 +579,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psContext: switch transition { @@ -578,6 +600,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psPane: switch transition { @@ -595,6 +619,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psWindow: switch transition { @@ -610,6 +636,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psEventTime: switch transition { @@ -623,6 +651,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psWatermarkEstimator: switch transition { @@ -634,6 +664,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psType: switch transition { @@ -643,6 +675,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psBundleFinalization: switch transition { @@ -650,13 +684,22 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psRTracker: switch transition { case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psStateProvider: + switch transition { + case FnTimerProvider: + return psTimerProvider, nil + } + case psTimerProvider: // Completely handled by the default clause case psInput: switch transition { @@ -689,6 +732,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return -1, errRTrackerPrecedence case FnStateProvider: return -1, errStateProviderPrecedence + case FnTimerProvider: + return -1, errTimerProviderPrecedence case FnIter, FnReIter, FnValue, FnMultiMap: return psInput, nil case FnEmit: diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go index 6890adc8ec39..5e1a2c569647 100644 --- a/sdks/go/pkg/beam/core/funcx/fn_test.go +++ b/sdks/go/pkg/beam/core/funcx/fn_test.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" ) @@ -114,6 +115,11 @@ func TestNew(t *testing.T) { Fn: func(sdf.RTracker, state.Provider, []byte) {}, Param: []FnParamKind{FnRTracker, FnStateProvider, FnValue}, }, + { + Name: "good11", + Fn: func(typex.PaneInfo, typex.Window, typex.EventTime, state.Provider, timers.Provider, []byte) {}, + Param: []FnParamKind{FnPane, FnWindow, FnEventTime, FnStateProvider, FnTimerProvider, FnValue}, + }, { Name: "good-method", Fn: foo{1}.Do, @@ -227,6 +233,21 @@ func TestNew(t *testing.T) { Fn: func(int, state.Provider) {}, Err: errStateProviderPrecedence, }, + { + Name: "errInputPrecedence- TimerProvider before RTracker", + Fn: func(timers.Provider, sdf.RTracker, int) {}, + Err: errRTrackerPrecedence, + }, + { + Name: "errInputPrecedence- TimerProvider after output", + Fn: func(int, timers.Provider) {}, + Err: errTimerProviderPrecedence, + }, + { + Name: "errInputPrecedence- TimerProvider before StateProvider", + Fn: func(int, timers.Provider, state.Provider) {}, + Err: errTimerProviderPrecedence, + }, { Name: "errInputPrecedence- input after output", Fn: func(int, func(int), int) {}, @@ -487,7 +508,7 @@ func TestWindow(t *testing.T) { } fn := &Fn{Param: params} - // Validate we get expected results for pane function. + // Validate we get expected results for Window function. pos, exists := fn.Window() if exists != test.Exists { t.Errorf("Window(%v) - exists: got %v, want %v", params, exists, test.Exists) @@ -531,7 +552,7 @@ func TestBundleFinalization(t *testing.T) { } fn := &Fn{Param: params} - // Validate we get expected results for pane function. + // Validate we get expected results for BundleFinalization function. pos, exists := fn.BundleFinalization() if exists != test.Exists { t.Errorf("BundleFinalization(%v) - exists: got %v, want %v", params, exists, test.Exists) @@ -575,7 +596,7 @@ func TestWatermarkEstimator(t *testing.T) { } fn := &Fn{Param: params} - // Validate we get expected results for pane function. + // Validate we get expected results for WatermarkEstimator function. pos, exists := fn.WatermarkEstimator() if exists != test.Exists { t.Errorf("WatermarkEstimator(%v) - exists: got %v, want %v", params, exists, test.Exists) @@ -587,6 +608,94 @@ func TestWatermarkEstimator(t *testing.T) { } } +func TestTimerProvider(t *testing.T) { + tests := []struct { + Name string + Params []FnParamKind + Pos int + Exists bool + }{ + { + Name: "TimerProvider input", + Params: []FnParamKind{FnContext, FnWindow, FnEventTime, FnTimerProvider}, + Pos: 3, + Exists: true, + }, + { + Name: "no TimerProvider input", + Params: []FnParamKind{FnContext, FnWindow, FnEventTime}, + Pos: -1, + Exists: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.Name, func(t *testing.T) { + // Create a Fn with a filled params list. + params := make([]FnParam, len(test.Params)) + for i, kind := range test.Params { + params[i].Kind = kind + params[i].T = nil + } + fn := &Fn{Param: params} + + // Validate we get expected results for TimerProvider function. + pos, exists := fn.TimerProvider() + if exists != test.Exists { + t.Errorf("TimerProvider(%v) - exists: got %v, want %v", params, exists, test.Exists) + } + if pos != test.Pos { + t.Errorf("TimerProvider(%v) - pos: got %v, want %v", params, pos, test.Pos) + } + }) + } +} + +func TestStateProvider(t *testing.T) { + tests := []struct { + Name string + Params []FnParamKind + Pos int + Exists bool + }{ + { + Name: "StateProvider input", + Params: []FnParamKind{FnContext, FnWindow, FnStateProvider}, + Pos: 2, + Exists: true, + }, + { + Name: "no StateProvider input", + Params: []FnParamKind{FnContext, FnWindow}, + Pos: -1, + Exists: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.Name, func(t *testing.T) { + // Create a Fn with a filled params list. + params := make([]FnParam, len(test.Params)) + for i, kind := range test.Params { + params[i].Kind = kind + params[i].T = nil + } + fn := &Fn{Param: params} + + // Validate we get expected results for StateProvider function. + pos, exists := fn.StateProvider() + if exists != test.Exists { + t.Errorf("StateProvider(%v) - exists: got %v, want %v", params, exists, test.Exists) + } + if pos != test.Pos { + t.Errorf("StateProvider(%v) - pos: got %v, want %v", params, pos, test.Pos) + } + }) + } +} + func TestInputs(t *testing.T) { tests := []struct { Name string diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index a9f1c8a092b0..a656cc9fe61e 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -156,6 +156,7 @@ type MultiEdge struct { DoFn *DoFn // ParDo RestrictionCoder *coder.Coder // SplittableParDo StateCoders map[string]*coder.Coder // Stateful ParDo + TimerCoders *coder.Coder // Stateful ParDo CombineFn *CombineFn // Combine AccumCoder *coder.Coder // Combine Value []byte // Impulse diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 54cc02e07b3d..630984272382 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -167,6 +168,8 @@ const ( initialWatermarkEstimatorStateName = "InitialWatermarkEstimatorState" watermarkEstimatorStateName = "WatermarkEstimatorState" + onTimerName = "OnTimer" + createAccumulatorName = "CreateAccumulator" addInputName = "AddInput" mergeAccumulatorsName = "MergeAccumulators" @@ -182,6 +185,7 @@ var doFnNames = []string{ processElementName, finishBundleName, teardownName, + onTimerName, createInitialRestrictionName, splitRestrictionName, restrictionSizeName, @@ -304,6 +308,35 @@ func (f *DoFn) PipelineState() []state.PipelineState { return s } +// OnTimerFn return the "OnTimer" function and a bool indicating whether the +// function is defined or not for the DoFn. +func (f *DoFn) OnTimerFn() (*funcx.Fn, bool) { + m, ok := f.methods[onTimerName] + return m, ok +} + +// PipelineTimers returns the list of PipelineTimer objects defined for the DoFn. +func (f *DoFn) PipelineTimers() ([]timers.PipelineTimer, []string) { + var t []timers.PipelineTimer + var fieldNames []string + + if f.Recv == nil { + return t, fieldNames + } + + v := reflect.Indirect(reflect.ValueOf(f.Recv)) + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + if f.CanInterface() { + if pt, ok := f.Interface().(timers.PipelineTimer); ok { + t = append(t, pt) + fieldNames = append(fieldNames, v.Type().Field(i).Name) + } + } + } + return t, fieldNames +} + // SplittableDoFn represents a DoFn implementing SDF methods. type SplittableDoFn DoFn @@ -607,6 +640,11 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) { return nil, addContext(err, fn) } + err = validateTimer(doFn, numMainIn) + if err != nil { + return nil, addContext(err, fn) + } + return doFn, nil } @@ -1350,6 +1388,78 @@ func validateState(fn *DoFn, numIn mainInputs) error { return nil } +func validateOnTimerFn(fn *DoFn) error { + if _, ok := fn.methods[onTimerName]; !ok { + err := errors.Errorf("OnTimer function not defined for DoFn: %v", fn.Name()) + return errors.SetTopLevelMsgf(err, "OnTimer function not defined for DoFn: %v. Ensure that OnTimer function is implemented for the DoFn.", fn.Name()) + } + + pipelineTimers, _ := fn.PipelineTimers() + + if _, ok := fn.methods[onTimerName].TimerProvider(); !ok { + err := errors.Errorf("OnTimer function doesn't use a TimerProvider, but Timer field is attached to the DoFn(%v): %v", fn.Name(), pipelineTimers) + return errors.SetTopLevelMsgf(err, "OnTimer function doesn't use a TimerProvider, but Timer field is attached to the DoFn(%v): %v"+ + ", Ensure that you are using the TimerProvider to set and clear the timers.", fn.Name(), pipelineTimers) + } + + _, otNum, otExists := fn.methods[onTimerName].Emits() + _, peNum, peExists := fn.methods[processElementName].Emits() + + if otExists == peExists { + if otNum != peNum { + return fmt.Errorf("OnTimer and ProcessElement functions for DoFn should have exactly same emitters, no. of emitters used in OnTimer: %v, no. of emitters used in ProcessElement: %v", otNum, peNum) + } + } else { + return fmt.Errorf("OnTimer and ProcessElement functions for DoFn should have exactly same emitters, emitters used in OnTimer: %v, emitters used in ProcessElement: %v", otExists, peExists) + } + + return nil +} + +func validateTimer(fn *DoFn, numIn mainInputs) error { + pt, fieldNames := fn.PipelineTimers() + + if _, ok := fn.methods[processElementName].TimerProvider(); ok { + if numIn == MainSingle { + err := errors.Errorf("ProcessElement uses a TimerProvider, but is not keyed") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a TimerProvider, but is not keyed. "+ + "All stateful DoFns must take a key/value pair as an input.") + } + if len(pt) == 0 { + err := errors.New("ProcessElement uses a TimerProvider, but no Timer fields are defined in the DoFn") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a TimerProvider, but no timer fields are defined in the DoFn"+ + ", Ensure that your DoFn exports the Timer fields used to set and clear timers.") + } + timerKeys := make(map[string]string) + for i, t := range pt { + for timerFamilyID := range t.Timers() { + if timer, ok := timerKeys[timerFamilyID]; ok { + err := errors.Errorf("Duplicate timer key %v", timerFamilyID) + return errors.SetTopLevelMsgf(err, "Duplicate timer family ID %v used by struct fields %v and %v. Ensure that timer family IDs are unique per DoFn", timerFamilyID, timer, fieldNames[i]) + } else { + timerKeys[timerFamilyID] = fieldNames[i] + } + } + } + if err := validateOnTimerFn(fn); err != nil { + return err + } + } else { + if len(pt) > 0 { + err := errors.Errorf("ProcessElement doesn't use a TimerProvider, but Timer field is attached to the DoFn: %v", pt) + return errors.SetTopLevelMsgf(err, "ProcessElement doesn't use a TimerProvider, but Timer field is attached to the DoFn: %v"+ + ", Ensure that you are using the TimerProvider to set and clear the timers.", pt) + } + if err := validateOnTimerFn(fn); err == nil { + actualErr := errors.New("OnTimer function is defined for the DoFn but no TimerProvider defined in ProcessElement.") + return errors.SetTopLevelMsgf(actualErr, "OnTimer function is defined for the DoFn but no TimerProvider defined in ProcessElement."+ + "Ensure that timers.Provider is defined in the ProcessElement and OnTimer methods of DoFn.") + } + } + + return nil +} + // CombineFn represents a CombineFn. type CombineFn Fn diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go b/sdks/go/pkg/beam/core/graph/fn_test.go index cf44761d4f3c..727230d83035 100644 --- a/sdks/go/pkg/beam/core/graph/fn_test.go +++ b/sdks/go/pkg/beam/core/graph/fn_test.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" ) @@ -53,8 +54,8 @@ func TestNewDoFn(t *testing.T) { {dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)}, {dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)}, {dfn: &GoodDoFnCoGbk1wSide{}, opt: NumMainInputs(MainKv)}, - {dfn: &GoodStatefulDoFn{}, opt: NumMainInputs(MainKv)}, - {dfn: &GoodStatefulDoFn2{}, opt: NumMainInputs(MainKv)}, + {dfn: &GoodStatefulDoFn{Timer1: timers.InProcessingTime("processingTimeTimer")}, opt: NumMainInputs(MainKv)}, + {dfn: &GoodStatefulDoFn2{Timer1: timers.InEventTime("eventTimeTimer")}, opt: NumMainInputs(MainKv)}, {dfn: &GoodStatefulDoFn3{State1: state.MakeCombiningState[int, int, int]("state1", func(a, b int) int { return a * b })}, opt: NumMainInputs(MainKv)}, @@ -103,9 +104,13 @@ func TestNewDoFn(t *testing.T) { {dfn: &BadDoFnReturnValuesInFinishBundle{}}, {dfn: &BadDoFnReturnValuesInSetup{}}, {dfn: &BadDoFnReturnValuesInTeardown{}}, + // Validate stateful DoFn {dfn: &BadStatefulDoFnNoStateProvider{State1: state.Value[int](state.MakeValueState[int]("state1"))}}, {dfn: &BadStatefulDoFnNoStateFields{}}, - {dfn: &BadStatefulDoFnNoKV{State1: state.Value[int](state.MakeValueState[int]("state1"))}, numInputs: 1}, + {dfn: &BadStatefulDoFnNoKV{State1: state.Value[int](state.MakeValueState[int]("state1")), Timer1: timers.InEventTime("timer1")}, numInputs: 1}, + {dfn: &BadStatefulDoFnNoTimerProvider{Timer1: timers.InEventTime("timer1")}, numInputs: 2}, + {dfn: &BadStatefulDoFnNoTimerFields{}, numInputs: 2}, + {dfn: &BadStatefulDoFnNoOnTimer{Timer1: timers.InEventTime("timer1")}, numInputs: 2}, } for _, test := range tests { t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) { @@ -1174,17 +1179,27 @@ func (fn *GoodStatefulWatermarkEstimatingKv) WatermarkEstimatorState(estimator * type GoodStatefulDoFn struct { State1 state.Value[int] + Timer1 timers.ProcessingTime } -func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, int, int) int { +func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, timers.Provider, int, int) int { + return 0 +} + +func (fn *GoodStatefulDoFn) OnTimer(state.Provider, timers.Provider, int) int { return 0 } type GoodStatefulDoFn2 struct { State1 state.Bag[int] + Timer1 timers.EventTime } -func (fn *GoodStatefulDoFn2) ProcessElement(state.Provider, int, int) int { +func (fn *GoodStatefulDoFn2) ProcessElement(state.Provider, timers.Provider, int, int) int { + return 0 +} + +func (fn *GoodStatefulDoFn2) OnTimer(state.Provider, timers.Provider, int) int { return 0 } @@ -1593,12 +1608,44 @@ func (fn *BadStatefulDoFnNoStateFields) ProcessElement(state.Provider, int) int type BadStatefulDoFnNoKV struct { State1 state.Value[int] + Timer1 timers.EventTime } func (fn *BadStatefulDoFnNoKV) ProcessElement(state.Provider, int, int) int { return 0 } +type BadStatefulDoFnNoTimerProvider struct { + Timer1 timers.EventTime +} + +func (fn *BadStatefulDoFnNoTimerProvider) ProcessElement(int, int) int { + return 0 +} + +func (fn *BadStatefulDoFnNoTimerProvider) OnTimer(timers.Provider, int) int { + return 0 +} + +type BadStatefulDoFnNoTimerFields struct { +} + +func (fn *BadStatefulDoFnNoTimerFields) ProcessElement(timers.Provider, int, int) int { + return 0 +} + +func (fn *BadStatefulDoFnNoTimerFields) OnTimer(timers.Provider, int) int { + return 0 +} + +type BadStatefulDoFnNoOnTimer struct { + Timer1 timers.EventTime +} + +func (fn *BadStatefulDoFnNoOnTimer) ProcessElement(timers.Provider, int, int) int { + return 0 +} + // Examples of correct CombineFn signatures type MyAccum struct{} diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index 39248f7f5ac3..f2a9ff728e38 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -145,8 +145,9 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder { } case coder.Timer: + tc := coder.SkipW(c).Components[0] return &timerEncoder{ - elm: MakeElementEncoder(c.Components[0]), + elm: MakeElementEncoder(tc), win: MakeWindowEncoder(c.Window), } @@ -266,8 +267,9 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder { } case coder.Timer: + tc := coder.SkipW(c).Components[0] return &timerDecoder{ - elm: MakeElementDecoder(c.Components[0]), + elm: MakeElementDecoder(tc), win: MakeWindowDecoder(c.Window), } @@ -902,7 +904,7 @@ type timerEncoder struct { } func (e *timerEncoder) Encode(val *FullValue, w io.Writer) error { - return encodeTimer(e.elm, e.win, val.Elm.(typex.TimerMap), w) + return encodeTimer(e.elm, e.win, val.Elm.(TimerRecv), w) } type timerDecoder struct { @@ -1254,11 +1256,13 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, return ws, t, pn, nil } -// encodeTimer encodes a typex.TimerMap into a byte stream. -func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io.Writer) error { +// encodeTimer encodes a TimerRecv into a byte stream. +func encodeTimer(elm ElementEncoder, win WindowEncoder, tm TimerRecv, w io.Writer) error { var b bytes.Buffer - - elm.Encode(&FullValue{Elm: tm.Key}, &b) + err := elm.Encode(tm.Key, &b) + if err != nil { + return errors.WithContext(err, "error encoding key") + } if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil { return errors.WithContext(err, "error encoding tag") @@ -1267,6 +1271,7 @@ func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io. if err := win.Encode(tm.Windows, &b); err != nil { return errors.WithContext(err, "error encoding window") } + if err := coder.EncodeBool(tm.Clear, &b); err != nil { return errors.WithContext(err, "error encoding clear bit") } @@ -1282,20 +1287,19 @@ func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io. return errors.WithContext(err, "error encoding paneinfo") } } - w.Write(b.Bytes()) + return nil } // decodeTimer decodes timer byte encoded with standard timer coder spec. -func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (typex.TimerMap, error) { - tm := typex.TimerMap{} - - fv, err := dec.Decode(r) +func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (TimerRecv, error) { + tm := TimerRecv{} + key, err := dec.Decode(r) if err != nil { - return tm, errors.WithContext(err, "error decoding timer key") + return tm, errors.WithContext(err, "error decoding key") } - tm.Key = fv.Elm.(string) + tm.Key = key s, err := coder.DecodeStringUTF8(r) if err != nil && err != io.EOF { diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index b0098787acb1..40c88134a754 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -263,8 +263,16 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { } }, func(bcr *byteCountReader, ptransformID, timerFamilyID string) error { - tmap, err := decodeTimer(cp, wc, bcr) - log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v err: %v", ptransformID, timerFamilyID, tmap, err) + + if fn, ok := n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok { + _, err := n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, bcr) + if err != nil { + log.Warnf(ctx, "expected transform %v to have an OnTimer method attached to handle"+ + "Timer Family ID: %v callback, but it did not. Please file an issue with Apache Beam"+ + "if you have defined OnTimer method with reproducible code at https://github.com/apache/beam/issues", ptransformID, timerFamilyID) + return errors.WithContext(err, "ontimer callback invocation failed") + } + } return nil }) diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index eaf9df81e4aa..0655b0f08028 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -70,15 +70,46 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) { // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { +// +// Deprecated: prefer InvokeWithOpts +func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) (*FullValue, error) { + if fn == nil { + return nil, nil + } + inv := newInvoker(fn) + return inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, we: we, sa: sa, sr: sr, extra: extra}) +} + +// InvokeOpts are optional parameters to invoke a Fn. +type InvokeOpts struct { + opt *MainInput + bf *bundleFinalizer + we sdf.WatermarkEstimator + sa UserStateAdapter + sr StateReader + ta UserTimerAdapter + tm DataManager + extra []any +} + +// InvokeWithOpts invokes the fn with the given values. The extra values must match the non-main +// side input and emitters. It returns the direct output, if any. +func InvokeWithOpts(ctx context.Context, fn *funcx.Fn, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.Invoke(ctx, pn, ws, ts, opt, bf, we, sa, reader, extra...) + return inv.invokeWithOpts(ctx, pn, ws, ts, opts) +} + +// InvokeWithOptsWithoutEventTime runs the given function at time 0 in the global window. +func InvokeWithOptsWithoutEventTime(ctx context.Context, fn *funcx.Fn, opts InvokeOpts) (*FullValue, error) { + return InvokeWithOpts(ctx, fn, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opts) } // InvokeWithoutEventTime runs the given function at time 0 in the global window. +// +// Deprecated: prefer InvokeWithOptsWithoutEventTime func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke @@ -93,10 +124,12 @@ type invoker struct { fn *funcx.Fn args []any sp *stateProvider + tp *timerProvider + // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. - ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int // specialized input indexes - outEtIdx, outPcIdx, outErrIdx int // specialized output indexes - in, out []int // general indexes + ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx, tpIdx int // specialized input indexes + outEtIdx, outPcIdx, outErrIdx int // specialized output indexes + in, out []int // general indexes ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue. elmConvert, elm2Convert func(any) any // Cached conversion functions, which assums this invoker is always used with the same parameter types. @@ -129,6 +162,9 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.spIdx, ok = fn.StateProvider(); !ok { n.spIdx = -1 } + if n.tpIdx, ok = fn.TimerProvider(); !ok { + n.tpIdx = -1 + } if n.outEtIdx, ok = fn.OutEventTime(); !ok { n.outEtIdx = -1 } @@ -163,7 +199,11 @@ func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { +func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) (*FullValue, error) { + return n.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, we: we, sa: sa, sr: sr, extra: extra}) +} + +func (n *invoker) invokeWithOpts(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) { // (1) Populate contexts // extract these to make things easier to read. args := n.args @@ -178,7 +218,7 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind } if n.wndIdx >= 0 { if len(ws) != 1 { - return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) + return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opts.opt.Key.Windows) } args[n.wndIdx] = ws[0] } @@ -186,14 +226,14 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind args[n.etIdx] = ts } if n.bfIdx >= 0 { - args[n.bfIdx] = bf + args[n.bfIdx] = opts.bf } if n.weIdx >= 0 { - args[n.weIdx] = we + args[n.weIdx] = opts.we } if n.spIdx >= 0 { - sp, err := sa.NewStateProvider(ctx, reader, ws[0], opt) + sp, err := opts.sa.NewStateProvider(ctx, opts.sr, ws[0], opts.opt) if err != nil { return nil, err } @@ -201,29 +241,38 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind args[n.spIdx] = n.sp } + if n.tpIdx >= 0 { + tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ts, ws, opts.opt) + if err != nil { + return nil, err + } + n.tp = &tp + args[n.tpIdx] = n.tp + } + // (2) Main input from value, if any. i := 0 - if opt != nil { - if opt.RTracker != nil { - args[in[i]] = opt.RTracker + if opts.opt != nil { + if opts.opt.RTracker != nil { + args[in[i]] = opts.opt.RTracker i++ } if n.elmConvert == nil { - from := reflect.TypeOf(opt.Key.Elm) + from := reflect.TypeOf(opts.opt.Key.Elm) n.elmConvert = ConvertFn(from, fn.Param[in[i]].T) } - args[in[i]] = n.elmConvert(opt.Key.Elm) + args[in[i]] = n.elmConvert(opts.opt.Key.Elm) i++ - if opt.Key.Elm2 != nil { + if opts.opt.Key.Elm2 != nil { if n.elm2Convert == nil { - from := reflect.TypeOf(opt.Key.Elm2) + from := reflect.TypeOf(opts.opt.Key.Elm2) n.elm2Convert = ConvertFn(from, fn.Param[in[i]].T) } - args[in[i]] = n.elm2Convert(opt.Key.Elm2) + args[in[i]] = n.elm2Convert(opts.opt.Key.Elm2) i++ } - for _, iter := range opt.Values { + for _, iter := range opts.opt.Values { param := fn.Param[in[i]] if param.Kind != funcx.FnIter { @@ -243,7 +292,7 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind } // (3) Precomputed side input and emitters (or other output). - for _, arg := range extra { + for _, arg := range opts.extra { args[in[i]] = arg i++ } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 8cb5342ded87..652a904d16f5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -48,8 +48,11 @@ type ParDo struct { bf *bundleFinalizer we sdf.WatermarkEstimator - reader StateReader - cache *cacheElm + onTimerInvoker *invoker + Timer UserTimerAdapter + timerManager DataManager + reader StateReader + cache *cacheElm status Status err errorx.GuardedError @@ -74,6 +77,11 @@ func (n *ParDo) ID() UnitID { return n.UID } +// HasOnTimer returns if this ParDo wraps a DoFn that has an OnTimer method. +func (n *ParDo) HasOnTimer() bool { + return n.Timer != nil +} + // Up initializes this ParDo and does one-time DoFn setup. func (n *ParDo) Up(ctx context.Context) error { if n.status != Initializing { @@ -81,6 +89,9 @@ func (n *ParDo) Up(ctx context.Context) error { } n.status = Up n.inv = newInvoker(n.Fn.ProcessElementFn()) + if fn, ok := n.Fn.OnTimerFn(); ok { + n.onTimerInvoker = newInvoker(fn) + } n.states = metrics.NewPTransformState(n.PID) @@ -88,7 +99,7 @@ func (n *ParDo) Up(ctx context.Context) error { // Subsequent bundles might run this same node, and the context here would be // incorrectly refering to the older bundleId. setupCtx := metrics.SetPTransformID(ctx, n.PID) - if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil, nil, nil, nil); err != nil { + if _, err := InvokeWithOptsWithoutEventTime(setupCtx, n.Fn.SetupFn(), InvokeOpts{}); err != nil { return n.fail(err) } @@ -111,6 +122,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) er } n.status = Active n.reader = data.State + n.timerManager = data.Data // Allocating contexts all the time is expensive, but we seldom re-write them, // and never accept modified contexts from users, so we will cache them per-bundle // per-unit, to avoid the constant allocation overhead. @@ -228,6 +240,9 @@ func (n *ParDo) FinishBundle(_ context.Context) error { } n.status = Up n.inv.Reset() + if n.onTimerInvoker != nil { + n.onTimerInvoker.Reset() + } n.states.Set(n.ctx, metrics.FinishBundle) @@ -236,6 +251,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error { } n.reader = nil n.cache = nil + n.timerManager = nil if err := MultiFinishBundle(n.ctx, n.Out...); err != nil { return n.fail(err) @@ -251,8 +267,9 @@ func (n *ParDo) Down(ctx context.Context) error { n.status = Down n.reader = nil n.cache = nil + n.timerManager = nil - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil, nil, nil, nil); err != nil { + if _, err := InvokeWithOptsWithoutEventTime(ctx, n.Fn.TeardownFn(), InvokeOpts{}); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -345,6 +362,49 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex. return val, nil } +func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, bcr *byteCountReader) (*FullValue, error) { + timerAdapter, ok := n.Timer.(*userTimerAdapter) + if !ok { + return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", n.GetPID()) + } + tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr) + if err != nil { + return nil, errors.WithContext(err, "error decoding received timer callback") + } + + // Defer side input clean-up in case of panic + defer func() { + if postErr := n.postInvoke(); postErr != nil { + err = postErr + } + }() + if err := n.preInvoke(ctx, tmap.Windows, tmap.HoldTimestamp); err != nil { + return nil, err + } + + var extra []any + extra = append(extra, timerFamilyID) + + if tmap.Tag != "" { + extra = append(extra, tmap.Tag) + } + extra = append(extra, n.cache.extra...) + val, err := n.onTimerInvoker.invokeWithOpts(ctx, tmap.Pane, tmap.Windows, tmap.HoldTimestamp, InvokeOpts{ + opt: &MainInput{Key: *tmap.Key}, + bf: n.bf, + we: n.we, + sa: n.UState, + sr: n.reader, + ta: n.Timer, + tm: n.timerManager, + extra: extra, + }) + if err != nil { + return nil, err + } + return val, err +} + // invokeProcessFn handles the per element invocations func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput) (val *FullValue, err error) { // Defer side input clean-up in case of panic @@ -356,7 +416,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.we, n.UState, n.reader, n.cache.extra...) + val, err = n.inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, ta: n.Timer, tm: n.timerManager, extra: n.cache.extra}) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 7958cf382383..7d55b0a0d4ba 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -53,6 +53,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { callbacks: []bundleFinalizationCallback{}, lastValidCallback: time.Now(), } + var onTimers map[string]*ParDo for _, u := range units { if u == nil { @@ -67,6 +68,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) { if p, ok := u.(*PCollection); ok { pcols = append(pcols, p) } + if pd, ok := u.(*ParDo); ok && pd.HasOnTimer() { + if onTimers == nil { + onTimers = map[string]*ParDo{} + } + onTimers[pd.PID] = pd + } if p, ok := u.(needsBundleFinalization); ok { p.AttachFinalizer(&bf) } @@ -75,6 +82,10 @@ func NewPlan(id string, units []Unit) (*Plan, error) { return nil, errors.Errorf("no root units") } + if len(onTimers) > 0 { + source.OnTimerTransforms = onTimers + } + return &Plan{ id: id, status: Initializing, diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers.go b/sdks/go/pkg/beam/core/runtime/exec/timers.go new file mode 100644 index 000000000000..0bc3e3dec35e --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/exec/timers.go @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec + +import ( + "context" + "fmt" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +// UserTimerAdapter provides a timer provider to be used for manipulating timers. +type UserTimerAdapter interface { + NewTimerProvider(ctx context.Context, manager DataManager, inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput) (timerProvider, error) +} + +type userTimerAdapter struct { + sID StreamID + ec ElementEncoder + dc ElementDecoder + wc WindowDecoder +} + +// NewUserTimerAdapter returns a user timer adapter for the given StreamID and timer coder. +func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoder *coder.Coder) UserTimerAdapter { + if !coder.IsW(c) { + panic(fmt.Sprintf("expected WV coder for user timer %v: %v", sID, c)) + } + ec := MakeElementEncoder(timerCoder) + dc := MakeElementDecoder(coder.SkipW(c).Components[0]) + wc := MakeWindowDecoder(c.Window) + return &userTimerAdapter{sID: sID, ec: ec, wc: wc, dc: dc} +} + +// NewTimerProvider creates and returns a timer provider to set/clear timers. +func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager DataManager, inputTs typex.EventTime, w []typex.Window, element *MainInput) (timerProvider, error) { + userKey := &FullValue{Elm: element.Key.Elm} + tp := timerProvider{ + ctx: ctx, + tm: manager, + userKey: userKey, + inputTimestamp: inputTs, + sID: u.sID, + window: w, + writersByFamily: make(map[string]io.Writer), + timerElementEncoder: u.ec, + keyElementDecoder: u.dc, + } + + return tp, nil +} + +type timerProvider struct { + ctx context.Context + tm DataManager + sID StreamID + inputTimestamp typex.EventTime + userKey *FullValue + window []typex.Window + + pn typex.PaneInfo + + writersByFamily map[string]io.Writer + timerElementEncoder ElementEncoder + keyElementDecoder ElementDecoder +} + +func (p *timerProvider) getWriter(family string) (io.Writer, error) { + if w, ok := p.writersByFamily[family]; ok { + return w, nil + } else { + w, err := p.tm.OpenTimerWrite(p.ctx, p.sID, family) + if err != nil { + return nil, err + } + p.writersByFamily[family] = w + return p.writersByFamily[family], nil + } +} + +// Set writes a new timer. This can be used to both Set as well as Clear the timer. +// Note: This function is intended for internal use only. +func (p *timerProvider) Set(t timers.TimerMap) { + w, err := p.getWriter(t.Family) + if err != nil { + panic(err) + } + tm := TimerRecv{ + Key: p.userKey, + Tag: t.Tag, + Windows: p.window, + Clear: t.Clear, + FireTimestamp: t.FireTimestamp, + HoldTimestamp: t.HoldTimestamp, + Pane: p.pn, + } + fv := FullValue{Elm: tm} + if err := p.timerElementEncoder.Encode(&fv, w); err != nil { + panic(err) + } +} + +// TimerRecv holds the timer metadata while encoding and decoding timers in exec unit. +type TimerRecv struct { + Key *FullValue + Tag string + Windows []typex.Window // []typex.Window + Clear bool + FireTimestamp, HoldTimestamp mtime.Time + Pane typex.PaneInfo +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go index d25f70b94e07..2b3a33de792b 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go @@ -22,11 +22,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" ) -func equalTimers(a, b typex.TimerMap) bool { - return a.Key == b.Key && a.Tag == b.Tag && (a.FireTimestamp) == b.FireTimestamp && a.Clear == b.Clear +func equalTimers(a, b TimerRecv) bool { + return a.Key.Elm == b.Key.Elm && a.Tag == b.Tag && (a.FireTimestamp) == b.FireTimestamp && a.Clear == b.Clear } func TestTimerEncodingDecoding(t *testing.T) { @@ -36,13 +35,13 @@ func TestTimerEncodingDecoding(t *testing.T) { tests := []struct { name string - tm typex.TimerMap + tm TimerRecv result bool }{ { name: "all set fields", - tm: typex.TimerMap{ - Key: "Basic", + tm: TimerRecv{ + Key: &FullValue{Elm: "Basic"}, Tag: "first", Windows: window.SingleGlobalWindow, Clear: false, @@ -52,8 +51,8 @@ func TestTimerEncodingDecoding(t *testing.T) { }, { name: "without tag", - tm: typex.TimerMap{ - Key: "Basic", + tm: TimerRecv{ + Key: &FullValue{Elm: "Basic"}, Tag: "", Windows: window.SingleGlobalWindow, Clear: false, @@ -63,8 +62,8 @@ func TestTimerEncodingDecoding(t *testing.T) { }, { name: "with clear set", - tm: typex.TimerMap{ - Key: "Basic", + tm: TimerRecv{ + Key: &FullValue{Elm: "Basic"}, Tag: "first", Windows: window.SingleGlobalWindow, Clear: true, @@ -87,7 +86,7 @@ func TestTimerEncodingDecoding(t *testing.T) { t.Fatalf("failed to decode timer, got %v", err) } - if got, want := gotFv.Elm.(typex.TimerMap), test.tm; test.result != equalTimers(got, want) { + if got, want := gotFv.Elm.(TimerRecv), test.tm; test.result != equalTimers(got, want) { t.Errorf("got timer %v, want %v", got, want) } }) diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 78cf0ef65cd6..a8d1fdb7deee 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -462,6 +462,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { var data string var sides map[string]*pipepb.SideInput var userState map[string]*pipepb.StateSpec + var userTimers map[string]*pipepb.TimerFamilySpec switch urn { case graphx.URNParDo, urnPairWithRestriction, @@ -475,6 +476,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { data = string(pardo.GetDoFn().GetPayload()) sides = pardo.GetSideInputs() userState = pardo.GetStateSpecs() + userTimers = pardo.GetTimerFamilySpecs() case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert: var cmb pipepb.CombinePayload if err := proto.Unmarshal(payload, &cmb); err != nil { @@ -588,6 +590,16 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { } } + if len(userTimers) > 0 { + sID := StreamID{Port: Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to} + ec, wc, err := b.makeCoderForPCollection(input[0]) + if err != nil { + return nil, err + } + timerCoder := coder.NewT(ec.Components[0], wc) + n.Timer = NewUserTimerAdapter(sID, coder.NewW(ec, wc), timerCoder) + } + for i := 1; i < len(input); i++ { // TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs diff --git a/sdks/go/pkg/beam/core/runtime/genx/genx.go b/sdks/go/pkg/beam/core/runtime/genx/genx.go index d78ef35b8acc..8d1ce6f3e936 100644 --- a/sdks/go/pkg/beam/core/runtime/genx/genx.go +++ b/sdks/go/pkg/beam/core/runtime/genx/genx.go @@ -103,6 +103,9 @@ func handleDoFn(fn *graph.DoFn, c cache) { c.pullMethod(fn.ProcessElementFn()) c.pullMethod(fn.FinishBundleFn()) c.pullMethod(fn.TeardownFn()) + if timerFn, ok := fn.OnTimerFn(); ok { + c.pullMethod(timerFn) + } if !fn.IsSplittable() { return } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index 498e145f5db4..34b44dd85920 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -73,7 +73,7 @@ func knownStandardCoders() []string { urnIntervalWindow, urnRowCoder, urnNullableCoder, - // TODO(https://github.com/apache/beam/issues/20510): Add urnTimerCoder once finalized. + urnTimerCoder, } } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index 65ad1bdd0600..fb62e18e1306 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" @@ -523,6 +524,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, bool) { return v1pb.Type_BUNDLEFINALIZATION, true case state.ProviderType: return v1pb.Type_STATEPROVIDER, true + case timers.ProviderType: + return v1pb.Type_TIMERPROVIDER, true case typex.KVType: return v1pb.Type_KV, true case typex.CoGBKType: @@ -689,6 +692,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, error) { return typex.BundleFinalizationType, nil case v1pb.Type_STATEPROVIDER: return state.ProviderType, nil + case v1pb.Type_TIMERPROVIDER: + return timers.ProviderType, nil case v1pb.Type_KV: return typex.KVType, nil case v1pb.Type_COGBK: diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 68074ac7eb3a..6a5cb313d17e 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -578,6 +578,24 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { } payload.StateSpecs = stateSpecs } + if _, ok := edge.Edge.DoFn.ProcessElementFn().TimerProvider(); ok { + m.requirements[URNRequiresStatefulProcessing] = true + timerSpecs := make(map[string]*pipepb.TimerFamilySpec) + pipelineTimers, _ := edge.Edge.DoFn.PipelineTimers() + for _, pt := range pipelineTimers { + for timerFamilyID, timeDomain := range pt.Timers() { + coderID, err := m.coders.Add(edge.Edge.TimerCoders) + if err != nil { + return handleErr(err) + } + timerSpecs[timerFamilyID] = &pipepb.TimerFamilySpec{ + TimeDomain: pipepb.TimeDomain_Enum(timeDomain), + TimerFamilyCoderId: coderID, + } + } + } + payload.TimerFamilySpecs = timerSpecs + } spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)} annotations = edge.Edge.DoFn.Annotations() diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go index 3850ee3a3667..d8c0f4d1d852 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go @@ -690,8 +690,6 @@ func (w *timerWriter) writeTimers(p []byte) error { w.ch.mu.Lock() defer w.ch.mu.Unlock() - log.Infof(context.TODO(), "DEBUGLOG: timer write for %+v: %v", w.id, p) - msg := &fnpb.Elements{ Timers: []*fnpb.Elements_Timers{ { diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index c7e6a9b4db24..252914a74f92 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -402,7 +402,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe sampler.stop() - data.Close() + dataError := data.Close() state.Close() c.cache.CompleteBundle(tokens...) @@ -416,6 +416,10 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe // Mark the instruction as failed. if err != nil { c.failed[instID] = err + } else if dataError != io.EOF && dataError != nil { + // If there was an error on the data channel reads, fail this bundle + // since we may have had a short read. + c.failed[instID] = dataError } else { // Non failure plans should either be moved to the finalized state // or to plans so they can be re-used. @@ -573,7 +577,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } // Unsuccessful splits without errors indicate we should return an empty response, - // as processing can confinue. + // as processing can continue. if sr.Unsuccessful { return &fnpb.InstructionResponse{ InstructionId: string(instID), diff --git a/sdks/go/pkg/beam/core/timers/timers.go b/sdks/go/pkg/beam/core/timers/timers.go new file mode 100644 index 000000000000..ebffa215efa4 --- /dev/null +++ b/sdks/go/pkg/beam/core/timers/timers.go @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timers contains structs for setting pipeline timers. +package timers + +import ( + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType represents the type of timer provider. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +// TimeDomain represents different time domains to set timer. +type TimeDomain int32 + +const ( + // UnspecifiedTimeDomain represents unspecified time domain. + UnspecifiedTimeDomain TimeDomain = 0 + // EventTimeDomain is time from the perspective of the data + EventTimeDomain TimeDomain = 1 + // ProcessingTimeDomain is time from the perspective of the + // execution of your pipeline + ProcessingTimeDomain TimeDomain = 2 +) + +// TimerMap holds timer information obtained from the pipeline. +type TimerMap struct { + Family string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time +} + +type timerConfig struct { + Tag string + HoldTimestamp mtime.Time +} + +type timerOptions func(*timerConfig) + +// WithTag sets the tag for the timer. +func WithTag(tag string) timerOptions { + return func(tm *timerConfig) { + tm.Tag = tag + } +} + +// WithOutputTimestamp sets the output timestamp for the timer. +func WithOutputTimestamp(outputTimestamp time.Time) timerOptions { + return func(tm *timerConfig) { + tm.HoldTimestamp = mtime.FromTime(outputTimestamp) + } +} + +// Provider represents a timer provider interface. +type Provider interface { + Set(t TimerMap) +} + +// PipelineTimer interface represents valid timer type. +type PipelineTimer interface { + Timers() map[string]TimeDomain +} + +// EventTime represents the event time timer. +type EventTime struct { + Family string +} + +// Timers returns mapping of timer family ID and its time domain. +func (et EventTime) Timers() map[string]TimeDomain { + return map[string]TimeDomain{et.Family: EventTimeDomain} +} + +// Set sets the timer for a event-time timestamp. Calling this method repeatedly for the same key +// will cause it overwrite previously set timer. +func (et *EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { + tc := timerConfig{} + for _, opt := range opts { + opt(&tc) + } + tm := TimerMap{Family: et.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)} + if !tc.HoldTimestamp.ToTime().IsZero() { + tm.HoldTimestamp = tc.HoldTimestamp + } + p.Set(tm) +} + +// Clear clears this timer. +func (et *EventTime) Clear(p Provider) { + p.Set(TimerMap{Family: et.Family, Clear: true}) +} + +// ProcessingTime represents the processing time timer. +type ProcessingTime struct { + Family string +} + +// Timers returns mapping of timer family ID and its time domain. +func (pt ProcessingTime) Timers() map[string]TimeDomain { + return map[string]TimeDomain{pt.Family: ProcessingTimeDomain} +} + +// Set sets the timer for processing time domain. Calling this method repeatedly for the same key +// will cause it overwrite previously set timer. +func (pt *ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { + tc := timerConfig{} + for _, opt := range opts { + opt(&tc) + } + tm := TimerMap{Family: pt.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)} + if !tc.HoldTimestamp.ToTime().IsZero() { + tm.HoldTimestamp = tc.HoldTimestamp + } + + p.Set(tm) +} + +// Clear clears this timer. +func (pt ProcessingTime) Clear(p Provider) { + p.Set(TimerMap{Family: pt.Family, Clear: true}) +} + +// InEventTime creates and returns a new EventTime timer object. +func InEventTime(Key string) EventTime { + return EventTime{Family: Key} +} + +// InProcessingTime creates and returns a new ProcessingTime timer object. +func InProcessingTime(Key string) ProcessingTime { + return ProcessingTime{Family: Key} +} diff --git a/sdks/go/pkg/beam/core/typex/class.go b/sdks/go/pkg/beam/core/typex/class.go index e112495ee986..63e4543a3e54 100644 --- a/sdks/go/pkg/beam/core/typex/class.go +++ b/sdks/go/pkg/beam/core/typex/class.go @@ -120,6 +120,7 @@ func isConcrete(t reflect.Type, visited map[uintptr]bool) error { t == EventTimeType || t.Implements(WindowType) || t == PaneInfoType || + t == TimersType || t == BundleFinalizationType || t == reflectx.Error || t == reflectx.Context || diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go index 935371225848..edc1249fe763 100644 --- a/sdks/go/pkg/beam/core/typex/special.go +++ b/sdks/go/pkg/beam/core/typex/special.go @@ -105,15 +105,6 @@ type Timers struct { Pane PaneInfo } -// TimerMap is a placeholder for timer details used in encoding/decoding. -type TimerMap struct { - Key, Tag string - Windows []Window // []typex.Window - Clear bool - FireTimestamp, HoldTimestamp mtime.Time - Pane PaneInfo -} - // KV, Nullable, CoGBK, WindowedValue represent composite generic types. They are not used // directly in user code signatures, but only in FullTypes. diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index 1314836dfdc2..61091a67a29c 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -116,6 +116,17 @@ func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection } } + wc := inWfn.Coder() + pipelineTimers, _ := fn.PipelineTimers() + if len(pipelineTimers) > 0 { + c, err := inferCoder(typex.New(reflect.TypeOf(col.Type()))) + if err != nil { + return nil, addParDoCtx(errors.New("error infering coder from col"), s) + } + tc := coder.NewT(c, wc) + edge.TimerCoders = tc + } + var ret []PCollection for _, out := range edge.Output { c := PCollection{out.To} diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go index 2566a88cbb2e..466f361b0138 100644 --- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go +++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go @@ -326,13 +326,13 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { got, want = elem.Elm, rv.Interface() case "beam:coder:timer:v1": pass := true - tm := elem.Elm.(typex.TimerMap) + tm := elem.Elm.(exec.TimerRecv) fs := eg.Value.(yaml.MapSlice) for _, item := range fs { switch item.Key.(string) { case "userKey": - if want := item.Value.(string); want != tm.Key { + if want := item.Value.(string); want != tm.Key.Elm.(string) { pass = false } case "dynamicTimerTag":