diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 08056ae18b22..daea323d2638 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,6 +161,8 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int + // Whether to use real-time clock as processing time + EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2158,11 +2160,12 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { if em.testStreamHandler != nil && !em.testStreamHandler.completed { return em.testStreamHandler.Now() } - // TODO toggle between testmode and production mode. + // "Test" mode -> advance to next processing time event if any, to allow execution. - // if test mode... - if t, ok := em.processTimeEvents.Peek(); ok { - return t + if !em.config.EnableRTC { + if t, ok := em.processTimeEvents.Peek(); ok { + return t + } } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 51848944c0c1..427f1eb7145d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,7 +145,18 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - em := engine.NewElementManager(engine.Config{}) + config := engine.Config{} + m := j.PipelineOptions().AsMap() + for _, exp := range m["beam:option:experiments:v1"].([]interface{}) { + if expStr, ok := exp.(string); ok { + if expStr == "prism_enable_rtc" { + config.EnableRTC = true + break // Found it, no need to check the rest of the slice + } + } + } + + em := engine.NewElementManager(config) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{}