From 624278494ec5a5ed1bda4dd241cb0dd166a0d878 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Jun 2025 14:01:21 -0400 Subject: [PATCH 1/3] Revert "Reverts #35202 (#35306)" This reverts commit 71073a0ec12a4b3e78da4f4ca0862a743c8ff8cc. --- .../runners/prism/internal/engine/elementmanager.go | 8 ++++++-- sdks/go/pkg/beam/runners/prism/internal/execute.go | 13 ++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 42db5fbccae9..daea323d2638 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,6 +161,8 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int + // Whether to use real-time clock as processing time + EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2160,8 +2162,10 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { } // "Test" mode -> advance to next processing time event if any, to allow execution. - if t, ok := em.processTimeEvents.Peek(); ok { - return t + if !em.config.EnableRTC { + if t, ok := em.processTimeEvents.Peek(); ok { + return t + } } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 51848944c0c1..427f1eb7145d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,7 +145,18 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - em := engine.NewElementManager(engine.Config{}) + config := engine.Config{} + m := j.PipelineOptions().AsMap() + for _, exp := range m["beam:option:experiments:v1"].([]interface{}) { + if expStr, ok := exp.(string); ok { + if expStr == "prism_enable_rtc" { + config.EnableRTC = true + break // Found it, no need to check the rest of the slice + } + } + } + + em := engine.NewElementManager(config) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{} From 6a9ed08432876614a9f1a1ead91756e1ec5ea8a3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Jun 2025 14:22:46 -0400 Subject: [PATCH 2/3] Check if type assertion if ok before using it in range. --- sdks/go/pkg/beam/runners/prism/internal/execute.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 427f1eb7145d..ab041da314dc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -147,11 +147,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic config := engine.Config{} m := j.PipelineOptions().AsMap() - for _, exp := range m["beam:option:experiments:v1"].([]interface{}) { - if expStr, ok := exp.(string); ok { - if expStr == "prism_enable_rtc" { - config.EnableRTC = true - break // Found it, no need to check the rest of the slice + if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok { + for _, exp := range experimentsSlice { + if expStr, ok := exp.(string); ok { + if expStr == "prism_enable_rtc" { + config.EnableRTC = true + break // Found it, no need to check the rest of the slice + } } } } From 1bd9d31a7a597f216610b8cf6ef72362660f1595 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Jun 2025 14:29:15 -0400 Subject: [PATCH 3/3] Trigger PreCommit Java test. --- .github/trigger_files/beam_PreCommit_Java.json | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .github/trigger_files/beam_PreCommit_Java.json diff --git a/.github/trigger_files/beam_PreCommit_Java.json b/.github/trigger_files/beam_PreCommit_Java.json new file mode 100644 index 000000000000..5abe02fc09c7 --- /dev/null +++ b/.github/trigger_files/beam_PreCommit_Java.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +}