diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index a48973f65674..791952c625f4 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -98,6 +98,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', // Requires Allowed Lateness, among others. 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness', + 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate', 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode', 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow', @@ -160,6 +161,14 @@ def sickbayTests = [ // TODO(https://github.com/apache/beam/issues/31231) 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata', + + // These tests fail once Late Data was being precisely dropped. + // They set a single element to be late data, and expect it (correctly) to be preserved. + // Since presently, these are treated as No-ops, the fix is to disable the + // dropping behavior when a stage's input is a Reshuffle/Redistribute transform. + 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming', + 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming', + // Prism isn't handling Java's side input views properly. // https://github.com/apache/beam/issues/32932 // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. @@ -177,13 +186,6 @@ def sickbayTests = [ // java.lang.IllegalStateException: java.io.EOFException 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', - // Requires Time Sorted Input - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData', - // Missing output due to processing time timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 3cfcf9ef8c0e..bb3c8ceceeb8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st func (ss *stageState) AddPending(newPending []element) int { ss.mu.Lock() defer ss.mu.Unlock() + // TODO(#https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + // Data that arrives after the *output* watermark is late. + threshold := ss.output + origPending := make([]element, 0, ss.pending.Len()) + for _, e := range newPending { + if e.window.MaxTimestamp() < threshold { + continue + } + origPending = append(origPending, e) + } + newPending = origPending if ss.stateful { if ss.pendingByKeys == nil { ss.pendingByKeys = map[string]*dataAndTimers{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go index 13e9b6f1b79d..7ac472251f61 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go @@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb // At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal. - // StatefulDoFns need to be marked as being roots. + // ForceRoots cause fusion breaks in the optimized graph. + // StatefulDoFns need to be marked as being roots, for correct per-key state handling. + // Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior. var forcedRoots []string - if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 { + if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 || + pdo.GetRequiresTimeSortedInput() { forcedRoots = append(forcedRoots, tid) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 4be64e5a9c80..f186b11fd1d8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{ urns.RequirementStatefulProcessing: {}, urns.RequirementBundleFinalization: {}, urns.RequirementOnWindowExpiration: {}, + urns.RequirementTimeSortedInput: {}, } // TODO, move back to main package, and key off of executor handlers? diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 742547b9b6c3..8409133772eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3764,7 +3764,9 @@ public void testRequiresTimeSortedInputWithLateData() { if (stamp == 100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); + input = + input.advanceWatermarkTo( + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); } } testTimeSortedInput( @@ -3796,7 +3798,9 @@ public void testTwoRequiresTimeSortedInputWithLateData() { if (stamp == 100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); + input = + input.advanceWatermarkTo( + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); } } // apply the sorted function for the first time