From 07600d78035ef038eee66b5035c55cbecaea6157 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:53:27 -0700 Subject: [PATCH 1/9] coder debug. --- .../pkg/beam/runners/prism/internal/coders.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index b7157b8598de..48722a3d0b5e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" @@ -250,6 +251,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i case urns.CoderKV: ccids := c.GetComponentCoderIds() + if len(ccids) != 2 { + panic(fmt.Sprintf("KV coder with more than 2 components: %s", prototext.Format(c))) + } kd := pullDecoderNoAlloc(coders[ccids[0]], coders) vd := pullDecoderNoAlloc(coders[ccids[1]], coders) return func(r io.Reader) { @@ -262,3 +266,21 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i panic(fmt.Sprintf("unknown coder urn key: %v", urn)) } } + +// debugCoder is developer code to get the structure of a proto coder visible when +// debugging coder errors in prism. It may sometimes be unused. +func debugCoder(cid string, coders map[string]*pipepb.Coder) string { + var b strings.Builder + b.WriteString(cid) + b.WriteRune('\n') + c := coders[cid] + if len(c.ComponentCoderIds) > 0 { + b.WriteRune('\t') + b.WriteString(strings.Join(c.ComponentCoderIds, ", ")) + b.WriteRune('\n') + for _, ccid := range c.GetComponentCoderIds() { + b.WriteString(debugCoder(ccid, coders)) + } + } + return b.String() +} From b9d532cc5c3faa7de18a3f20ffdb23127f29ac2b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:54:32 -0700 Subject: [PATCH 2/9] Flatten coder re-write --- .../beam/runners/prism/internal/handlerunner.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 59e926754821..a205c768731b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -99,6 +99,20 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C } } + // Change the coders of PCollections being input into a flatten to match the + // Flatten's output coder. They must be compatible SDK side anyway, so ensure + // they're written out to the runner in the same fashion. + // This may stop being necessary once Flatten Unzipping happens in the optimizer. + outPCol := comps.GetPcollections()[outColID] + outCoder := comps.GetCoders()[outPCol.GetCoderId()] + coderSubs := map[string]*pipepb.Coder{} + for _, p := range t.GetInputs() { + inPCol := comps.GetPcollections()[p] + if inPCol.CoderId != outPCol.CoderId { + coderSubs[inPCol.CoderId] = outCoder + } + } + // Return the new components which is the transforms consumer return prepareResult{ // We sub this flatten with itself, to not drop it. @@ -106,6 +120,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C Transforms: map[string]*pipepb.PTransform{ tid: t, }, + Coders: coderSubs, }, RemovedLeaves: nil, ForcedRoots: forcedRoots, From d22a409f5b60c29ea0c5f654fa34c33d0cc884d9 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:54:48 -0700 Subject: [PATCH 3/9] remove noisy composite log --- sdks/go/pkg/beam/runners/prism/internal/preprocess.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index e357714166a5..95f6af18ac74 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -106,11 +106,6 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices. // If there's an unknown urn, and it's not composite, simply add it to the leaves. if len(t.GetSubtransforms()) == 0 { leaves[tid] = struct{}{} - } else { - slog.Info("composite transform has unknown urn", - slog.Group("transform", slog.String("ID", tid), - slog.String("name", t.GetUniqueName()), - slog.String("urn", spec.GetUrn()))) } continue } From f403a50019f6a56d325fbb4974b942dea11925bb Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:55:21 -0700 Subject: [PATCH 4/9] Catch construction time panics and return as errors to avoid crashing prism. --- .../pkg/beam/runners/prism/internal/stage.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index a8b8bdd918e4..9d1c8481d65e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -310,7 +310,18 @@ func portFor(wInCid string, wk *worker.W) []byte { // It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage. // // Because we need the local ids for routing the sources/sinks information. -func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) error { +func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) (err error) { + // Catch construction time panics and produce them as errors out. + defer func() { + if r := recover(); r != nil { + switch rt := r.(type) { + case error: + err = rt + default: + err = fmt.Errorf("%v", r) + } + } + }() // Assume stage has an indicated primary input coders := map[string]*pipepb.Coder{} @@ -484,6 +495,11 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng for _, pid := range stg.internalCols { lpUnknownCoders(comps.GetPcollections()[pid].GetCoderId(), coders, comps.GetCoders()) } + // Add coders for all windowing strategies. + // TODO: filter PCollections, filter windowing strategies by Pcollections instead. + for _, ws := range comps.GetWindowingStrategies() { + lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + } reconcileCoders(coders, comps.GetCoders()) From 828a4561141ffd19be1f9cbd6e5945e696c5d3e0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:01:08 -0700 Subject: [PATCH 5/9] Relax certain features to allows Python tests to run. --- .../prism/internal/jobservices/management.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 3526ee00cc1f..ecd51a54700a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -198,16 +198,25 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } if !bypassedWindowingStrategies[wsID] { check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS) - check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) - // Non nil triggers should fail. + + // Allow earliest and latest in pane to unblock running python tasks. + // Tests actually using the set behavior will fail. + check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW, + pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE) + // Non default triggers should fail. if ws.GetTrigger().GetDefault() == nil { dt := &pipepb.Trigger{ Trigger: &pipepb.Trigger_Default_{}, } + // Allow Never and Always triggers to unblock iteration on Java and Python SDKs. + // Without multiple firings, these will be very similar to the default trigger. nt := &pipepb.Trigger{ Trigger: &pipepb.Trigger_Never_{}, } - check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String()) + at := &pipepb.Trigger{ + Trigger: &pipepb.Trigger_Always_{}, + } + check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String(), at.String()) } } } From 01ab2a252516a55d1f54d1dd2586e90058f4cc4b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:01:36 -0700 Subject: [PATCH 6/9] Gracefully stop GRPC server to make python runs quiet. --- .../pkg/beam/runners/prism/internal/worker/worker.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 47fc2cccfc54..d02b76b28c1e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -28,6 +28,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -139,7 +140,15 @@ func (wk *W) Stop() { wk.stopped.Store(true) close(wk.InstReqs) close(wk.DataReqs) - wk.server.Stop() + + // Give the SDK side 5 seconds to gracefully stop, before + // hard stopping all RPCs. + tim := time.AfterFunc(5*time.Second, func() { + wk.server.Stop() + }) + wk.server.GracefulStop() + tim.Stop() + wk.lis.Close() slog.Debug("stopped", "worker", wk) } From 62bf4ebfa0b94fe87a91f8fdaa20f69feaedc508 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:18:00 -0700 Subject: [PATCH 7/9] Move now "passing" pane test. --- .../beam/runners/prism/internal/unimplemented_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 27fc7f76bbc8..6afb04521af0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -44,7 +44,6 @@ func TestUnimplemented(t *testing.T) { // {pipeline: primitives.Drain}, // Can't test drain automatically yet. // Triggers (Need teststream and are unimplemented.) - {pipeline: primitives.TriggerAlways}, {pipeline: primitives.TriggerAfterAll}, {pipeline: primitives.TriggerAfterAny}, {pipeline: primitives.TriggerAfterEach}, @@ -54,9 +53,7 @@ func TestUnimplemented(t *testing.T) { {pipeline: primitives.TriggerElementCount}, {pipeline: primitives.TriggerOrFinally}, {pipeline: primitives.TriggerRepeat}, - - // Needs triggers. - {pipeline: primitives.Panes}, + {pipeline: primitives.TriggerAlways}, } for _, test := range tests { @@ -86,7 +83,12 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.Checkpoints}, {pipeline: primitives.CoGBK}, {pipeline: primitives.ReshuffleKV}, + + // The following have been "allowed" to unblock further development + // But it's not clear these tests truly validate the expected behavior + // of the triggers or panes. {pipeline: primitives.TriggerNever}, + {pipeline: primitives.Panes}, } for _, test := range tests { From 9161c8eaa5c2051903a6c337be7ea3ea231a78b6 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:29:40 -0700 Subject: [PATCH 8/9] underscore the debug coder dumper. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 48722a3d0b5e..c9577ca779c5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -267,6 +267,8 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i } } +var _ = debugCoder + // debugCoder is developer code to get the structure of a proto coder visible when // debugging coder errors in prism. It may sometimes be unused. func debugCoder(cid string, coders map[string]*pipepb.Coder) string { From 99f64497a66f5f36c2072b85a12aec1306e485d1 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:43:08 -0700 Subject: [PATCH 9/9] Move comment. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index c9577ca779c5..6fdaf804a34f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -267,10 +267,11 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i } } +// debugCoder is developer code to get the structure of a proto coder visible when +// debugging coder errors in prism. It may sometimes be unused, so we do this to avoid +// linting errors. var _ = debugCoder -// debugCoder is developer code to get the structure of a proto coder visible when -// debugging coder errors in prism. It may sometimes be unused. func debugCoder(cid string, coders map[string]*pipepb.Coder) string { var b strings.Builder b.WriteString(cid)