From 770df91e7be866c62f41076441ae5cbe3ee6f59a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:53:17 -0700 Subject: [PATCH 1/7] [#32115] Fix timer support, support timer clears. --- .../prism/internal/engine/elementmanager.go | 23 ++-- .../runners/prism/internal/engine/timers.go | 115 +++++++++++------- .../runners/portability/prism_runner_test.py | 34 ++++++ 3 files changed, 122 insertions(+), 50 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index bc8449c72b39..afda786527a5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -869,14 +869,20 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag for tentativeKey, timers := range d.timers { keyToTimers := map[timerKey]element{} for _, t := range timers { - key, tag, elms := decodeTimer(inputInfo.KeyDec, true, t) - for _, e := range elms { - keyToTimers[timerKey{key: string(key), tag: tag, win: e.window}] = e - } - if len(elms) == 0 { - // TODO(lostluck): Determine best way to mark a timer cleared. - continue - } + // TODO: Call in a for:range loop when Beam's minimum Go version hits 1.23.0 + iter := decodeTimerIter(inputInfo.KeyDec, true, t) + iter(func(ret timerRet) bool { + for _, e := range ret.elms { + keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e + } + if len(ret.elms) == 0 { + for _, w := range ret.windows { + delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w}) + } + } + // Indicate we'd like to continue iterating. + return true + }) } for _, elm := range keyToTimers { @@ -901,6 +907,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag } if len(pendingEventTimers) > 0 { + fmt.Println("pendingEventTimers", len(pendingEventTimers), pendingEventTimers) count := stage.AddPending(pendingEventTimers) em.addPending(count) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 787d27858a0e..5250816732df 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -31,53 +31,77 @@ import ( "google.golang.org/protobuf/encoding/protowire" ) -// DecodeTimer extracts timers to elements for insertion into their keyed queues. -// Returns the key bytes, tag, window exploded elements, and the hold timestamp. +type timerRet struct { + keyBytes []byte + tag string + elms []element + windows []typex.Window +} + +// decodeTimerIter extracts timers to to elements for insertion into their keyed queues, +// through a go iterator function, to be called by the caller with their processing function. +// +// For each timer, a key, tag, windowed elements, and the window set are returned. +// // If the timer has been cleared, no elements will be returned. Any existing timers -// for the tag *must* be cleared from the pending queue. -func decodeTimer(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) ([]byte, string, []element) { - keyBytes := keyDec(bytes.NewBuffer(raw)) - - d := decoder{raw: raw, cursor: len(keyBytes)} - tag := string(d.Bytes()) - - var ws []typex.Window - numWin := d.Fixed32() - if usesGlobalWindow { - for i := 0; i < int(numWin); i++ { - ws = append(ws, window.GlobalWindow{}) - } - } else { - // Assume interval windows here, since we don't understand custom windows yet. - for i := 0; i < int(numWin); i++ { - ws = append(ws, d.IntervalWindow()) - } - } +// for the tag *must* be cleared from the pending queue. The windows associated with +// the clear are provided to be able to delete pending timers. +func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(yeild func(timerRet) bool) { + return func(yeild func(timerRet) bool) { + for len(raw) > 0 { + keyBytes := keyDec(bytes.NewBuffer(raw)) + d := decoder{raw: raw, cursor: len(keyBytes)} + tag := string(d.Bytes()) + + var ws []typex.Window + numWin := d.Fixed32() + if usesGlobalWindow { + for i := 0; i < int(numWin); i++ { + ws = append(ws, window.GlobalWindow{}) + } + } else { + // Assume interval windows here, since we don't understand custom windows yet. + for i := 0; i < int(numWin); i++ { + ws = append(ws, d.IntervalWindow()) + } + } - clear := d.Bool() - hold := mtime.MaxTimestamp - if clear { - return keyBytes, tag, nil - } + clear := d.Bool() + hold := mtime.MaxTimestamp + if clear { + if !yeild(timerRet{keyBytes, tag, nil, ws}) { + return // Halt iteration if yeild returns false. + } + // Otherwise continue handling the remaining bytes. + raw = d.UnusedBytes() + continue + } - firing := d.Timestamp() - hold = d.Timestamp() - pane := d.Pane() + firing := d.Timestamp() + hold = d.Timestamp() + pane := d.Pane() + + var elms []element + for _, w := range ws { + elms = append(elms, element{ + tag: tag, + elmBytes: nil, // indicates this is a timer. + keyBytes: keyBytes, + window: w, + timestamp: firing, + holdTimestamp: hold, + pane: pane, + sequence: len(elms), + }) + } - var ret []element - for _, w := range ws { - ret = append(ret, element{ - tag: tag, - elmBytes: nil, // indicates this is a timer. - keyBytes: keyBytes, - window: w, - timestamp: firing, - holdTimestamp: hold, - pane: pane, - sequence: len(ret), - }) + if !yeild(timerRet{keyBytes, tag, elms, ws}) { + return // Halt iteration if yeild returns false. + } + // Otherwise continue handling the remaining bytes. + raw = d.UnusedBytes() + } } - return keyBytes, tag, ret } type decoder struct { @@ -140,6 +164,13 @@ func (d *decoder) Bytes() []byte { return b } +// UnusedBytes returns the remainder of bytes in the buffer that weren't yet used. +// Multiple timers can be provided in a single timers buffer, since multiple dynamic +// timer tags may be set. +func (d *decoder) UnusedBytes() []byte { + return d.raw[d.cursor:] +} + func (d *decoder) Bool() bool { if b := d.Byte(); b == 0 { return false diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 324fe5a17b54..c8aa1e25d3b6 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -40,6 +40,7 @@ from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import userstate from apache_beam.transforms import window from apache_beam.transforms.sql import SqlTransform from apache_beam.utils import timestamp @@ -200,6 +201,39 @@ def test_windowing(self): assert_that( res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) + # The fn_runner_test.py version of this test doesn't execute the process method + # for some reason. Overridden here to validate that the cleared timer won't + # re-fire. + def test_pardo_timers_clear(self): + timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) + + class TimerDoFn(beam.DoFn): + def process( + self, + element, + timer=beam.DoFn.TimerParam(timer_spec)): + unused_key, ts = element + timer.set(ts) + timer.set(2 * ts) + + @userstate.on_timer(timer_spec) + def process_timer(self, + ts=beam.DoFn.TimestampParam, + timer=beam.DoFn.TimerParam(timer_spec)): + timer.set(timestamp.Timestamp(micros=2 * ts.micros)) + timer.clear() # Shouldn't fire again + yield 'fired' + + with self.create_pipeline() as p: + actual = ( + p + | beam.Create([('k1', 10), ('k2', 100)]) + | beam.ParDo(TimerDoFn()) + | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts))) + + expected = [('fired', ts) for ts in (20, 200)] + assert_that(actual, equal_to(expected)) + # Can't read host files from within docker, read a "local" file there. def test_read(self): print('name:', __name__) From 33946a31628684dcd538dd0b78fb9b6cc7af8a31 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:36:47 -0700 Subject: [PATCH 2/7] pylint fmt --- .../runners/portability/prism_runner_test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index c8aa1e25d3b6..a21724b82c04 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -208,20 +208,19 @@ def test_pardo_timers_clear(self): timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) class TimerDoFn(beam.DoFn): - def process( - self, - element, - timer=beam.DoFn.TimerParam(timer_spec)): + + def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)): unused_key, ts = element timer.set(ts) timer.set(2 * ts) @userstate.on_timer(timer_spec) - def process_timer(self, + def process_timer( + self, ts=beam.DoFn.TimestampParam, timer=beam.DoFn.TimerParam(timer_spec)): timer.set(timestamp.Timestamp(micros=2 * ts.micros)) - timer.clear() # Shouldn't fire again + timer.clear() # Shouldn't fire again yield 'fired' with self.create_pipeline() as p: From 5c4d2b91931a4e8ebb794cc45a423051cb9b8791 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:47:53 -0700 Subject: [PATCH 3/7] typo --- sdks/go/pkg/beam/runners/prism/internal/engine/timers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 5250816732df..1bf67f895bcc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -38,7 +38,7 @@ type timerRet struct { windows []typex.Window } -// decodeTimerIter extracts timers to to elements for insertion into their keyed queues, +// decodeTimerIter extracts timers to elements for insertion into their keyed queues, // through a go iterator function, to be called by the caller with their processing function. // // For each timer, a key, tag, windowed elements, and the window set are returned. From b51770fdef60720d55eda1abd9dad130c4486288 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 9 Aug 2024 10:32:20 -0700 Subject: [PATCH 4/7] line length limit --- .../apache_beam/runners/portability/prism_runner_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index a21724b82c04..1510d9a525b1 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -201,9 +201,9 @@ def test_windowing(self): assert_that( res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) - # The fn_runner_test.py version of this test doesn't execute the process method - # for some reason. Overridden here to validate that the cleared timer won't - # re-fire. + # The fn_runner_test.py version of this test doesn't execute the process + # method for some reason. Overridden here to validate that the cleared + # timer won't re-fire. def test_pardo_timers_clear(self): timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) From 9f551a3b7a2ca5c9ee39e12fc847462c9cfb6592 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 9 Aug 2024 10:50:24 -0700 Subject: [PATCH 5/7] remove debug print. --- sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index afda786527a5..c73db507c792 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -907,7 +907,6 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag } if len(pendingEventTimers) > 0 { - fmt.Println("pendingEventTimers", len(pendingEventTimers), pendingEventTimers) count := stage.AddPending(pendingEventTimers) em.addPending(count) } From 72badf558e68b19e62df53f064ecb38d417498ee Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 9 Aug 2024 11:09:53 -0700 Subject: [PATCH 6/7] Fix yeild typo. --- sdks/go/pkg/beam/runners/prism/internal/engine/timers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 1bf67f895bcc..9a3bd6f9682b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -46,8 +46,8 @@ type timerRet struct { // If the timer has been cleared, no elements will be returned. Any existing timers // for the tag *must* be cleared from the pending queue. The windows associated with // the clear are provided to be able to delete pending timers. -func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(yeild func(timerRet) bool) { - return func(yeild func(timerRet) bool) { +func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(func(timerRet) bool) { + return func(yield func(timerRet) bool) { for len(raw) > 0 { keyBytes := keyDec(bytes.NewBuffer(raw)) d := decoder{raw: raw, cursor: len(keyBytes)} @@ -69,7 +69,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw [ clear := d.Bool() hold := mtime.MaxTimestamp if clear { - if !yeild(timerRet{keyBytes, tag, nil, ws}) { + if !yield(timerRet{keyBytes, tag, nil, ws}) { return // Halt iteration if yeild returns false. } // Otherwise continue handling the remaining bytes. @@ -95,7 +95,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw [ }) } - if !yeild(timerRet{keyBytes, tag, elms, ws}) { + if !yield(timerRet{keyBytes, tag, elms, ws}) { return // Halt iteration if yeild returns false. } // Otherwise continue handling the remaining bytes. From 28ad5229cc7942df20e3b2aeaed3f385025e5252 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 9 Aug 2024 13:07:57 -0700 Subject: [PATCH 7/7] further formatting nits --- sdks/python/apache_beam/runners/portability/prism_runner_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 1510d9a525b1..b179156877e4 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -208,7 +208,6 @@ def test_pardo_timers_clear(self): timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) class TimerDoFn(beam.DoFn): - def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)): unused_key, ts = element timer.set(ts)