Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
}
wc, err := b.coders.WindowCoder(ws.GetWindowCoderId())
if err != nil {
return nil, nil, err
return nil, nil, errors.Errorf("could not unmarshal window coder for pcollection %v: %w", id, err)
}
return c, wc, nil
}
Expand Down
14 changes: 7 additions & 7 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error) {

c, err := b.peek(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder: %w", err)
}

w, err := urnToWindowCoder(c.GetSpec().GetUrn())
Expand Down Expand Up @@ -218,7 +218,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
id := components[1]
elm, err := b.peek(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal kv coder value component: %w", err)
}

switch elm.GetSpec().GetUrn() {
Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,

sub, err := b.peek(components[0])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal length prefix coder component: %w", err)
}

// No payload means this coder was length prefixed by the runner
Expand Down Expand Up @@ -307,7 +307,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
w, err := b.WindowCoder(components[1])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder: %w", err)
}
t := typex.New(typex.WindowedValueType, elm.T)
wvc := &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
w, err := b.WindowCoder(components[1])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder for timer: %w", err)
}
return coder.NewT(elm, w), nil
case urnRowCoder:
Expand Down Expand Up @@ -389,7 +389,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
case urnGlobalWindow:
w, err := b.WindowCoder(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal global window coder: %w", err)
}
return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{})(nil)).Elem()), Window: w}, nil
default:
Expand All @@ -400,7 +400,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
func (b *CoderUnmarshaller) peek(id string) (*pipepb.Coder, error) {
c, ok := b.models[id]
if !ok {
return nil, errors.Errorf("coder with id %v not found", id)
return nil, errors.Errorf("(peek) coder with id %v not found", id)
}
return c, nil
}
Expand Down
82 changes: 71 additions & 11 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type ElementManager struct {
stages map[string]*stageState // The state for each stage.

consumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as primary input.
sideConsumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as side input.
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.

pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.

Expand All @@ -131,12 +131,17 @@ type ElementManager struct {
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
}

// LinkID represents a fully qualified input or output.
type LinkID struct {
Transform, Local, Global string
}

func NewElementManager(config Config) *ElementManager {
return &ElementManager{
config: config,
stages: map[string]*stageState{},
consumers: map[string][]string{},
sideConsumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
watermarkRefreshes: set[string]{},
inprogressBundles: set[string]{},
Expand All @@ -146,9 +151,9 @@ func NewElementManager(config Config) *ElementManager {

// AddStage adds a stage to this element manager, connecting it's PCollections and
// nodes to the watermark propagation graph.
func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []string) {
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID) {
slog.Debug("AddStage", slog.String("ID", ID), slog.Any("inputs", inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
ss := makeStageState(ID, inputIDs, sides, outputIDs)
ss := makeStageState(ID, inputIDs, outputIDs, sides)

em.stages[ss.ID] = ss
for _, outputIDs := range ss.outputIDs {
Expand All @@ -158,7 +163,9 @@ func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []strin
em.consumers[input] = append(em.consumers[input], ss.ID)
}
for _, side := range ss.sides {
em.sideConsumers[side] = append(em.sideConsumers[side], ss.ID)
// Note that we use the StageID as the global ID in the value since we need
// to be able to look up the consuming stage, from the global PCollectionID.
em.sideConsumers[side.Global] = append(em.sideConsumers[side.Global], LinkID{Global: ss.ID, Local: side.Local, Transform: side.Transform})
}
}

Expand Down Expand Up @@ -363,6 +370,11 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
consumer := em.stages[sID]
consumer.AddPending(newPending)
}
sideConsumers := em.sideConsumers[output]
for _, link := range sideConsumers {
consumer := em.stages[link.Global]
consumer.AddPendingSide(newPending, link.Transform, link.Local)
}
}

// Return unprocessed to this stage's pending
Expand Down Expand Up @@ -489,7 +501,7 @@ type stageState struct {
ID string
inputID string // PCollection ID of the parallel input
outputIDs []string // PCollection IDs of outputs to update consumers.
sides []string // PCollection IDs of side inputs that can block execution.
sides []LinkID // PCollection IDs of side inputs that can block execution.

// Special handling bits
aggregate bool // whether this state needs to block for aggregation.
Expand All @@ -501,12 +513,13 @@ type stageState struct {
output mtime.Time // Output watermark for the whole stage
estimatedOutput mtime.Time // Estimated watermark output from DoFns

pending elementHeap // pending input elements for this stage that are to be processesd
inprogress map[string]elements // inprogress elements by active bundles, keyed by bundle
pending elementHeap // pending input elements for this stage that are to be processesd
inprogress map[string]elements // inprogress elements by active bundles, keyed by bundle
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for this stage, from {tid, inputID} -> window
}

// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, sides, outputIDs []string) *stageState {
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState {
ss := &stageState{
ID: ID,
outputIDs: outputIDs,
Expand Down Expand Up @@ -536,6 +549,42 @@ func (ss *stageState) AddPending(newPending []element) {
heap.Init(&ss.pending)
}

// AddPendingSide adds elements to be consumed as side inputs.
func (ss *stageState) AddPendingSide(newPending []element, tID, inputID string) {
ss.mu.Lock()
defer ss.mu.Unlock()
if ss.sideInputs == nil {
ss.sideInputs = map[LinkID]map[typex.Window][][]byte{}
}
key := LinkID{Transform: tID, Local: inputID}
in, ok := ss.sideInputs[key]
if !ok {
in = map[typex.Window][][]byte{}
ss.sideInputs[key] = in
}
for _, e := range newPending {
in[e.window] = append(in[e.window], e.elmBytes)
}
}

func (ss *stageState) GetSideData(tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
ss.mu.Lock()
defer ss.mu.Unlock()

d := ss.sideInputs[LinkID{Transform: tID, Local: inputID}]
ret := map[typex.Window][][]byte{}
for win, ds := range d {
if win.MaxTimestamp() <= watermark {
ret[win] = ds
}
}
return ret
}

func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
return em.stages[sID].GetSideData(tID, inputID, watermark)
}

// updateUpstreamWatermark is for the parent of the input pcollection
// to call, to update downstream stages with it's current watermark.
// This avoids downstream stages inverting lock orderings from
Expand Down Expand Up @@ -699,7 +748,18 @@ func (ss *stageState) updateWatermarks(minPending, minStateHold mtime.Time, em *
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID)
refreshes.insert(sID.Global)
}
}
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
}
}
}
Expand All @@ -725,7 +785,7 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
}
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side]
pID, ok := em.pcolParents[side.Global]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestElementHeap(t *testing.T) {
func TestStageState_minPendingTimestamp(t *testing.T) {

newState := func() *stageState {
return makeStageState("test", []string{"testInput"}, nil, []string{"testOutput"})
return makeStageState("test", []string{"testInput"}, []string{"testOutput"}, nil)
}
t.Run("noElements", func(t *testing.T) {
ss := newState()
Expand Down Expand Up @@ -188,21 +188,21 @@ func TestStageState_minPendingTimestamp(t *testing.T) {
}

func TestStageState_UpstreamWatermark(t *testing.T) {
impulse := makeStageState("impulse", nil, nil, []string{"output"})
impulse := makeStageState("impulse", nil, []string{"output"}, nil)
_, up := impulse.UpstreamWatermark()
if got, want := up, mtime.MaxTimestamp; got != want {
t.Errorf("impulse.UpstreamWatermark() = %v, want %v", got, want)
}

dofn := makeStageState("dofn", []string{"input"}, nil, []string{"output"})
dofn := makeStageState("dofn", []string{"input"}, []string{"output"}, nil)
dofn.updateUpstreamWatermark("input", 42)

_, up = dofn.UpstreamWatermark()
if got, want := up, mtime.Time(42); got != want {
t.Errorf("dofn.UpstreamWatermark() = %v, want %v", got, want)
}

flatten := makeStageState("flatten", []string{"a", "b", "c"}, nil, []string{"output"})
flatten := makeStageState("flatten", []string{"a", "b", "c"}, []string{"output"}, nil)
flatten.updateUpstreamWatermark("a", 50)
flatten.updateUpstreamWatermark("b", 42)
flatten.updateUpstreamWatermark("c", 101)
Expand All @@ -216,7 +216,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
inputCol := "testInput"
outputCol := "testOutput"
newState := func() (*stageState, *stageState, *ElementManager) {
underTest := makeStageState("underTest", []string{inputCol}, nil, []string{outputCol})
underTest := makeStageState("underTest", []string{inputCol}, []string{outputCol}, nil)
outStage := makeStageState("outStage", []string{outputCol}, nil, nil)
em := &ElementManager{
consumers: map[string][]string{
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
func TestElementManager(t *testing.T) {
t.Run("impulse", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"output"})
em.AddStage("impulse", nil, []string{"output"}, nil)
em.AddStage("dofn", []string{"output"}, nil, nil)

em.Impulse("impulse")
Expand Down Expand Up @@ -370,8 +370,8 @@ func TestElementManager(t *testing.T) {

t.Run("dofn", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("dofn1", []string{"input"}, nil, []string{"output"})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"output"}, nil, nil)
em.Impulse("impulse")

Expand Down Expand Up @@ -421,9 +421,9 @@ func TestElementManager(t *testing.T) {

t.Run("side", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("dofn1", []string{"input"}, nil, []string{"output"})
em.AddStage("dofn2", []string{"input"}, []string{"output"}, nil)
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"input"}, nil, []LinkID{{Transform: "dofn2", Global: "output", Local: "local"}})
em.Impulse("impulse")

var i int
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestElementManager(t *testing.T) {
})
t.Run("residual", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")

Expand Down
16 changes: 6 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
stages := map[string]*stage{}
var impulses []string

// Inialize the "dataservice cache" to support side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
ds := &worker.DataService{}

for i, stage := range topo {
tid := stage.transforms[0]
t := ts[tid]
Expand Down Expand Up @@ -206,7 +202,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic

switch urn {
case urns.TransformGBK:
em.AddStage(stage.ID, []string{getOnlyValue(t.GetInputs())}, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, []string{getOnlyValue(t.GetInputs())}, []string{getOnlyValue(t.GetOutputs())}, nil)
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
Expand All @@ -221,22 +217,22 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
em.StageAggregates(stage.ID)
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
case urns.TransformFlatten:
inputs := maps.Values(t.GetInputs())
sort.Strings(inputs)
em.AddStage(stage.ID, inputs, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, inputs, []string{getOnlyValue(t.GetOutputs())}, nil)
}
stages[stage.ID] = stage
case wk.Env:
if err := buildDescriptor(stage, comps, wk, ds); err != nil {
if err := buildDescriptor(stage, comps, wk, em); err != nil {
return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
outputs := maps.Keys(stage.OutputsToCoders)
sort.Strings(outputs)
em.AddStage(stage.ID, []string{stage.primaryInput}, stage.sides, outputs)
em.AddStage(stage.ID, []string{stage.primaryInput}, outputs, stage.sideInputs)
default:
err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
slog.Error("Execute", err)
Expand Down Expand Up @@ -273,7 +269,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
defer func() { <-maxParallelism }()
s := stages[rb.StageID]
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, ds, comps, em, rb); err != nil {
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
// Ensure we clean up on bundle failure
em.FailBundle(rb)
bundleFailed <- err
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
}

func executeWithT(ctx context.Context, t testing.TB, p *beam.Pipeline) (beam.PipelineResult, error) {
t.Helper()
t.Log("startingTest - ", t.Name())
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
Expand Down
Loading