From 9acd8b0180b4c7c5bbe4453dce1ab7012af5bfed Mon Sep 17 00:00:00 2001 From: akashorabek <70029317+akashorabek@users.noreply.github.com> Date: Tue, 17 Jun 2025 19:47:10 +0500 Subject: [PATCH 1/2] Reverts #35202 (#35306) --- .../runners/prism/internal/engine/elementmanager.go | 8 ++------ sdks/go/pkg/beam/runners/prism/internal/execute.go | 13 +------------ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index daea323d2638..42db5fbccae9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,8 +161,6 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int - // Whether to use real-time clock as processing time - EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2162,10 +2160,8 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { } // "Test" mode -> advance to next processing time event if any, to allow execution. - if !em.config.EnableRTC { - if t, ok := em.processTimeEvents.Peek(); ok { - return t - } + if t, ok := em.processTimeEvents.Peek(); ok { + return t } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 427f1eb7145d..51848944c0c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,18 +145,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - config := engine.Config{} - m := j.PipelineOptions().AsMap() - for _, exp := range m["beam:option:experiments:v1"].([]interface{}) { - if expStr, ok := exp.(string); ok { - if expStr == "prism_enable_rtc" { - config.EnableRTC = true - break // Found it, no need to check the rest of the slice - } - } - } - - em := engine.NewElementManager(config) + em := engine.NewElementManager(engine.Config{}) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{} From ce784e5b78f88f3903f0e794296e87c79cebcc8a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Jun 2025 15:28:51 -0400 Subject: [PATCH 2/2] Fix flaky precommit java test on prism caused by PR 35202 (#35336) * Revert "Reverts #35202 (#35306)" This reverts commit 71073a0ec12a4b3e78da4f4ca0862a743c8ff8cc. * Check if type assertion is ok before using it in range. * Trigger PreCommit Java test. --- .github/trigger_files/beam_PreCommit_Java.json | 4 ++++ .../prism/internal/engine/elementmanager.go | 8 ++++++-- .../go/pkg/beam/runners/prism/internal/execute.go | 15 ++++++++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 .github/trigger_files/beam_PreCommit_Java.json diff --git a/.github/trigger_files/beam_PreCommit_Java.json b/.github/trigger_files/beam_PreCommit_Java.json new file mode 100644 index 000000000000..5abe02fc09c7 --- /dev/null +++ b/.github/trigger_files/beam_PreCommit_Java.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 42db5fbccae9..daea323d2638 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,6 +161,8 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int + // Whether to use real-time clock as processing time + EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2160,8 +2162,10 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { } // "Test" mode -> advance to next processing time event if any, to allow execution. - if t, ok := em.processTimeEvents.Peek(); ok { - return t + if !em.config.EnableRTC { + if t, ok := em.processTimeEvents.Peek(); ok { + return t + } } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 51848944c0c1..ab041da314dc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,7 +145,20 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - em := engine.NewElementManager(engine.Config{}) + config := engine.Config{} + m := j.PipelineOptions().AsMap() + if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok { + for _, exp := range experimentsSlice { + if expStr, ok := exp.(string); ok { + if expStr == "prism_enable_rtc" { + config.EnableRTC = true + break // Found it, no need to check the rest of the slice + } + } + } + } + + em := engine.NewElementManager(config) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{}