Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type element struct {
// No synchronization is required in specifying this,
// since keyed elements are only processed by a single bundle at a time,
// if stateful stages are concerned.
// If a timer element has sequence set to -1, it means it is being cleared.
sequence int

elmBytes []byte // When nil, indicates this is a timer.
Expand Down Expand Up @@ -959,11 +960,6 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
for _, e := range ret.elms {
keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
}
if len(ret.elms) == 0 {
for _, w := range ret.windows {
delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
}
}
// Indicate we'd like to continue iterating.
return true
})
Expand Down Expand Up @@ -1347,16 +1343,23 @@ func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPend
if e.IsTimer() {
if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok {
// existing timer!
// don't increase the count this time, as "this" timer is already pending.
// don't increase the count this time, as "e" is already pending.
count--
// clear out the existing hold for accounting purposes.
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
if e.sequence >= 0 {
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}

// Mark the hold in the heap.
ss.watermarkHolds.Add(e.holdTimestamp, 1)
// Mark the hold in the heap.
ss.watermarkHolds.Add(e.holdTimestamp, 1)
} else {
// decrement the pending count since "e" is not a real timer (merely a clearing signal)
count--
// timer is to be cleared
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window})
}
}
}
return count
Expand Down
17 changes: 14 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,19 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [
clear := d.Bool()
hold := mtime.MaxTimestamp
if clear {
if !yield(timerRet{keyBytes, tag, nil, ws}) {
return // Halt iteration if yeild returns false.
var elms []element
for _, w := range ws {
elms = append(elms, element{
tag: tag,
elmBytes: nil, // indicates this is a timer.
keyBytes: keyBytes,
window: w,
sequence: -1, // indicates this timer is being cleared.
})
}

if !yield(timerRet{keyBytes, tag, elms, ws}) {
return // Halt iteration if yield returns false.
}
// Otherwise continue handling the remaining bytes.
raw = d.UnusedBytes()
Expand All @@ -113,7 +124,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw [
}

if !yield(timerRet{keyBytes, tag, elms, ws}) {
return // Halt iteration if yeild returns false.
return // Halt iteration if yield returns false.
}
// Otherwise continue handling the remaining bytes.
raw = d.UnusedBytes()
Expand Down
Loading