Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -80,19 +81,24 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str
return wvcID, nil
}

// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn.
func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
// makeWindowCoders categorizes and provides the encoder, decoder pair for the type of window.
func makeWindowCoders(wc *pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) {
var cwc *coder.WindowCoder
var winCoder engine.WinCoderType
switch wc.GetSpec().GetUrn() {
case urns.CoderGlobalWindow:
winCoder = engine.WinGlobal
cwc = coder.NewGlobalWindow()
case urns.CoderIntervalWindow:
winCoder = engine.WinInterval
cwc = coder.NewIntervalWindow()
default:
// TODO(https://github.com/apache/beam/issues/31921): Support custom windowfns instead of panicking here.
winCoder = engine.WinCustom
slog.LogAttrs(context.TODO(), slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn()))
panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc)))
}
return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
return winCoder, exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
}

// lpUnknownCoders takes a coder, and populates coders with any new coders
Expand Down
16 changes: 11 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/coders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
Expand Down Expand Up @@ -91,22 +92,27 @@ func Test_makeWindowedValueCoder(t *testing.T) {

func Test_makeWindowCoders(t *testing.T) {
tests := []struct {
urn string
window typex.Window
urn string
window typex.Window
coderType engine.WinCoderType
}{
{urns.CoderGlobalWindow, window.GlobalWindow{}},
{urns.CoderGlobalWindow, window.GlobalWindow{}, engine.WinGlobal},
{urns.CoderIntervalWindow, window.IntervalWindow{
Start: mtime.MinTimestamp,
End: mtime.MaxTimestamp,
}},
}, engine.WinInterval},
}
for _, test := range tests {
undertest := &pipepb.Coder{
Spec: &pipepb.FunctionSpec{
Urn: test.urn,
},
}
dec, enc := makeWindowCoders(undertest)
gotCoderType, dec, enc := makeWindowCoders(undertest)

if got, want := gotCoderType, test.coderType; got != want {
t.Errorf("makeWindowCoders returned different coder type: got %v, want %v", got, want)
}

// Validate we're getting a round trip coder.
var buf bytes.Buffer
Expand Down
34 changes: 28 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,35 @@ type elements struct {
}

type PColInfo struct {
GlobalID string
WDec exec.WindowDecoder
WEnc exec.WindowEncoder
EDec func(io.Reader) []byte
KeyDec func(io.Reader) []byte
GlobalID string
WindowCoder WinCoderType
WDec exec.WindowDecoder
WEnc exec.WindowEncoder
EDec func(io.Reader) []byte
KeyDec func(io.Reader) []byte
}

// WinCoderType indicates what kind of coder
// the window is using. There are only 3
// valid single window encodings.
//
// - Global (for Global windows)
// - Interval (for fixed, sliding, and session windows)
// - Custom (for custom user windows)
//
// The zero value is WinGlobal, since the constants below are assigned with iota.
//
// TODO: Handle custom variants with built in "known" coders, and length prefixed ones as separate cases.
// As a rule we don't care about the bytes, but we do need to be able to get to the next element.
type WinCoderType int

const (
// WinGlobal indicates the window is empty coded, with 0 bytes.
WinGlobal WinCoderType = iota
// WinInterval indicates the window is interval coded, with the end event time
// timestamp followed by the duration in milliseconds.
WinInterval
// WinCustom indicates the window is custom coded, with the end event time
// timestamp followed by a custom coder.
WinCustom
)

// ToData recodes the elements with their appropriate windowed value header.
func (es elements) ToData(info PColInfo) [][]byte {
var ret [][]byte
Expand Down Expand Up @@ -870,7 +892,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
keyToTimers := map[timerKey]element{}
for _, t := range timers {
// TODO: Call in a for:range loop when Beam's minimum Go version hits 1.23.0
iter := decodeTimerIter(inputInfo.KeyDec, true, t)
iter := decodeTimerIter(inputInfo.KeyDec, inputInfo.WindowCoder, t)
iter(func(ret timerRet) bool {
for _, e := range ret.elms {
keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
Expand Down
68 changes: 58 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,31 @@ type timerRet struct {
// If the timer has been cleared, no elements will be returned. Any existing timers
// for the tag *must* be cleared from the pending queue. The windows associated with
// the clear are provided to be able to delete pending timers.
func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byte) func(func(timerRet) bool) {
func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder WinCoderType, raw []byte) func(func(timerRet) bool) {

var singleWindowExtractor func(*decoder) typex.Window
switch winCoder {
case WinGlobal:
singleWindowExtractor = func(*decoder) typex.Window {
return window.GlobalWindow{}
}
case WinInterval:
singleWindowExtractor = func(d *decoder) typex.Window {
return d.IntervalWindow()
}
case WinCustom:
// Default to a length prefixed window coder here until we have different information.
// Everything else is either variable, 1, 4, or 8 bytes long.
// KVs (especially nested ones) could occur but are unlikely, and it would be
// easier for Prism to force such coders to be length prefixed.
singleWindowExtractor = func(d *decoder) typex.Window {
return d.CustomWindowLengthPrefixed()
}
default:
// Unsupported
panic(fmt.Sprintf("unsupported WindowCoder Type: %v", winCoder))
}

return func(yield func(timerRet) bool) {
for len(raw) > 0 {
keyBytes := keyDec(bytes.NewBuffer(raw))
Expand All @@ -55,15 +79,8 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw [

var ws []typex.Window
numWin := d.Fixed32()
if usesGlobalWindow {
for i := 0; i < int(numWin); i++ {
ws = append(ws, window.GlobalWindow{})
}
} else {
// Assume interval windows here, since we don't understand custom windows yet.
for i := 0; i < int(numWin); i++ {
ws = append(ws, d.IntervalWindow())
}
for i := 0; i < int(numWin); i++ {
ws = append(ws, singleWindowExtractor(&d))
}

clear := d.Bool()
Expand Down Expand Up @@ -149,6 +166,37 @@ func (d *decoder) IntervalWindow() window.IntervalWindow {
}
}

// CustomWindowLengthPrefixed decodes a single custom window, assuming the custom
// window coder is a variable, length prefixed type such as string, bytes, or a
// length prefix wrapped coder.
//
// It reads the window's end event time timestamp, then a varint length, and
// moves the cursor that many bytes further so decoding can continue with the
// next field. Malformed input (a length running past the end of the raw buffer)
// panics on the slice expression.
func (d *decoder) CustomWindowLengthPrefixed() customWindow {
	end := d.Timestamp()

	// The start position is recorded before the length is read, so the retained
	// payload spans from there to the end of the custom bytes. The runner never
	// interprets these bytes; it only needs a stable identity for the window.
	customStart := d.cursor
	l := d.Varint()
	endCursor := d.cursor + int(l)
	d.cursor = endCursor
	return customWindow{
		End: end,
		// Converting to string copies the bytes, so the window neither aliases
		// the raw timer buffer nor keeps it alive, and the resulting struct is
		// comparable.
		Custom: string(d.raw[customStart:endCursor]),
	}
}

// customWindow is the runner's opaque representation of a user defined window.
//
// It must stay comparable (no slice, map, or func fields): decoded windows are
// stored behind the typex.Window interface and used as part of map keys, and Go
// panics when hashing an interface value whose dynamic type is not comparable.
type customWindow struct {
	End    typex.EventTime
	Custom string // The custom portion of the window, ignored by the runner
}

// MaxTimestamp returns the end event time of the window.
func (w customWindow) MaxTimestamp() typex.EventTime {
	return w.End
}

// Equals reports whether o is a customWindow with the same end timestamp and
// the same custom encoded bytes.
func (w customWindow) Equals(o typex.Window) bool {
	c, ok := o.(customWindow)
	if !ok {
		return false
	}
	return w.End == c.End && w.Custom == c.Custom
}

func (d *decoder) Byte() byte {
defer func() {
d.cursor += 1
Expand Down
39 changes: 21 additions & 18 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,31 +187,33 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic

col := comps.GetPcollections()[onlyOut]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}
stage.OutputsToCoders[onlyOut] = engine.PColInfo{
GlobalID: onlyOut,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: onlyOut,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}

// There's either 0, 1 or many inputs, but they should be all the same
// so break after the first one.
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)
stage.inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
GlobalID: global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}
break
}
Expand All @@ -222,18 +224,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}
stage.inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}
}
em.StageAggregates(stage.ID)
Expand Down Expand Up @@ -378,7 +381,7 @@ func extractKVCoderID(coldCId string, coders map[string]*pipepb.Coder) (string,
return "", false
}

func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (engine.WinCoderType, exec.WindowDecoder, exec.WindowEncoder) {
ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
}

wDec, wEnc := makeWindowCoders(wc)
_, wDec, wEnc := makeWindowCoders(wc)

type keyTime struct {
key []byte
Expand Down
26 changes: 14 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,15 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
kd = collectionPullDecoder(kcid, coders, comps)
}

wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)
sink2Col[sinkID] = o.Global
col2Coders[o.Global] = engine.PColInfo{
GlobalID: o.Global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: o.Global,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}
transforms[sinkID] = sinkTransform(sinkID, portFor(wOutCid, wk), o.Global)
}
Expand Down Expand Up @@ -493,19 +494,20 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w\n%v", stg.ID, stg.primaryInput, prototext.Format(col), err, stg.transforms)
}
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
winCoder, wDec, wEnc := getWindowValueCoders(comps, col, coders)

var kd func(io.Reader) []byte
if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {
kd = collectionPullDecoder(kcid, coders, comps)
}

inputInfo := engine.PColInfo{
GlobalID: stg.primaryInput,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
GlobalID: stg.primaryInput,
WindowCoder: winCoder,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
KeyDec: kd,
}

stg.inputTransformID = stg.ID + "_source"
Expand Down