Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
Expand Down Expand Up @@ -250,6 +251,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i

case urns.CoderKV:
ccids := c.GetComponentCoderIds()
if len(ccids) != 2 {
panic(fmt.Sprintf("KV coder with more than 2 components: %s", prototext.Format(c)))
}
kd := pullDecoderNoAlloc(coders[ccids[0]], coders)
vd := pullDecoderNoAlloc(coders[ccids[1]], coders)
return func(r io.Reader) {
Expand All @@ -262,3 +266,24 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
panic(fmt.Sprintf("unknown coder urn key: %v", urn))
}
}

// debugCoder is developer code to get the structure of a proto coder visible when
// debugging coder errors in prism. It may sometimes be unused, so we do this to avoid
// linting errors.
var _ = debugCoder

func debugCoder(cid string, coders map[string]*pipepb.Coder) string {
var b strings.Builder
b.WriteString(cid)
b.WriteRune('\n')
c := coders[cid]
if len(c.ComponentCoderIds) > 0 {
b.WriteRune('\t')
b.WriteString(strings.Join(c.ComponentCoderIds, ", "))
b.WriteRune('\n')
for _, ccid := range c.GetComponentCoderIds() {
b.WriteString(debugCoder(ccid, coders))
}
}
return b.String()
}
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,28 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
}
}

// Change the coders of PCollections being input into a flatten to match the
// Flatten's output coder. They must be compatible SDK side anyway, so ensure
// they're written out to the runner in the same fashion.
// This may stop being necessary once Flatten Unzipping happens in the optimizer.
outPCol := comps.GetPcollections()[outColID]
outCoder := comps.GetCoders()[outPCol.GetCoderId()]
coderSubs := map[string]*pipepb.Coder{}
for _, p := range t.GetInputs() {
inPCol := comps.GetPcollections()[p]
if inPCol.CoderId != outPCol.CoderId {
coderSubs[inPCol.CoderId] = outCoder
}
}

// Return the new components which is the transforms consumer
return prepareResult{
// We sub this flatten with itself, to not drop it.
SubbedComps: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
tid: t,
},
Coders: coderSubs,
},
RemovedLeaves: nil,
ForcedRoots: forcedRoots,
Expand Down
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,25 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
}
if !bypassedWindowingStrategies[wsID] {
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// Non nil triggers should fail.

// Allow earliest and latest in pane to unblock running python tasks.
// Tests actually using the set behavior will fail.
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW,
pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE)
// Non default triggers should fail.
if ws.GetTrigger().GetDefault() == nil {
dt := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Default_{},
}
// Allow Never and Always triggers to unblock iteration on Java and Python SDKs.
// Without multiple firings, these will be very similar to the default trigger.
nt := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Never_{},
}
check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String())
at := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Always_{},
}
check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String(), at.String())
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices.
// If there's an unknown urn, and it's not composite, simply add it to the leaves.
if len(t.GetSubtransforms()) == 0 {
leaves[tid] = struct{}{}
} else {
slog.Info("composite transform has unknown urn",
slog.Group("transform", slog.String("ID", tid),
slog.String("name", t.GetUniqueName()),
slog.String("urn", spec.GetUrn())))
}
continue
}
Expand Down
18 changes: 17 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,18 @@ func portFor(wInCid string, wk *worker.W) []byte {
// It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage.
//
// Because we need the local ids for routing the sources/sinks information.
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) error {
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) (err error) {
// Catch construction time panics and produce them as errors out.
defer func() {
if r := recover(); r != nil {
switch rt := r.(type) {
case error:
err = rt
default:
err = fmt.Errorf("%v", r)
}
}
}()
// Assume stage has an indicated primary input

coders := map[string]*pipepb.Coder{}
Expand Down Expand Up @@ -484,6 +495,11 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
for _, pid := range stg.internalCols {
lpUnknownCoders(comps.GetPcollections()[pid].GetCoderId(), coders, comps.GetCoders())
}
// Add coders for all windowing strategies.
// TODO: filter PCollections, filter windowing strategies by Pcollections instead.
for _, ws := range comps.GetWindowingStrategies() {
lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
}

reconcileCoders(coders, comps.GetCoders())

Expand Down
10 changes: 6 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func TestUnimplemented(t *testing.T) {
// {pipeline: primitives.Drain}, // Can't test drain automatically yet.

// Triggers (Need teststream and are unimplemented.)
{pipeline: primitives.TriggerAlways},
{pipeline: primitives.TriggerAfterAll},
{pipeline: primitives.TriggerAfterAny},
{pipeline: primitives.TriggerAfterEach},
Expand All @@ -54,9 +53,7 @@ func TestUnimplemented(t *testing.T) {
{pipeline: primitives.TriggerElementCount},
{pipeline: primitives.TriggerOrFinally},
{pipeline: primitives.TriggerRepeat},

// Needs triggers.
{pipeline: primitives.Panes},
{pipeline: primitives.TriggerAlways},
}

for _, test := range tests {
Expand Down Expand Up @@ -86,7 +83,12 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.Checkpoints},
{pipeline: primitives.CoGBK},
{pipeline: primitives.ReshuffleKV},

// The following have been "allowed" to unblock further development
// But it's not clear these tests truly validate the expected behavior
// of the triggers or panes.
{pipeline: primitives.TriggerNever},
{pipeline: primitives.Panes},
}

for _, test := range tests {
Expand Down
11 changes: 10 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
Expand Down Expand Up @@ -139,7 +140,15 @@ func (wk *W) Stop() {
wk.stopped.Store(true)
close(wk.InstReqs)
close(wk.DataReqs)
wk.server.Stop()

// Give the SDK side 5 seconds to gracefully stop, before
// hard stopping all RPCs.
tim := time.AfterFunc(5*time.Second, func() {
wk.server.Stop()
})
wk.server.GracefulStop()
tim.Stop()

wk.lis.Close()
slog.Debug("stopped", "worker", wk)
}
Expand Down