From 0b1b2f2555d71f284f310c35898c8e3cb5a6d697 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 30 May 2025 16:23:13 -0400 Subject: [PATCH 1/3] Fix the issue of timer clearing not working across bundles --- .../prism/internal/engine/elementmanager.go | 32 +++++++++---------- .../runners/prism/internal/engine/timers.go | 13 +++++++- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index de997fa3282c..6c2f92d0c60e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -959,11 +959,6 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag for _, e := range ret.elms { keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e } - if len(ret.elms) == 0 { - for _, w := range ret.windows { - delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w}) - } - } // Indicate we'd like to continue iterating. return true }) @@ -1345,18 +1340,23 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend heap.Push(&dnt.elements, e) if e.IsTimer() { - if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { - // existing timer! - // don't increase the count this time, as "this" timer is already pending. - count-- - // clear out the existing hold for accounting purposes. - ss.watermarkHolds.Drop(lastSet.hold, 1) - } - // Update the last set time on the timer. - dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} + if e.sequence > 0 { + if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { + // existing timer! + // don't increase the count this time, as "this" timer is already pending. 
+ count-- + // clear out the existing hold for accounting purposes. + ss.watermarkHolds.Drop(lastSet.hold, 1) + } + // Update the last set time on the timer. + dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} - // Mark the hold in the heap. - ss.watermarkHolds.Add(e.holdTimestamp, 1) + // Mark the hold in the heap. + ss.watermarkHolds.Add(e.holdTimestamp, 1) + } else { + // timer is to be cleared + delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) + } } } return count diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 8b90591974b8..4e8a394ff80b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -86,7 +86,18 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [ clear := d.Bool() hold := mtime.MaxTimestamp if clear { - if !yield(timerRet{keyBytes, tag, nil, ws}) { + var elms []element + for _, w := range ws { + elms = append(elms, element{ + tag: tag, + elmBytes: nil, // indicates this is a timer. + keyBytes: keyBytes, + window: w, + sequence: -1, + }) + } + + if !yield(timerRet{keyBytes, tag, elms, ws}) { return // Halt iteration if yeild returns false. } // Otherwise continue handling the remaining bytes. From 8e500e09c1ca5d6e9c7453bcfcbceaa09fe02d3c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 30 May 2025 20:07:11 -0400 Subject: [PATCH 2/3] Correctly keep track of pending counts for cleared timers. Add some more comments and fix typos. 
--- .../prism/internal/engine/elementmanager.go | 22 ++++++++++++------- .../runners/prism/internal/engine/timers.go | 6 ++--- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 6c2f92d0c60e..4c03ac53fe37 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -53,6 +53,7 @@ type element struct { // No synchronization is required in specifying this, // since keyed elements are only processed by a single bundle at a time, // if stateful stages are concerned. + // If a timer element has sequence set to -1, it means it is being cleared. sequence int elmBytes []byte // When nil, indicates this is a timer. @@ -1340,20 +1341,25 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend heap.Push(&dnt.elements, e) if e.IsTimer() { - if e.sequence > 0 { - if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { - // existing timer! - // don't increase the count this time, as "this" timer is already pending. - count-- - // clear out the existing hold for accounting purposes. - ss.watermarkHolds.Drop(lastSet.hold, 1) - } + lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] + if ok { + // existing timer! + // don't increase the count this time, as "this" timer is already pending. + count-- + // clear out the existing hold for accounting purposes. + ss.watermarkHolds.Drop(lastSet.hold, 1) + } + if e.sequence >= 0 { // Update the last set time on the timer. dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} // Mark the hold in the heap. 
ss.watermarkHolds.Add(e.holdTimestamp, 1) } else { + // we need to decrement the pending count only if the timer to be cleared is in the pending list + if ok { + count-- + } // timer is to be cleared delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 4e8a394ff80b..636ed5a644cc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -93,12 +93,12 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [ elmBytes: nil, // indicates this is a timer. keyBytes: keyBytes, window: w, - sequence: -1, + sequence: -1, // indicates this timer is being cleared. }) } if !yield(timerRet{keyBytes, tag, elms, ws}) { - return // Halt iteration if yeild returns false. + return // Halt iteration if yield returns false. } // Otherwise continue handling the remaining bytes. raw = d.UnusedBytes() @@ -124,7 +124,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [ } if !yield(timerRet{keyBytes, tag, elms, ws}) { - return // Halt iteration if yeild returns false. + return // Halt iteration if yield returns false. } // Otherwise continue handling the remaining bytes. raw = d.UnusedBytes() From 136b50448af88e735b0ff0153368591e3fcdbbc0 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 30 May 2025 21:12:03 -0400 Subject: [PATCH 3/3] Fix pending counts for clearing signal again. Minor edits on comment. 
--- .../runners/prism/internal/engine/elementmanager.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 4c03ac53fe37..08056ae18b22 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1341,10 +1341,9 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend heap.Push(&dnt.elements, e) if e.IsTimer() { - lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] - if ok { + if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { // existing timer! - // don't increase the count this time, as "this" timer is already pending. + // don't increase the count this time, as "e" is already pending. count-- // clear out the existing hold for accounting purposes. ss.watermarkHolds.Drop(lastSet.hold, 1) @@ -1356,10 +1355,8 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend // Mark the hold in the heap. ss.watermarkHolds.Add(e.holdTimestamp, 1) } else { - // we need to decrement the pending count only if the timer to be cleared is in the pending list - if ok { - count-- - } + // decrement the pending count since "e" is not a real timer (merely a clearing signal) + count-- // timer is to be cleared delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) }