diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index de997fa3282c..08056ae18b22 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -53,6 +53,7 @@ type element struct { // No synchronization is required in specifying this, // since keyed elements are only processed by a single bundle at a time, // if stateful stages are concerned. + // If a timer element has sequence set to -1, it means it is being cleared. sequence int elmBytes []byte // When nil, indicates this is a timer. @@ -959,11 +960,6 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag for _, e := range ret.elms { keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e } - if len(ret.elms) == 0 { - for _, w := range ret.windows { - delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w}) - } - } // Indicate we'd like to continue iterating. return true }) @@ -1347,16 +1343,23 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend if e.IsTimer() { if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok { // existing timer! - // don't increase the count this time, as "this" timer is already pending. + // don't increase the count this time, as "e" is already pending. count-- // clear out the existing hold for accounting purposes. ss.watermarkHolds.Drop(lastSet.hold, 1) } - // Update the last set time on the timer. - dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} + if e.sequence >= 0 { + // Update the last set time on the timer. + dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} - // Mark the hold in the heap. - ss.watermarkHolds.Add(e.holdTimestamp, 1) + // Mark the hold in the heap. + ss.watermarkHolds.Add(e.holdTimestamp, 1) + } else { + // decrement the pending count since "e" is not a real timer (merely a clearing signal) + count-- + // timer is to be cleared + delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) + } } } return count diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 8b90591974b8..636ed5a644cc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -86,8 +86,19 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [ clear := d.Bool() hold := mtime.MaxTimestamp if clear { - if !yield(timerRet{keyBytes, tag, nil, ws}) { - return // Halt iteration if yeild returns false. + var elms []element + for _, w := range ws { + elms = append(elms, element{ + tag: tag, + elmBytes: nil, // indicates this is a timer. + keyBytes: keyBytes, + window: w, + sequence: -1, // indicates this timer is being cleared. + }) + } + + if !yield(timerRet{keyBytes, tag, elms, ws}) { + return // Halt iteration if yield returns false. } // Otherwise continue handling the remaining bytes. raw = d.UnusedBytes() @@ -113,7 +124,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [ } if !yield(timerRet{keyBytes, tag, elms, ws}) { - return // Halt iteration if yeild returns false. + return // Halt iteration if yield returns false. } // Otherwise continue handling the remaining bytes. raw = d.UnusedBytes()