From ba026353e48418ad3ea544f6af4f789694c06f77 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 9 Dec 2024 14:11:29 -0800 Subject: [PATCH 1/9] Untilter OnWindowExpiration tests --- runners/prism/java/build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 82eb62b9e207..f225a98f7bc8 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -228,8 +228,6 @@ def createPrismValidatesRunnerTask = { name, environmentType -> excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment' // Not yet implemented in Prism - // https://github.com/apache/beam/issues/32211 - excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' // https://github.com/apache/beam/issues/32929 excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' From 1e04f6879cd8a70107548ff1a27c94427f5fc74e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:17:46 -0800 Subject: [PATCH 2/9] [#32211] Support OnWindowExpiry in Prism --- runners/prism/java/build.gradle | 16 +- .../prism/internal/engine/elementmanager.go | 188 +++++++++++++++--- .../beam/runners/prism/internal/execute.go | 4 + .../runners/prism/internal/jobservices/job.go | 1 + .../prism/internal/jobservices/management.go | 2 - .../beam/runners/prism/internal/preprocess.go | 6 + .../pkg/beam/runners/prism/internal/stage.go | 22 +- .../runners/prism/internal/worker/bundle.go | 2 +- 8 files changed, 192 insertions(+), 49 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index f225a98f7bc8..ce71151099bd 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -106,6 +106,12 @@ def sickbayTests = [ 'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams', 'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger', + // GroupIntoBatchesTest tests that fail: + // Teststream has bad KV encodings due to using an outer context. + 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode', + // ShardedKey not yet implemented. + 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', + // Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected 'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage', @@ -231,9 +237,13 @@ def createPrismValidatesRunnerTask = { name, environmentType -> // https://github.com/apache/beam/issues/32929 excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' - // Not supported in Portable Java SDK yet. - // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState - excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' + // Not supported in Portable Java SDK yet. + // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState + excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' + + // Processing time with TestStream is unreliable without being able to control + // SDK side time portably. Ignore these tests. 
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' } filter { for (String test : sickbayTests) { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 1739efdb742a..5a43f8306df3 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -194,9 +194,10 @@ type ElementManager struct { pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection. - refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling. - inprogressBundles set[string] // Active bundleIDs - changedStages set[string] // Stages that have changed and need their watermark refreshed. + refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling. + inprogressBundles set[string] // Active bundleIDs + changedStages set[string] // Stages that have changed and need their watermark refreshed. + sideChannelBundles []RunBundle // Represents ready to executed bundles prepared on the side by a stage instead of in the main loop, such as for onWindowExpiry, or for Triggers. livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully. @@ -271,6 +272,16 @@ func (em *ElementManager) StageStateful(ID string) { em.stages[ID].stateful = true } +// StageOnWindowExpiration marks the given stage as stateful, which means elements are +// processed by key. +func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) { + ss := em.stages[stageID] + ss.onWindowExpirationFamily = timer.TimerFamily + ss.onWindowExpirationTransform = timer.Transform + ss.keysToExpireByWindow = map[typex.Window]set[string]{} + ss.inProgressExpiredWindows = map[typex.Window]int{} +} + // StageProcessingTimeTimers indicates which timers are processingTime domain timers. func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) { em.stages[ID].processingTimeTimersFamilies = ptTimers @@ -371,7 +382,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. em.changedStages.merge(changedByProcessingTime) // If there are no changed stages or ready processing time events available, we wait until there are. - for len(em.changedStages)+len(changedByProcessingTime) == 0 { + for len(em.changedStages)+len(changedByProcessingTime)+len(em.sideChannelBundles) == 0 { // Check to see if we must exit select { case <-ctx.Done(): @@ -386,6 +397,20 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(changedByProcessingTime) } + // Run any side channel bundles first. + for len(em.sideChannelBundles) > 0 { + rb := em.sideChannelBundles[0] + em.sideChannelBundles = em.sideChannelBundles[1:] + em.refreshCond.L.Unlock() + + select { + case <-ctx.Done(): + return + case runStageCh <- rb: + } + slog.Debug("OnWindowExpiration-Bundle sent", slog.Any("bundle", rb), slog.Int64("livePending", em.livePending.Load())) + em.refreshCond.L.Lock() + } // We know there is some work we can do that may advance the watermarks, // refresh them, and see which stages have advanced. 
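The hunks above and below lean heavily on prism's small generic set type (insert, present, remove, merge, plus the singleSet helper that appears in the tests later in this series), which never shows up in the diff itself. Inferred purely from how it is used in this patch, a minimal sketch of that type would look roughly like the following; the exact names, file, and layout inside the engine package are assumptions, not the actual source:

package engine

// set is a minimal generic set keyed by any comparable type.
// (Sketch inferred from usage in this patch, not prism's real definition.)
type set[K comparable] map[K]struct{}

// present reports whether k is in the set.
func (s set[K]) present(k K) bool {
	_, ok := s[k]
	return ok
}

// insert adds k to the set.
func (s set[K]) insert(k K) {
	s[k] = struct{}{}
}

// remove deletes k from the set.
func (s set[K]) remove(k K) {
	delete(s, k)
}

// merge adds every key of o into s.
func (s set[K]) merge(o set[K]) {
	for k := range o {
		s.insert(k)
	}
}

// singleSet builds a set containing only k.
func singleSet[K comparable](k K) set[K] {
	s := set[K]{}
	s.insert(k)
	return s
}
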
@@ -628,6 +653,11 @@ type Block struct { Transform, Family string } +// StaticTimerID represents the static user identifiers for a timer. +type StaticTimerID struct { + Transform, TimerFamily string +} + // StateForBundle retreives relevant state for the given bundle, WRT the data in the bundle. // // TODO(lostluck): Consider unifiying with InputForBundle, to reduce lock contention. @@ -1068,6 +1098,12 @@ type stageState struct { strat winStrat // Windowing Strategy for aggregation fireings. processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain. + // onWindowExpiration management + onWindowExpirationFamily string // If non-empty, indicates that this stage has an OnWindowExpiration timer that must trigger. + onWindowExpirationTransform string // The transform name of the DoFn with OnWindowExpiration + keysToExpireByWindow map[typex.Window]set[string] + inProgressExpiredWindows map[typex.Window]int + mu sync.Mutex upstreamWatermarks sync.Map // watermark set from inputPCollection's parent. input mtime.Time // input watermark for the parallel input. @@ -1158,6 +1194,14 @@ func (ss *stageState) AddPending(newPending []element) int { timers: map[timerKey]timerTimes{}, } ss.pendingByKeys[string(e.keyBytes)] = dnt + if ss.keysToExpireByWindow != nil { + w, ok := ss.keysToExpireByWindow[e.window] + if !ok { + w = make(set[string]) + ss.keysToExpireByWindow[e.window] = w + } + w.insert(string(e.keyBytes)) + } } heap.Push(&dnt.elements, e) @@ -1555,45 +1599,123 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { if minWatermarkHold < newOut { newOut = minWatermarkHold } - refreshes := set[string]{} + // If the newOut is smaller, then don't change downstream watermarks. + if newOut <= ss.output { + return nil + } + // If bigger, advance the output watermark - if newOut > ss.output { - ss.output = newOut - for _, outputCol := range ss.outputIDs { - consumers := em.consumers[outputCol] - - for _, sID := range consumers { - em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) - refreshes.insert(sID) - } - // Inform side input consumers, but don't update the upstream watermark. - for _, sID := range em.sideConsumers[outputCol] { - refreshes.insert(sID.Global) + var preventDownstreamUpdate bool + for win, keys := range ss.keysToExpireByWindow { + // Check if the window has expired. + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp() >= newOut { + continue + } + // We can't advance the output watermark if there's garbage to collect. + preventDownstreamUpdate = true + // Hold off on garbage collecting data for these windows while these + // are in progress. + ss.inProgressExpiredWindows[win] += 1 + + // Produce bundle(s) for these keys and window, and side channel them. + wm := win.MaxTimestamp() + rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + ss.ID + "-" + win.MaxTimestamp().String(), Watermark: wm} + + // Now we need to actually build the bundle. 
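+			// Each key that isn't already busy in another in-progress bundle gets a
+			// single synthetic timer element addressed to the stage's OnWindowExpiration
+			// transform and timer family; keys that are busy stay queued in
+			// keysToExpireByWindow so a later watermark refresh can expire them once
+			// their current bundle completes.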
+ var toProcess []element + busyKeys := set[string]{} + for k := range keys { + if ss.inprogressKeys.present(k) { + busyKeys.insert(k) + continue } + toProcess = append(toProcess, element{ + window: win, + timestamp: wm, + pane: typex.NoFiringPane(), + holdTimestamp: wm, + transform: ss.onWindowExpirationTransform, + family: ss.onWindowExpirationFamily, + sequence: 1, + keyBytes: []byte(k), + elmBytes: nil, + }) + } + em.addPending(len(toProcess)) + ss.watermarkHolds.Add(wm, 1) + ss.makeInProgressBundle( + func() string { return rb.BundleID }, + toProcess, + wm, + keys, + map[mtime.Time]int{wm: 1}, + ) + + slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("keys", keys), slog.Any("window", win), slog.Any("toProcess", toProcess)) + // We're already in the refreshCond critical section. + // Insert that this is in progress here to avoid a race condition. + em.inprogressBundles.insert(rb.BundleID) + em.sideChannelBundles = append(em.sideChannelBundles, rb) + + // Remove the key accounting, or continue tracking which keys still need clearing. + if len(busyKeys) == 0 { + delete(ss.keysToExpireByWindow, win) + } else { + ss.keysToExpireByWindow[win] = busyKeys } - // Garbage collect state, timers and side inputs, for all windows - // that are before the new output watermark. - // They'll never be read in again. - for _, wins := range ss.sideInputs { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { - delete(wins, win) + } + // Garbage collect state, timers and side inputs, for all windows + // that are before the new output watermark. + // They'll never be read in again. + for _, wins := range ss.sideInputs { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + // Clear out anything we've already used. + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip this window. + if ss.inProgressExpiredWindows[win] > 0 { + continue } + delete(wins, win) } } - for _, wins := range ss.state { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - if win.MaxTimestamp() < newOut { - delete(wins, win) + } + for _, wins := range ss.state { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip collecting this window. + if ss.inProgressExpiredWindows[win] > 0 { + continue } + delete(wins, win) } } } + // If there are windows to expire, we don't update the output watermark yet. + if preventDownstreamUpdate { + return nil + } + + // Update this stage's output watermark, and then propagate that to downstream stages + refreshes := set[string]{} + ss.output = newOut + for _, outputCol := range ss.outputIDs { + consumers := em.consumers[outputCol] + + for _, sID := range consumers { + em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) + refreshes.insert(sID) + } + // Inform side input consumers, but don't update the upstream watermark. 
+ for _, sID := range em.sideConsumers[outputCol] { + refreshes.insert(sID.Global) + } + } return refreshes } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 614edee47721..fde62f00c7c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -318,6 +318,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic if stage.stateful { em.StageStateful(stage.ID) } + if stage.onWindowExpiration.TimerFamily != "" { + slog.Debug("OnWindowExpiration", slog.String("stage", stage.ID), slog.Any("values", stage.onWindowExpiration)) + em.StageOnWindowExpiration(stage.ID, stage.onWindowExpiration) + } if len(stage.processingTimeTimers) > 0 { em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index deef259a99d1..6158cd6d612c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -45,6 +45,7 @@ var supportedRequirements = map[string]struct{}{ urns.RequirementSplittableDoFn: {}, urns.RequirementStatefulProcessing: {}, urns.RequirementBundleFinalization: {}, + urns.RequirementOnWindowExpiration: {}, } // TODO, move back to main package, and key off of executor handlers? diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index a2840760bf7a..894a6e1427a2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME) } - check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now. 
- // Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139 if pardo.GetRestrictionCoderId() != "" && isStateful { check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "") diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index dceaa9ab8fcb..0e17d642d88a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -449,6 +449,12 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 { stg.stateful = true } + if pardo.GetOnWindowExpirationTimerFamilySpec() != "" { + stg.onWindowExpiration = struct { + Transform string + TimerFamily string + }{Transform: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()} + } sis = pardo.GetSideInputs() } if _, ok := sis[link.Local]; ok { diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 9f00c22789b6..7cc37754f0ef 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -57,18 +57,20 @@ type link struct { // account, but all serialization boundaries remain since the pcollections // would continue to get serialized. type stage struct { - ID string - transforms []string - primaryInput string // PCollection used as the parallel input. - outputs []link // PCollections that must escape this stage. - sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers - internalCols []string // PCollections that escape. Used for precise coder sending. - envID string - finalize bool - stateful bool + ID string + transforms []string + primaryInput string // PCollection used as the parallel input. + outputs []link // PCollections that must escape this stage. + sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers + internalCols []string // PCollections that escape. Used for precise coder sending. + envID string + finalize bool + stateful bool + onWindowExpiration engine.StaticTimerID + // hasTimers indicates the transform+timerfamily pairs that need to be waited on for // the stage to be considered complete. - hasTimers []struct{ Transform, TimerFamily string } + hasTimers []engine.StaticTimerID processingTimeTimers map[string]bool exe transformExecuter diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 83ad1bda9841..4f25c1399924 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -42,7 +42,7 @@ type B struct { InputTransformID string Input []*engine.Block // Data and Timers for this bundle. EstimatedInputElements int - HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate. + HasTimers []engine.StaticTimerID // Timer streams to terminate. // IterableSideInputData is a map from transformID + inputID, to window, to data. 
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte From de16e08e131fbaec38b121cb61a0c7fa70ded108 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:23:01 -0800 Subject: [PATCH 3/9] Update CHANGES.md --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 1b943a99f8a0..6d23ed60baaf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,8 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)). + * This enables initial Java GroupIntoBatches support. ## Breaking Changes From eb09cb020cf0a24eb818bd21b8294777422c21e4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:51:56 -0800 Subject: [PATCH 4/9] Add uni test for bundle creation logic. --- .../prism/internal/engine/elementmanager.go | 160 +++++++++++------- .../internal/engine/elementmanager_test.go | 160 ++++++++++++++++++ 2 files changed, 256 insertions(+), 64 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5a43f8306df3..7b34e18bd4eb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -184,7 +184,8 @@ type Config struct { // // Watermarks are advanced based on consumed input, except if the stage produces residuals. type ElementManager struct { - config Config + config Config + nextBundID func() string // Generates unique bundleIDs. Set in the Bundles method. impulses set[string] // List of impulse stages. stages map[string]*stageState // The state for each stage. @@ -276,10 +277,10 @@ func (em *ElementManager) StageStateful(ID string) { // processed by key. func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) { ss := em.stages[stageID] - ss.onWindowExpirationFamily = timer.TimerFamily - ss.onWindowExpirationTransform = timer.Transform + ss.onWindowExpiration = timer ss.keysToExpireByWindow = map[typex.Window]set[string]{} ss.inProgressExpiredWindows = map[typex.Window]int{} + ss.expiryWindowsByBundles = map[string]typex.Window{} } // StageProcessingTimeTimers indicates which timers are processingTime domain timers. @@ -349,6 +350,8 @@ func (rb RunBundle) LogValue() slog.Value { // The returned channel is closed when the context is canceled, or there are no pending elements // remaining. func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle { + // Make it easier for side channel bundles to get unique IDs. + em.nextBundID = nextBundID runStageCh := make(chan RunBundle) ctx, cancelFn := context.WithCancelCause(ctx) go func() { @@ -408,7 +411,6 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. return case runStageCh <- rb: } - slog.Debug("OnWindowExpiration-Bundle sent", slog.Any("bundle", rb), slog.Int64("livePending", em.livePending.Load())) em.refreshCond.L.Lock() } @@ -877,6 +879,19 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol } delete(stage.inprogressHoldsByBundle, rb.BundleID) + // Clean up OnWindowExpiration bundle accounting, so window state + // may be garbage collected. 
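+	// inProgressExpiredWindows counts how many OnWindowExpiration bundles are still
+	// running against each window; watermark-driven garbage collection of that
+	// window's state and side inputs waits until the count returns to zero.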
+ if stage.expiryWindowsByBundles != nil { + win, ok := stage.expiryWindowsByBundles[rb.BundleID] + if ok { + stage.inProgressExpiredWindows[win] -= 1 + if stage.inProgressExpiredWindows[win] == 0 { + delete(stage.inProgressExpiredWindows, win) + } + delete(stage.expiryWindowsByBundles, rb.BundleID) + } + } + // If there are estimated output watermarks, set the estimated // output watermark for the stage. if len(residuals.MinOutputWatermarks) > 0 { @@ -1099,10 +1114,10 @@ type stageState struct { processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain. // onWindowExpiration management - onWindowExpirationFamily string // If non-empty, indicates that this stage has an OnWindowExpiration timer that must trigger. - onWindowExpirationTransform string // The transform name of the DoFn with OnWindowExpiration - keysToExpireByWindow map[typex.Window]set[string] - inProgressExpiredWindows map[typex.Window]int + onWindowExpiration StaticTimerID // The static ID of the OnWindowExpiration callback. + keysToExpireByWindow map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired. + inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely collect them. + expiryWindowsByBundles map[string]typex.Window // Tracks which bundle is handling which window, so the above map can be cleared. mu sync.Mutex upstreamWatermarks sync.Map // watermark set from inputPCollection's parent. @@ -1605,6 +1620,70 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { } // If bigger, advance the output watermark + preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, em) + + // Garbage collect state, timers and side inputs, for all windows + // that are before the new output watermark, if they aren't in progress + // of being expired. + // They'll never be read in again. + for _, wins := range ss.sideInputs { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + // Clear out anything we've already used. + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip this window. + if ss.inProgressExpiredWindows[win] > 0 { + continue + } + delete(wins, win) + } + } + } + for _, wins := range ss.state { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip collecting this window. + if ss.inProgressExpiredWindows[win] > 0 { + continue + } + delete(wins, win) + } + } + } + // If there are windows to expire, we don't update the output watermark yet. + if preventDownstreamUpdate { + return nil + } + + // Update this stage's output watermark, and then propagate that to downstream stages + refreshes := set[string]{} + ss.output = newOut + for _, outputCol := range ss.outputIDs { + consumers := em.consumers[outputCol] + + for _, sID := range consumers { + em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) + refreshes.insert(sID) + } + // Inform side input consumers, but don't update the upstream watermark. + for _, sID := range em.sideConsumers[outputCol] { + refreshes.insert(sID.Global) + } + } + return refreshes +} + +// createOnWindowExpirationBundles creates side channel bundles when windows +// expire for all keys that were used in that window. 
Returns true if any +// bundles are created, which means that the window must not yet be garbage +// collected. +// +// Must be called within the stageState.mu's and the ElementManager.refreshCond +// critical sections. +func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *ElementManager) bool { var preventDownstreamUpdate bool for win, keys := range ss.keysToExpireByWindow { // Check if the window has expired. @@ -1621,23 +1700,25 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // Produce bundle(s) for these keys and window, and side channel them. wm := win.MaxTimestamp() - rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + ss.ID + "-" + win.MaxTimestamp().String(), Watermark: wm} + rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm} // Now we need to actually build the bundle. var toProcess []element busyKeys := set[string]{} + usedKeys := set[string]{} for k := range keys { if ss.inprogressKeys.present(k) { busyKeys.insert(k) continue } + usedKeys.insert(k) toProcess = append(toProcess, element{ window: win, timestamp: wm, pane: typex.NoFiringPane(), holdTimestamp: wm, - transform: ss.onWindowExpirationTransform, - family: ss.onWindowExpirationFamily, + transform: ss.onWindowExpiration.Transform, + family: ss.onWindowExpiration.TimerFamily, sequence: 1, keyBytes: []byte(k), elmBytes: nil, @@ -1649,11 +1730,12 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { func() string { return rb.BundleID }, toProcess, wm, - keys, + usedKeys, map[mtime.Time]int{wm: 1}, ) + ss.expiryWindowsByBundles[rb.BundleID] = win - slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("keys", keys), slog.Any("window", win), slog.Any("toProcess", toProcess)) + slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("usedKeys", usedKeys), slog.Any("window", win), slog.Any("toProcess", toProcess), slog.Any("busyKeys", busyKeys)) // We're already in the refreshCond critical section. // Insert that this is in progress here to avoid a race condition. em.inprogressBundles.insert(rb.BundleID) @@ -1666,57 +1748,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { ss.keysToExpireByWindow[win] = busyKeys } } - // Garbage collect state, timers and side inputs, for all windows - // that are before the new output watermark. - // They'll never be read in again. - for _, wins := range ss.sideInputs { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { - // If the expiry is in progress, skip this window. - if ss.inProgressExpiredWindows[win] > 0 { - continue - } - delete(wins, win) - } - } - } - for _, wins := range ss.state { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - if win.MaxTimestamp() < newOut { - // If the expiry is in progress, skip collecting this window. - if ss.inProgressExpiredWindows[win] > 0 { - continue - } - delete(wins, win) - } - } - } - // If there are windows to expire, we don't update the output watermark yet. 
- if preventDownstreamUpdate { - return nil - } - - // Update this stage's output watermark, and then propagate that to downstream stages - refreshes := set[string]{} - ss.output = newOut - for _, outputCol := range ss.outputIDs { - consumers := em.consumers[outputCol] - - for _, sID := range consumers { - em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) - refreshes.insert(sID) - } - // Inform side input consumers, but don't update the upstream watermark. - for _, sID := range em.sideConsumers[outputCol] { - refreshes.insert(sID.Global) - } - } - return refreshes + return preventDownstreamUpdate } // bundleReady returns the maximum allowed watermark for this stage, and whether diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go index d5904b13fb88..26924eedfb21 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -524,3 +525,162 @@ func TestElementManager(t *testing.T) { } }) } + +func TestElementManager_OnWindowExpiration(t *testing.T) { + t.Run("createOnWindowExpirationBundles", func(t *testing.T) { + // Unlike the other tests above, we synthesize the input configuration, + em := NewElementManager(Config{}) + var instID uint64 + em.nextBundID = func() string { + return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1)) + } + em.AddStage("impulse", nil, []string{"input"}, nil) + em.AddStage("dofn", []string{"input"}, nil, nil) + onWE := StaticTimerID{ + Transform: "dofn1", + TimerFamily: "onWinExp", + } + em.StageOnWindowExpiration("dofn", onWE) + em.Impulse("impulse") + + stage := em.stages["dofn"] + stage.pendingByKeys = map[string]*dataAndTimers{} + stage.inprogressKeys = set[string]{} + + validateInProgressExpiredWindows := func(t *testing.T, win typex.Window, want int) { + t.Helper() + if got := stage.inProgressExpiredWindows[win]; got != want { + t.Errorf("stage.inProgressExpiredWindows[%v] = %v, want %v", win, got, want) + } + } + validateSideBundles := func(t *testing.T, keys set[string]) { + t.Helper() + if len(em.sideChannelBundles) == 0 { + t.Errorf("no sideChannelBundles exist when checking keys: %v", keys) + } + // Check that all keys are marked as in progress + for k := range keys { + if !stage.inprogressKeys.present(k) { + t.Errorf("key %q not marked as in progress", k) + } + } + + bundleID := "" + sideBundles: + for _, rb := range em.sideChannelBundles { + // find that a side channel bundle exists with these keys. + bkeys := stage.inprogressKeysByBundle[rb.BundleID] + if len(bkeys) != len(keys) { + continue sideBundles + } + for k := range keys { + if !bkeys.present(k) { + continue sideBundles + } + } + bundleID = rb.BundleID + break + } + if bundleID == "" { + t.Errorf("no bundle found with all the given keys: %v: bundles: %v keysByBundle: %v", keys, em.sideChannelBundles, stage.inprogressKeysByBundle) + } + } + + newOut := mtime.EndOfGlobalWindowTime + // No windows exist, so no side channel bundles should be set. + if got, want := stage.createOnWindowExpirationBundles(newOut, em), false; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + // Validate that no side channel bundles were created. 
+ if got, want := len(stage.inProgressExpiredWindows), 0; got != want { + t.Errorf("len(stage.inProgressExpiredWindows) = %v, want %v", got, want) + } + if got, want := len(em.sideChannelBundles), 0; got != want { + t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + } + + // Configure a few conditions to validate in the call. + // Each window is in it's own bundle, all are in the same bundle. + // Bundle 1 + expiredWindow1 := window.IntervalWindow{Start: 0, End: newOut - 1} + + akey := "\u0004key1" + keys1 := singleSet(akey) + stage.keysToExpireByWindow[expiredWindow1] = keys1 + // Bundle 2 + expiredWindow2 := window.IntervalWindow{Start: 1, End: newOut - 1} + keys2 := singleSet("\u0004key2") + keys2.insert("\u0004key3") + keys2.insert("\u0004key4") + stage.keysToExpireByWindow[expiredWindow2] = keys2 + + // We should never see this key and window combination, as the window is + // not yet expired. + liveWindow := window.IntervalWindow{Start: 2, End: newOut + 1} + stage.keysToExpireByWindow[liveWindow] = singleSet("\u0010keyNotSeen") + + if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + + // We should only see 2 sideChannelBundles at this point. + if got, want := len(em.sideChannelBundles), 2; got != want { + t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + } + + validateInProgressExpiredWindows(t, expiredWindow1, 1) + validateInProgressExpiredWindows(t, expiredWindow2, 1) + validateSideBundles(t, keys1) + validateSideBundles(t, keys2) + + // Bundle 3 + expiredWindow3 := window.IntervalWindow{Start: 3, End: newOut - 1} + keys3 := singleSet(akey) // We shouldn't see this key, since it's in progress. + keys3.insert("\u0004key5") // We should see this key since it isn't. + stage.keysToExpireByWindow[expiredWindow3] = keys3 + + if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + + // We should see 3 sideChannelBundles at this point. + if got, want := len(em.sideChannelBundles), 3; got != want { + t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + } + + validateInProgressExpiredWindows(t, expiredWindow1, 1) + validateInProgressExpiredWindows(t, expiredWindow2, 1) + validateInProgressExpiredWindows(t, expiredWindow3, 1) + validateSideBundles(t, keys1) + validateSideBundles(t, keys2) + validateSideBundles(t, singleSet("\u0004key5")) + + // remove key1 from "inprogress keys", and the associated bundle. 
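+		// This mirrors the cleanup PersistBundle performs when a real expiry bundle
+		// completes, freeing akey so the next createOnWindowExpirationBundles call
+		// can claim it for expiredWindow3.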
+ stage.inprogressKeys.remove(akey) + delete(stage.inProgressExpiredWindows, expiredWindow1) + for bundID, bkeys := range stage.inprogressKeysByBundle { + if bkeys.present(akey) { + t.Logf("bundID: %v, bkeys: %v, keyByBundle: %v", bundID, bkeys, stage.inprogressKeysByBundle) + delete(stage.inprogressKeysByBundle, bundID) + win := stage.expiryWindowsByBundles[bundID] + delete(stage.expiryWindowsByBundles, bundID) + if win != expiredWindow1 { + t.Fatalf("Unexpected window: got %v, want %v", win, expiredWindow1) + } + break + } + } + + // Now we should get another bundle for expiredWindow3, and have none for expiredWindow1 + if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + + validateInProgressExpiredWindows(t, expiredWindow1, 0) + validateInProgressExpiredWindows(t, expiredWindow2, 1) + validateInProgressExpiredWindows(t, expiredWindow3, 2) + validateSideBundles(t, keys1) // Should still have this key present, but with a different bundle. + validateSideBundles(t, keys2) + validateSideBundles(t, singleSet("\u0004key5")) // still exist.. + }) +} From 405c9812cd5b3ea17e7c5a7c1098ffb69b224da9 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 11 Dec 2024 20:54:30 -0800 Subject: [PATCH 5/9] Rename side channel to injected. --- .../prism/internal/engine/elementmanager.go | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 7b34e18bd4eb..c5f84608f25b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -195,10 +195,10 @@ type ElementManager struct { pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection. - refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling. - inprogressBundles set[string] // Active bundleIDs - changedStages set[string] // Stages that have changed and need their watermark refreshed. - sideChannelBundles []RunBundle // Represents ready to executed bundles prepared on the side by a stage instead of in the main loop, such as for onWindowExpiry, or for Triggers. + refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling. + inprogressBundles set[string] // Active bundleIDs + changedStages set[string] // Stages that have changed and need their watermark refreshed. + injectedBundles []RunBundle // Represents ready to execute bundles prepared outside of the main loop, such as for onWindowExpiration, or for Triggers. livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully. @@ -350,7 +350,7 @@ func (rb RunBundle) LogValue() slog.Value { // The returned channel is closed when the context is canceled, or there are no pending elements // remaining. func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle { - // Make it easier for side channel bundles to get unique IDs. + // Make it easier for injected bundles to get unique IDs. 
em.nextBundID = nextBundID runStageCh := make(chan RunBundle) ctx, cancelFn := context.WithCancelCause(ctx) @@ -384,8 +384,9 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(changedByProcessingTime) - // If there are no changed stages or ready processing time events available, we wait until there are. - for len(em.changedStages)+len(changedByProcessingTime)+len(em.sideChannelBundles) == 0 { + // If there are no changed stages, ready processing time events, + // or injected bundles available, we wait until there are. + for len(em.changedStages)+len(changedByProcessingTime)+len(em.injectedBundles) == 0 { // Check to see if we must exit select { case <-ctx.Done(): @@ -400,10 +401,10 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(changedByProcessingTime) } - // Run any side channel bundles first. - for len(em.sideChannelBundles) > 0 { - rb := em.sideChannelBundles[0] - em.sideChannelBundles = em.sideChannelBundles[1:] + // Run any injected bundles first. + for len(em.injectedBundles) > 0 { + rb := em.injectedBundles[0] + em.injectedBundles = em.injectedBundles[1:] em.refreshCond.L.Unlock() select { @@ -1676,7 +1677,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { return refreshes } -// createOnWindowExpirationBundles creates side channel bundles when windows +// createOnWindowExpirationBundles injects bundles when windows // expire for all keys that were used in that window. Returns true if any // bundles are created, which means that the window must not yet be garbage // collected. @@ -1698,7 +1699,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele // are in progress. ss.inProgressExpiredWindows[win] += 1 - // Produce bundle(s) for these keys and window, and side channel them. + // Produce bundle(s) for these keys and window, and inject them. wm := win.MaxTimestamp() rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm} @@ -1739,7 +1740,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele // We're already in the refreshCond critical section. // Insert that this is in progress here to avoid a race condition. em.inprogressBundles.insert(rb.BundleID) - em.sideChannelBundles = append(em.sideChannelBundles, rb) + em.injectedBundles = append(em.injectedBundles, rb) // Remove the key accounting, or continue tracking which keys still need clearing. if len(busyKeys) == 0 { From 36ddc743f3268448ecd7be3289d246717ae7a390 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 11 Dec 2024 20:57:34 -0800 Subject: [PATCH 6/9] Amend documentation for StaticTimerID. --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index c5f84608f25b..8ee14976d80b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -656,7 +656,8 @@ type Block struct { Transform, Family string } -// StaticTimerID represents the static user identifiers for a timer. 
+// StaticTimerID represents the static user identifiers for a timer, +// in particular, the ID of the Transform, and the family for the timer. type StaticTimerID struct { Transform, TimerFamily string } From 30f84a494c6a2778238623eee26a303fdfaa22d0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 11 Dec 2024 21:02:14 -0800 Subject: [PATCH 7/9] comment --- .../go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 8ee14976d80b..6a17f7397de0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1118,7 +1118,7 @@ type stageState struct { // onWindowExpiration management onWindowExpiration StaticTimerID // The static ID of the OnWindowExpiration callback. keysToExpireByWindow map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired. - inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely collect them. + inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely garbage collect them. expiryWindowsByBundles map[string]typex.Window // Tracks which bundle is handling which window, so the above map can be cleared. mu sync.Mutex From 54f7a420d7e059ac49339efef3851f9e157f3d6c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 12 Dec 2024 10:24:32 -0800 Subject: [PATCH 8/9] fix variable name in test --- .../internal/engine/elementmanager_test.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go index 26924eedfb21..fe52291be6c4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go @@ -555,8 +555,8 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { } validateSideBundles := func(t *testing.T, keys set[string]) { t.Helper() - if len(em.sideChannelBundles) == 0 { - t.Errorf("no sideChannelBundles exist when checking keys: %v", keys) + if len(em.injectedBundles) == 0 { + t.Errorf("no injectedBundles exist when checking keys: %v", keys) } // Check that all keys are marked as in progress for k := range keys { @@ -567,7 +567,7 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { bundleID := "" sideBundles: - for _, rb := range em.sideChannelBundles { + for _, rb := range em.injectedBundles { // find that a side channel bundle exists with these keys. 
bkeys := stage.inprogressKeysByBundle[rb.BundleID] if len(bkeys) != len(keys) { @@ -582,7 +582,7 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { break } if bundleID == "" { - t.Errorf("no bundle found with all the given keys: %v: bundles: %v keysByBundle: %v", keys, em.sideChannelBundles, stage.inprogressKeysByBundle) + t.Errorf("no bundle found with all the given keys: %v: bundles: %v keysByBundle: %v", keys, em.injectedBundles, stage.inprogressKeysByBundle) } } @@ -595,8 +595,8 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { if got, want := len(stage.inProgressExpiredWindows), 0; got != want { t.Errorf("len(stage.inProgressExpiredWindows) = %v, want %v", got, want) } - if got, want := len(em.sideChannelBundles), 0; got != want { - t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + if got, want := len(em.injectedBundles), 0; got != want { + t.Errorf("len(em.injectedBundles) = %v, want %v", got, want) } // Configure a few conditions to validate in the call. @@ -623,9 +623,9 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) } - // We should only see 2 sideChannelBundles at this point. - if got, want := len(em.sideChannelBundles), 2; got != want { - t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + // We should only see 2 injectedBundles at this point. + if got, want := len(em.injectedBundles), 2; got != want { + t.Errorf("len(em.injectedBundles) = %v, want %v", got, want) } validateInProgressExpiredWindows(t, expiredWindow1, 1) @@ -643,9 +643,9 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) } - // We should see 3 sideChannelBundles at this point. - if got, want := len(em.sideChannelBundles), 3; got != want { - t.Errorf("len(em.sideChannelBundles) = %v, want %v", got, want) + // We should see 3 injectedBundles at this point. + if got, want := len(em.injectedBundles), 3; got != want { + t.Errorf("len(em.injectedBundles) = %v, want %v", got, want) } validateInProgressExpiredWindows(t, expiredWindow1, 1) From b9846ca7f1269fb3f7e3586685814351c1a28b60 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 12 Dec 2024 13:00:34 -0800 Subject: [PATCH 9/9] Rename static field to TransformID --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 4 ++-- .../runners/prism/internal/engine/elementmanager_test.go | 2 +- sdks/go/pkg/beam/runners/prism/internal/preprocess.go | 5 +---- sdks/go/pkg/beam/runners/prism/internal/stage.go | 2 +- sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go | 2 +- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 6a17f7397de0..00e18c669afa 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -659,7 +659,7 @@ type Block struct { // StaticTimerID represents the static user identifiers for a timer, // in particular, the ID of the Transform, and the family for the timer. type StaticTimerID struct { - Transform, TimerFamily string + TransformID, TimerFamily string } // StateForBundle retreives relevant state for the given bundle, WRT the data in the bundle. 
@@ -1719,7 +1719,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele timestamp: wm, pane: typex.NoFiringPane(), holdTimestamp: wm, - transform: ss.onWindowExpiration.Transform, + transform: ss.onWindowExpiration.TransformID, family: ss.onWindowExpiration.TimerFamily, sequence: 1, keyBytes: []byte(k), diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go index fe52291be6c4..0d7da5ea163f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go @@ -537,7 +537,7 @@ func TestElementManager_OnWindowExpiration(t *testing.T) { em.AddStage("impulse", nil, []string{"input"}, nil) em.AddStage("dofn", []string{"input"}, nil, nil) onWE := StaticTimerID{ - Transform: "dofn1", + TransformID: "dofn1", TimerFamily: "onWinExp", } em.StageOnWindowExpiration("dofn", onWE) diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index 0e17d642d88a..0d3ec7c365c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -450,10 +450,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa stg.stateful = true } if pardo.GetOnWindowExpirationTimerFamilySpec() != "" { - stg.onWindowExpiration = struct { - Transform string - TimerFamily string - }{Transform: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()} + stg.onWindowExpiration = engine.StaticTimerID{TransformID: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()} } sis = pardo.GetSideInputs() } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 7cc37754f0ef..9dd6cbdafec8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -454,7 +454,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } } for timerID, v := range pardo.GetTimerFamilySpecs() { - stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID}) + stg.hasTimers = append(stg.hasTimers, engine.StaticTimerID{TransformID: tid, TimerFamily: timerID}) if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME { if stg.processingTimeTimers == nil { stg.processingTimeTimers = map[string]bool{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 4f25c1399924..14cd84aef821 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -190,7 +190,7 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} { for _, tid := range b.HasTimers { timers = append(timers, &fnpb.Elements_Timers{ InstructionId: b.InstID, - TransformId: tid.Transform, + TransformId: tid.TransformID, TimerFamilyId: tid.TimerFamily, IsLast: true, })
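
Taken together, the element-manager changes boil down to one bookkeeping decision at watermark-advance time. The following self-contained toy analogue of createOnWindowExpirationBundles shows that decision in isolation: windows whose maximum timestamp falls behind the candidate output watermark produce expiry bundles for their idle keys, busy keys are kept queued for a later pass, and any expiring window holds back the output watermark (and therefore state garbage collection). Every name and type below is a simplified stand-in rather than prism's real API; locking, watermark holds, and bundle IDs are deliberately elided:

package main

import "fmt"

// win stands in for typex.Window; only its maximum timestamp matters here.
type win struct{ maxTimestamp int64 }

// expiryBundle is the simplified analogue of an injected RunBundle.
type expiryBundle struct {
	window win
	keys   []string
}

// expireWindows mimics the core of createOnWindowExpirationBundles: build a
// bundle per expired window for its idle keys, leave busy keys queued, and
// report whether the output watermark must be held back.
func expireWindows(
	keysToExpireByWindow map[win]map[string]bool,
	busyKeys map[string]bool,
	newOut int64,
) (bundles []expiryBundle, holdOutput bool) {
	for w, keys := range keysToExpireByWindow {
		if w.maxTimestamp >= newOut {
			continue // Window hasn't expired yet.
		}
		holdOutput = true // Garbage collection must wait for these bundles.
		var idle []string
		remaining := map[string]bool{}
		for k := range keys {
			if busyKeys[k] {
				remaining[k] = true // Retry once the key's current bundle finishes.
				continue
			}
			idle = append(idle, k)
		}
		if len(idle) > 0 {
			bundles = append(bundles, expiryBundle{window: w, keys: idle})
		}
		if len(remaining) == 0 {
			delete(keysToExpireByWindow, w)
		} else {
			keysToExpireByWindow[w] = remaining
		}
	}
	return bundles, holdOutput
}

func main() {
	keysByWindow := map[win]map[string]bool{
		{maxTimestamp: 10}: {"a": true, "b": true},
		{maxTimestamp: 99}: {"c": true},
	}
	bundles, hold := expireWindows(keysByWindow, map[string]bool{"b": true}, 50)
	// Expect one bundle for key "a" in window 10; "b" stays queued; window 99 is untouched.
	fmt.Println(bundles, hold, keysByWindow)
}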