diff --git a/CHANGES.md b/CHANGES.md
index 1b943a99f8a0..6d23ed60baaf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,8 @@
 ## New Features / Improvements
 
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
+  * This enables initial Java GroupIntoBatches support.
 
 ## Breaking Changes
 
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 82eb62b9e207..ce71151099bd 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -106,6 +106,12 @@ def sickbayTests = [
     'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
     'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
 
+    // GroupIntoBatchesTest tests that fail:
+    // TestStream has bad KV encodings due to using an outer context.
+    'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
+    // ShardedKey not yet implemented.
+    'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
+
     // Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
     'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',
 
@@ -228,14 +234,16 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
       excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
 
       // Not yet implemented in Prism
-      // https://github.com/apache/beam/issues/32211
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
       // https://github.com/apache/beam/issues/32929
       excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
 
-      // Not supported in Portable Java SDK yet.
-      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
+      // Not supported in Portable Java SDK yet.
+      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
+
+      // Processing time with TestStream is unreliable without being able to control
+      // SDK side time portably. Ignore these tests.
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
     }
     filter {
       for (String test : sickbayTests) {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 1739efdb742a..00e18c669afa 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -184,7 +184,8 @@ type Config struct {
 //
 // Watermarks are advanced based on consumed input, except if the stage produces residuals.
 type ElementManager struct {
-	config Config
+	config     Config
+	nextBundID func() string // Generates unique bundleIDs. Set in the Bundles method.
 
 	impulses set[string]            // List of impulse stages.
 	stages   map[string]*stageState // The state for each stage.
@@ -197,6 +198,7 @@ type ElementManager struct {
 	refreshCond       sync.Cond   // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
 	inprogressBundles set[string] // Active bundleIDs
 	changedStages     set[string] // Stages that have changed and need their watermark refreshed.
+	injectedBundles   []RunBundle // Represents ready-to-execute bundles prepared outside of the main loop, such as for onWindowExpiration, or for Triggers.
 
 	livePending     atomic.Int64   // An accessible live pending count. DEBUG USE ONLY
 	pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
@@ -271,6 +273,16 @@ func (em *ElementManager) StageStateful(ID string) {
 	em.stages[ID].stateful = true
 }
 
+// StageOnWindowExpiration registers the OnWindowExpiration timer for the given stage,
+// and initializes the bookkeeping used to expire window state for every key seen in a window.
+func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
+	ss := em.stages[stageID]
+	ss.onWindowExpiration = timer
+	ss.keysToExpireByWindow = map[typex.Window]set[string]{}
+	ss.inProgressExpiredWindows = map[typex.Window]int{}
+	ss.expiryWindowsByBundles = map[string]typex.Window{}
+}
+
 // StageProcessingTimeTimers indicates which timers are processingTime domain timers.
 func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
 	em.stages[ID].processingTimeTimersFamilies = ptTimers
@@ -338,6 +350,8 @@ func (rb RunBundle) LogValue() slog.Value {
 // The returned channel is closed when the context is canceled, or there are no pending elements
 // remaining.
 func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
+	// Make it easier for injected bundles to get unique IDs.
+	em.nextBundID = nextBundID
 	runStageCh := make(chan RunBundle)
 	ctx, cancelFn := context.WithCancelCause(ctx)
 	go func() {
@@ -370,8 +384,9 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
 			changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
 			em.changedStages.merge(changedByProcessingTime)
 
-			// If there are no changed stages or ready processing time events available, we wait until there are.
-			for len(em.changedStages)+len(changedByProcessingTime) == 0 {
+			// If there are no changed stages, ready processing time events,
+			// or injected bundles available, we wait until there are.
+			for len(em.changedStages)+len(changedByProcessingTime)+len(em.injectedBundles) == 0 {
 				// Check to see if we must exit
 				select {
 				case <-ctx.Done():
@@ -386,6 +401,19 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
 				changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
 				em.changedStages.merge(changedByProcessingTime)
 			}
+			// Run any injected bundles first.
+			for len(em.injectedBundles) > 0 {
+				rb := em.injectedBundles[0]
+				em.injectedBundles = em.injectedBundles[1:]
+				em.refreshCond.L.Unlock()
+
+				select {
+				case <-ctx.Done():
+					return
+				case runStageCh <- rb:
+				}
+				em.refreshCond.L.Lock()
+			}
 
 			// We know there is some work we can do that may advance the watermarks,
 			// refresh them, and see which stages have advanced.
@@ -628,6 +656,12 @@ type Block struct {
 	Transform, Family string
 }
 
+// StaticTimerID represents the static user identifiers for a timer,
+// in particular, the ID of the Transform, and the family for the timer.
+type StaticTimerID struct {
+	TransformID, TimerFamily string
+}
+
 // StateForBundle retreives relevant state for the given bundle, WRT the data in the bundle.
 //
 // TODO(lostluck): Consider unifiying with InputForBundle, to reduce lock contention.
@@ -847,6 +881,19 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
 	}
 	delete(stage.inprogressHoldsByBundle, rb.BundleID)
 
+	// Clean up OnWindowExpiration bundle accounting, so window state
+	// may be garbage collected.
+ if stage.expiryWindowsByBundles != nil { + win, ok := stage.expiryWindowsByBundles[rb.BundleID] + if ok { + stage.inProgressExpiredWindows[win] -= 1 + if stage.inProgressExpiredWindows[win] == 0 { + delete(stage.inProgressExpiredWindows, win) + } + delete(stage.expiryWindowsByBundles, rb.BundleID) + } + } + // If there are estimated output watermarks, set the estimated // output watermark for the stage. if len(residuals.MinOutputWatermarks) > 0 { @@ -1068,6 +1115,12 @@ type stageState struct { strat winStrat // Windowing Strategy for aggregation fireings. processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain. + // onWindowExpiration management + onWindowExpiration StaticTimerID // The static ID of the OnWindowExpiration callback. + keysToExpireByWindow map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired. + inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely garbage collect them. + expiryWindowsByBundles map[string]typex.Window // Tracks which bundle is handling which window, so the above map can be cleared. + mu sync.Mutex upstreamWatermarks sync.Map // watermark set from inputPCollection's parent. input mtime.Time // input watermark for the parallel input. @@ -1158,6 +1211,14 @@ func (ss *stageState) AddPending(newPending []element) int { timers: map[timerKey]timerTimes{}, } ss.pendingByKeys[string(e.keyBytes)] = dnt + if ss.keysToExpireByWindow != nil { + w, ok := ss.keysToExpireByWindow[e.window] + if !ok { + w = make(set[string]) + ss.keysToExpireByWindow[e.window] = w + } + w.insert(string(e.keyBytes)) + } } heap.Push(&dnt.elements, e) @@ -1555,48 +1616,143 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { if minWatermarkHold < newOut { newOut = minWatermarkHold } - refreshes := set[string]{} + // If the newOut is smaller, then don't change downstream watermarks. + if newOut <= ss.output { + return nil + } + // If bigger, advance the output watermark - if newOut > ss.output { - ss.output = newOut - for _, outputCol := range ss.outputIDs { - consumers := em.consumers[outputCol] - - for _, sID := range consumers { - em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) - refreshes.insert(sID) - } - // Inform side input consumers, but don't update the upstream watermark. - for _, sID := range em.sideConsumers[outputCol] { - refreshes.insert(sID.Global) - } - } - // Garbage collect state, timers and side inputs, for all windows - // that are before the new output watermark. - // They'll never be read in again. - for _, wins := range ss.sideInputs { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { - delete(wins, win) + preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, em) + + // Garbage collect state, timers and side inputs, for all windows + // that are before the new output watermark, if they aren't in progress + // of being expired. + // They'll never be read in again. + for _, wins := range ss.sideInputs { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + // Clear out anything we've already used. + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip this window. 
+ if ss.inProgressExpiredWindows[win] > 0 { + continue } + delete(wins, win) } } - for _, wins := range ss.state { - for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - if win.MaxTimestamp() < newOut { - delete(wins, win) + } + for _, wins := range ss.state { + for win := range wins { + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp() < newOut { + // If the expiry is in progress, skip collecting this window. + if ss.inProgressExpiredWindows[win] > 0 { + continue } + delete(wins, win) } } } + // If there are windows to expire, we don't update the output watermark yet. + if preventDownstreamUpdate { + return nil + } + + // Update this stage's output watermark, and then propagate that to downstream stages + refreshes := set[string]{} + ss.output = newOut + for _, outputCol := range ss.outputIDs { + consumers := em.consumers[outputCol] + + for _, sID := range consumers { + em.stages[sID].updateUpstreamWatermark(outputCol, ss.output) + refreshes.insert(sID) + } + // Inform side input consumers, but don't update the upstream watermark. + for _, sID := range em.sideConsumers[outputCol] { + refreshes.insert(sID.Global) + } + } return refreshes } +// createOnWindowExpirationBundles injects bundles when windows +// expire for all keys that were used in that window. Returns true if any +// bundles are created, which means that the window must not yet be garbage +// collected. +// +// Must be called within the stageState.mu's and the ElementManager.refreshCond +// critical sections. +func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *ElementManager) bool { + var preventDownstreamUpdate bool + for win, keys := range ss.keysToExpireByWindow { + // Check if the window has expired. + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + if win.MaxTimestamp() >= newOut { + continue + } + // We can't advance the output watermark if there's garbage to collect. + preventDownstreamUpdate = true + // Hold off on garbage collecting data for these windows while these + // are in progress. + ss.inProgressExpiredWindows[win] += 1 + + // Produce bundle(s) for these keys and window, and inject them. + wm := win.MaxTimestamp() + rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm} + + // Now we need to actually build the bundle. + var toProcess []element + busyKeys := set[string]{} + usedKeys := set[string]{} + for k := range keys { + if ss.inprogressKeys.present(k) { + busyKeys.insert(k) + continue + } + usedKeys.insert(k) + toProcess = append(toProcess, element{ + window: win, + timestamp: wm, + pane: typex.NoFiringPane(), + holdTimestamp: wm, + transform: ss.onWindowExpiration.TransformID, + family: ss.onWindowExpiration.TimerFamily, + sequence: 1, + keyBytes: []byte(k), + elmBytes: nil, + }) + } + em.addPending(len(toProcess)) + ss.watermarkHolds.Add(wm, 1) + ss.makeInProgressBundle( + func() string { return rb.BundleID }, + toProcess, + wm, + usedKeys, + map[mtime.Time]int{wm: 1}, + ) + ss.expiryWindowsByBundles[rb.BundleID] = win + + slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("usedKeys", usedKeys), slog.Any("window", win), slog.Any("toProcess", toProcess), slog.Any("busyKeys", busyKeys)) + // We're already in the refreshCond critical section. + // Insert that this is in progress here to avoid a race condition. 
+ em.inprogressBundles.insert(rb.BundleID) + em.injectedBundles = append(em.injectedBundles, rb) + + // Remove the key accounting, or continue tracking which keys still need clearing. + if len(busyKeys) == 0 { + delete(ss.keysToExpireByWindow, win) + } else { + ss.keysToExpireByWindow[win] = busyKeys + } + } + return preventDownstreamUpdate +} + // bundleReady returns the maximum allowed watermark for this stage, and whether // it's permitted to execute by side inputs. func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool) { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go index d5904b13fb88..0d7da5ea163f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -524,3 +525,162 @@ func TestElementManager(t *testing.T) { } }) } + +func TestElementManager_OnWindowExpiration(t *testing.T) { + t.Run("createOnWindowExpirationBundles", func(t *testing.T) { + // Unlike the other tests above, we synthesize the input configuration, + em := NewElementManager(Config{}) + var instID uint64 + em.nextBundID = func() string { + return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1)) + } + em.AddStage("impulse", nil, []string{"input"}, nil) + em.AddStage("dofn", []string{"input"}, nil, nil) + onWE := StaticTimerID{ + TransformID: "dofn1", + TimerFamily: "onWinExp", + } + em.StageOnWindowExpiration("dofn", onWE) + em.Impulse("impulse") + + stage := em.stages["dofn"] + stage.pendingByKeys = map[string]*dataAndTimers{} + stage.inprogressKeys = set[string]{} + + validateInProgressExpiredWindows := func(t *testing.T, win typex.Window, want int) { + t.Helper() + if got := stage.inProgressExpiredWindows[win]; got != want { + t.Errorf("stage.inProgressExpiredWindows[%v] = %v, want %v", win, got, want) + } + } + validateSideBundles := func(t *testing.T, keys set[string]) { + t.Helper() + if len(em.injectedBundles) == 0 { + t.Errorf("no injectedBundles exist when checking keys: %v", keys) + } + // Check that all keys are marked as in progress + for k := range keys { + if !stage.inprogressKeys.present(k) { + t.Errorf("key %q not marked as in progress", k) + } + } + + bundleID := "" + sideBundles: + for _, rb := range em.injectedBundles { + // find that a side channel bundle exists with these keys. + bkeys := stage.inprogressKeysByBundle[rb.BundleID] + if len(bkeys) != len(keys) { + continue sideBundles + } + for k := range keys { + if !bkeys.present(k) { + continue sideBundles + } + } + bundleID = rb.BundleID + break + } + if bundleID == "" { + t.Errorf("no bundle found with all the given keys: %v: bundles: %v keysByBundle: %v", keys, em.injectedBundles, stage.inprogressKeysByBundle) + } + } + + newOut := mtime.EndOfGlobalWindowTime + // No windows exist, so no side channel bundles should be set. + if got, want := stage.createOnWindowExpirationBundles(newOut, em), false; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + // Validate that no side channel bundles were created. 
+		if got, want := len(stage.inProgressExpiredWindows), 0; got != want {
+			t.Errorf("len(stage.inProgressExpiredWindows) = %v, want %v", got, want)
+		}
+		if got, want := len(em.injectedBundles), 0; got != want {
+			t.Errorf("len(em.injectedBundles) = %v, want %v", got, want)
+		}
+
+		// Configure a few conditions to validate in the call.
+		// Each expired window gets its own bundle, and all of a window's keys land in that same bundle.
+		// Bundle 1
+		expiredWindow1 := window.IntervalWindow{Start: 0, End: newOut - 1}
+
+		akey := "\u0004key1"
+		keys1 := singleSet(akey)
+		stage.keysToExpireByWindow[expiredWindow1] = keys1
+		// Bundle 2
+		expiredWindow2 := window.IntervalWindow{Start: 1, End: newOut - 1}
+		keys2 := singleSet("\u0004key2")
+		keys2.insert("\u0004key3")
+		keys2.insert("\u0004key4")
+		stage.keysToExpireByWindow[expiredWindow2] = keys2
+
+		// We should never see this key and window combination, as the window is
+		// not yet expired.
+		liveWindow := window.IntervalWindow{Start: 2, End: newOut + 1}
+		stage.keysToExpireByWindow[liveWindow] = singleSet("\u0010keyNotSeen")
+
+		if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want {
+			t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want)
+		}
+
+		// We should only see 2 injectedBundles at this point.
+		if got, want := len(em.injectedBundles), 2; got != want {
+			t.Errorf("len(em.injectedBundles) = %v, want %v", got, want)
+		}
+
+		validateInProgressExpiredWindows(t, expiredWindow1, 1)
+		validateInProgressExpiredWindows(t, expiredWindow2, 1)
+		validateSideBundles(t, keys1)
+		validateSideBundles(t, keys2)
+
+		// Bundle 3
+		expiredWindow3 := window.IntervalWindow{Start: 3, End: newOut - 1}
+		keys3 := singleSet(akey)   // We shouldn't see this key, since it's in progress.
+		keys3.insert("\u0004key5") // We should see this key since it isn't.
+		stage.keysToExpireByWindow[expiredWindow3] = keys3
+
+		if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want {
+			t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want)
+		}
+
+		// We should see 3 injectedBundles at this point.
+		if got, want := len(em.injectedBundles), 3; got != want {
+			t.Errorf("len(em.injectedBundles) = %v, want %v", got, want)
+		}
+
+		validateInProgressExpiredWindows(t, expiredWindow1, 1)
+		validateInProgressExpiredWindows(t, expiredWindow2, 1)
+		validateInProgressExpiredWindows(t, expiredWindow3, 1)
+		validateSideBundles(t, keys1)
+		validateSideBundles(t, keys2)
+		validateSideBundles(t, singleSet("\u0004key5"))
+
+		// Remove key1 from "inprogress keys", and the associated bundle.
+ stage.inprogressKeys.remove(akey) + delete(stage.inProgressExpiredWindows, expiredWindow1) + for bundID, bkeys := range stage.inprogressKeysByBundle { + if bkeys.present(akey) { + t.Logf("bundID: %v, bkeys: %v, keyByBundle: %v", bundID, bkeys, stage.inprogressKeysByBundle) + delete(stage.inprogressKeysByBundle, bundID) + win := stage.expiryWindowsByBundles[bundID] + delete(stage.expiryWindowsByBundles, bundID) + if win != expiredWindow1 { + t.Fatalf("Unexpected window: got %v, want %v", win, expiredWindow1) + } + break + } + } + + // Now we should get another bundle for expiredWindow3, and have none for expiredWindow1 + if got, want := stage.createOnWindowExpirationBundles(newOut, em), true; got != want { + t.Errorf("createOnWindowExpirationBundles(%v) = %v, want %v", newOut, got, want) + } + + validateInProgressExpiredWindows(t, expiredWindow1, 0) + validateInProgressExpiredWindows(t, expiredWindow2, 1) + validateInProgressExpiredWindows(t, expiredWindow3, 2) + validateSideBundles(t, keys1) // Should still have this key present, but with a different bundle. + validateSideBundles(t, keys2) + validateSideBundles(t, singleSet("\u0004key5")) // still exist.. + }) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 614edee47721..fde62f00c7c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -318,6 +318,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic if stage.stateful { em.StageStateful(stage.ID) } + if stage.onWindowExpiration.TimerFamily != "" { + slog.Debug("OnWindowExpiration", slog.String("stage", stage.ID), slog.Any("values", stage.onWindowExpiration)) + em.StageOnWindowExpiration(stage.ID, stage.onWindowExpiration) + } if len(stage.processingTimeTimers) > 0 { em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index deef259a99d1..6158cd6d612c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -45,6 +45,7 @@ var supportedRequirements = map[string]struct{}{ urns.RequirementSplittableDoFn: {}, urns.RequirementStatefulProcessing: {}, urns.RequirementBundleFinalization: {}, + urns.RequirementOnWindowExpiration: {}, } // TODO, move back to main package, and key off of executor handlers? diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index a2840760bf7a..894a6e1427a2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME) } - check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now. 
- // Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139 if pardo.GetRestrictionCoderId() != "" && isStateful { check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "") diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index dceaa9ab8fcb..0d3ec7c365c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -449,6 +449,9 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 { stg.stateful = true } + if pardo.GetOnWindowExpirationTimerFamilySpec() != "" { + stg.onWindowExpiration = engine.StaticTimerID{TransformID: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()} + } sis = pardo.GetSideInputs() } if _, ok := sis[link.Local]; ok { diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 9f00c22789b6..9dd6cbdafec8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -57,18 +57,20 @@ type link struct { // account, but all serialization boundaries remain since the pcollections // would continue to get serialized. type stage struct { - ID string - transforms []string - primaryInput string // PCollection used as the parallel input. - outputs []link // PCollections that must escape this stage. - sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers - internalCols []string // PCollections that escape. Used for precise coder sending. - envID string - finalize bool - stateful bool + ID string + transforms []string + primaryInput string // PCollection used as the parallel input. + outputs []link // PCollections that must escape this stage. + sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers + internalCols []string // PCollections that escape. Used for precise coder sending. + envID string + finalize bool + stateful bool + onWindowExpiration engine.StaticTimerID + // hasTimers indicates the transform+timerfamily pairs that need to be waited on for // the stage to be considered complete. - hasTimers []struct{ Transform, TimerFamily string } + hasTimers []engine.StaticTimerID processingTimeTimers map[string]bool exe transformExecuter @@ -452,7 +454,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } } for timerID, v := range pardo.GetTimerFamilySpecs() { - stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID}) + stg.hasTimers = append(stg.hasTimers, engine.StaticTimerID{TransformID: tid, TimerFamily: timerID}) if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME { if stg.processingTimeTimers == nil { stg.processingTimeTimers = map[string]bool{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 83ad1bda9841..14cd84aef821 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -42,7 +42,7 @@ type B struct { InputTransformID string Input []*engine.Block // Data and Timers for this bundle. 
EstimatedInputElements int - HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate. + HasTimers []engine.StaticTimerID // Timer streams to terminate. // IterableSideInputData is a map from transformID + inputID, to window, to data. IterableSideInputData map[SideInputKey]map[typex.Window][][]byte @@ -190,7 +190,7 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} { for _, tid := range b.HasTimers { timers = append(timers, &fnpb.Elements_Timers{ InstructionId: b.InstID, - TransformId: tid.Transform, + TransformId: tid.TransformID, TimerFamilyId: tid.TimerFamily, IsLast: true, })
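
Reviewer note: a minimal sketch of the Java pipeline shape this change unblocks. GroupIntoBatches buffers elements in per-key state and registers an OnWindowExpiration callback to flush any partially filled batch when the window closes, which is why Prism rejected it before this change. The class name, input data, and the `--runner=PrismRunner` flag are illustrative assumptions, not part of this diff; in the engine above, the flush surfaces as an injected "owe-" bundle carrying one synthetic timer element per buffered key.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical smoke test; run with e.g. --runner=PrismRunner, assuming the
// Java Prism wrapper from runners/prism/java is on the classpath.
public class GroupIntoBatchesOnPrism {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<KV<String, Iterable<String>>> batched =
        p.apply(Create.of(
                KV.of("a", "1"), KV.of("a", "2"), KV.of("a", "3"),
                KV.of("b", "4"), KV.of("b", "5")))
            // GroupIntoBatches keeps per-key state and relies on OnWindowExpiration
            // to emit any partially filled batch when the window expires.
            .apply(GroupIntoBatches.<String, String>ofSize(2));

    p.run().waitUntilFinish();
  }
}
```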