diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index daea323d2638..42db5fbccae9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -161,8 +161,6 @@ type Config struct { // MaxBundleSize caps the number of elements permitted in a bundle. // 0 or less means this is ignored. MaxBundleSize int - // Whether to use real-time clock as processing time - EnableRTC bool } // ElementManager handles elements, watermarks, and related errata to determine @@ -2162,10 +2160,8 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { } // "Test" mode -> advance to next processing time event if any, to allow execution. - if !em.config.EnableRTC { - if t, ok := em.processTimeEvents.Peek(); ok { - return t - } + if t, ok := em.processTimeEvents.Peek(); ok { + return t } // "Production" mode, always real time now. diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 427f1eb7145d..51848944c0c1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -145,18 +145,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic topo := prepro.preProcessGraph(comps, j) ts := comps.GetTransforms() - config := engine.Config{} - m := j.PipelineOptions().AsMap() - for _, exp := range m["beam:option:experiments:v1"].([]interface{}) { - if expStr, ok := exp.(string); ok { - if expStr == "prism_enable_rtc" { - config.EnableRTC = true - break // Found it, no need to check the rest of the slice - } - } - } - - em := engine.NewElementManager(config) + em := engine.NewElementManager(engine.Config{}) // TODO move this loop and code into the preprocessor instead. stages := map[string]*stage{}