diff --git a/.github/trigger_files/beam_PreCommit_Java.json b/.github/trigger_files/beam_PreCommit_Java.json new file mode 100644 index 000000000000..5abe02fc09c7 --- /dev/null +++ b/.github/trigger_files/beam_PreCommit_Java.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 42db5fbccae9..daea323d2638 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,6 +161,8 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int + // Whether to use real-time clock as processing time + EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2160,8 +2162,10 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { } // "Test" mode -> advance to next processing time event if any, to allow execution. - if t, ok := em.processTimeEvents.Peek(); ok { - return t + if !em.config.EnableRTC { + if t, ok := em.processTimeEvents.Peek(); ok { + return t + } } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 51848944c0c1..ab041da314dc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,7 +145,20 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - em := engine.NewElementManager(engine.Config{}) + config := engine.Config{} + m := j.PipelineOptions().AsMap() + if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok { + for _, exp := range experimentsSlice { + if expStr, ok := exp.(string); ok { + if expStr == "prism_enable_rtc" { + config.EnableRTC = true + break // Found it, no need to check the rest of the slice + } + } + } + } + + em := engine.NewElementManager(config) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{}