Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/core/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ type TimerMap struct {
FireTimestamp, HoldTimestamp mtime.Time
}

// timerConfig is used transiently to hold configuration from the functional options.
type timerConfig struct {
Tag string
HoldSet bool // Whether the HoldTimestamp was set.
HoldTimestamp mtime.Time
}

Expand All @@ -68,6 +70,7 @@ func WithTag(tag string) timerOptions {
// WithOutputTimestamp sets the output timestamp for the timer.
func WithOutputTimestamp(outputTimestamp time.Time) timerOptions {
return func(tm *timerConfig) {
tm.HoldSet = true
tm.HoldTimestamp = mtime.FromTime(outputTimestamp)
}
}
Expand Down Expand Up @@ -108,7 +111,7 @@ func (et EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOpti
opt(&tc)
}
tm := TimerMap{Family: et.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}
p.Set(tm)
Expand Down Expand Up @@ -142,7 +145,7 @@ func (pt ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...time
opt(&tc)
}
tm := TimerMap{Family: pt.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reade
}
}

// pullDecoderNoAlloc returns a function that decodes a single eleemnt of the given coder.
// pullDecoderNoAlloc returns a function that decodes a single element of the given coder.
// Intended to only be used as an internal function for pullDecoder, which will use a io.TeeReader
// to extract the bytes.
func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) {
Expand Down
16 changes: 16 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ type StateData struct {
Multimap map[string][][]byte
}

// TimerKey is for use as a key for timers.
type TimerKey struct {
Transform, Family string
}

// TentativeData is where data for in progress bundles is put
// until the bundle executes successfully.
type TentativeData struct {
Raw map[string][][]byte

// state is a map from transformID + UserStateID, to window, to userKey, to datavalues.
state map[LinkID]map[typex.Window]map[string]StateData
// timers is a map from the Timer transform+family to the encoded timer.
timers map[TimerKey][][]byte
}

// WriteData adds data to a given global collectionID.
Expand All @@ -49,6 +56,15 @@ func (d *TentativeData) WriteData(colID string, data []byte) {
d.Raw[colID] = append(d.Raw[colID], data)
}

// WriteTimers adds timers to the associated transform handler.
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte) {
if d.timers == nil {
d.timers = map[TimerKey][][]byte{}
}
link := TimerKey{Transform: transformID, Family: familyID}
d.timers[link] = append(d.timers[link], timers)
}

func (d *TentativeData) toWindow(wKey []byte) typex.Window {
if len(wKey) == 0 {
return window.GlobalWindow{}
Expand Down
Loading