From 6c7ed12631c7c4aba4e44fc0bd1b7f82f10b46ae Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 4 Mar 2016 10:01:51 -0800 Subject: [PATCH] Remove StateSampler from the SDK --- .../inprocess/InProcessExecutionContext.java | 4 +- .../inprocess/ParDoInProcessEvaluator.java | 2 +- .../beam/sdk/transforms/DoFnTester.java | 21 +- .../org/apache/beam/sdk/transforms/ParDo.java | 2 +- .../beam/sdk/util/BaseExecutionContext.java | 31 +- .../sdk/util/DirectModeExecutionContext.java | 4 +- .../beam/sdk/util/ExecutionContext.java | 4 +- .../sdk/util/common/worker/StateSampler.java | 367 ------------------ .../InProcessEvaluationContextTest.java | 16 +- .../util/GroupAlsoByWindowsProperties.java | 2 +- 10 files changed, 46 insertions(+), 407 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java index 1430c989a6ca..e6441cf1e60d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.util.BaseExecutionContext; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.common.worker.StateSampler; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; /** @@ -47,8 +46,7 @@ public InProcessExecutionContext(Clock clock, Object key, } @Override - protected InProcessStepContext createStepContext( - String stepName, String transformName, StateSampler stateSampler) { + protected InProcessStepContext createStepContext(String stepName, String transformName) { return new InProcessStepContext(this, stepName, transformName); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java index a2f080c19cf9..9840e067d6cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -54,7 +54,7 @@ public static ParDoInProcessEvaluator create( evaluationContext.getExecutionContext(application, inputBundle.getKey()); String stepName = evaluationContext.getStepName(application); InProcessStepContext stepContext = - executionContext.getOrCreateStepContext(stepName, stepName, null); + executionContext.getOrCreateStepContext(stepName, stepName); CounterSet counters = evaluationContext.createCounterSet(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 1c60259100fa..16dc731816f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -483,15 +483,16 @@ void initializeState() { runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue()); } outputManager = new DoFnRunnerBase.ListOutputManager(); - fnRunner = DoFnRunners.createDefault( - options, - fn, - DirectSideInputReader.of(runnerSideInputs), - outputManager, - mainOutputTag, - sideOutputTags, - DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME, null), - counterSet.getAddCounterMutator(), - WindowingStrategy.globalDefault()); + fnRunner = + DoFnRunners.createDefault( + options, + fn, + DirectSideInputReader.of(runnerSideInputs), + outputManager, + mainOutputTag, + sideOutputTags, + DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME), + counterSet.getAddCounterMutator(), + WindowingStrategy.globalDefault()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index d266155b470c..02464ac2bc5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -1200,7 +1200,7 @@ private static void evaluateHelpe outputManager, mainOutputTag, sideOutputTags, - executionContext.getOrCreateStepContext(stepName, stepName, null), + executionContext.getOrCreateStepContext(stepName, stepName), context.getAddCounterMutator(), input.getWindowingStrategy()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java index 33df089226bf..630417bc2dfe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java @@ -19,9 +19,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.common.worker.StateSampler; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.base.Supplier; import java.io.IOException; import java.util.Collection; @@ -37,7 +37,7 @@ * be cached for the lifetime of this {@link ExecutionContext}. * *

BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link StepContext} from {@link #getOrCreateStepContext(String, String, StateSampler)} and + * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. *

  * @Override
@@ -47,8 +47,8 @@
  * 
* *

When a subclass of {@code BaseExecutionContext} has been downcast, the return types of - * {@link #createStepContext(String, String, StateSampler)}, - * {@link #getOrCreateStepContext(String, String, StateSampler}, and {@link #getAllStepContexts()} + * {@link #createStepContext(String, String)}, + * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} * will be appropriately specialized. */ public abstract class BaseExecutionContext @@ -60,21 +60,32 @@ public abstract class BaseExecutionContext() { + @Override + public T get() { + return createStepContext(finalStepName, finalTransformName); + } + }); + } + + protected final T getOrCreateStepContext(String stepName, Supplier createContextFunc) { T context = cachedStepContexts.get(stepName); if (context == null) { - context = createStepContext(stepName, transformName, stateSampler); + context = createContextFunc.get(); cachedStepContexts.put(stepName, context); } + return context; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java index c3da3d7fced3..85e36dd6d14b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata; -import org.apache.beam.sdk.util.common.worker.StateSampler; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; @@ -48,8 +47,7 @@ public static DirectModeExecutionContext create() { } @Override - protected StepContext createStepContext( - String stepName, String transformName, StateSampler stateSampler) { + protected StepContext createStepContext(String stepName, String transformName) { return new StepContext(this, stepName, transformName); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java index 577aa666ec2f..01bde829972d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java @@ -19,7 +19,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.common.worker.StateSampler; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; @@ -34,8 +33,7 @@ public interface ExecutionContext { /** * Returns the {@link StepContext} associated with the given step. */ - StepContext getOrCreateStepContext( - String stepName, String transformName, StateSampler stateSampler); + StepContext getOrCreateStepContext(String stepName, String transformName); /** * Returns a collection view of all of the {@link StepContext}s. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java deleted file mode 100644 index ee95260c9cc9..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common.worker; - -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterSet; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.ThreadSafe; - -/** - * A StateSampler object may be used to obtain an approximate - * breakdown of the time spent by an execution context in various - * states, as a fraction of the total time. The sampling is taken at - * regular intervals, with adjustment for scheduling delay. - */ -@ThreadSafe -public class StateSampler implements AutoCloseable { - - /** Different kinds of states. */ - public enum StateKind { - /** IO, user code, etc. */ - USER, - /** Reading/writing from/to shuffle service, etc. */ - FRAMEWORK - } - - public static final long DEFAULT_SAMPLING_PERIOD_MS = 200; - - private final String prefix; - private final CounterSet.AddCounterMutator counterSetMutator; - - /** Array of counters indexed by their state. */ - private ArrayList> countersByState = new ArrayList<>(); - - /** Map of state name to state. */ - private Map statesByName = new HashMap<>(); - - /** Map of state id to kind. */ - private Map kindsByState = new HashMap<>(); - - /** The current state. */ - private volatile int currentState; - - /** Special value of {@code currentState} that means we do not sample. */ - public static final int DO_NOT_SAMPLE = -1; - - /** - * A counter that increments with each state transition. May be used - * to detect a context being stuck in a state for some amount of - * time. - */ - private volatile long stateTransitionCount; - - /** - * The timestamp (in nanoseconds) corresponding to the last time the - * state was sampled (and recorded). - */ - private long stateTimestampNs = 0; - - /** Using a fixed number of timers for all StateSampler objects. */ - private static final int NUM_EXECUTOR_THREADS = 16; - - private static final ScheduledExecutorService executorService = - Executors.newScheduledThreadPool(NUM_EXECUTOR_THREADS, - new ThreadFactoryBuilder().setDaemon(true).build()); - - private Random rand = new Random(); - - private List callbacks = new ArrayList<>(); - - private ScheduledFuture invocationTriggerFuture = null; - - private ScheduledFuture invocationFuture = null; - - /** - * Constructs a new {@link StateSampler} that can be used to obtain - * an approximate breakdown of the time spent by an execution - * context in various states, as a fraction of the total time. - * - * @param prefix the prefix of the counter names for the states - * @param counterSetMutator the {@link CounterSet.AddCounterMutator} - * used to create a counter for each distinct state - * @param samplingPeriodMs the sampling period in milliseconds - */ - public StateSampler(String prefix, - CounterSet.AddCounterMutator counterSetMutator, - final long samplingPeriodMs) { - this.prefix = prefix; - this.counterSetMutator = counterSetMutator; - currentState = DO_NOT_SAMPLE; - scheduleSampling(samplingPeriodMs); - } - - /** - * Constructs a new {@link StateSampler} that can be used to obtain - * an approximate breakdown of the time spent by an execution - * context in various states, as a fraction of the total time. - * - * @param prefix the prefix of the counter names for the states - * @param counterSetMutator the {@link CounterSet.AddCounterMutator} - * used to create a counter for each distinct state - */ - public StateSampler(String prefix, - CounterSet.AddCounterMutator counterSetMutator) { - this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS); - } - - /** - * Called by the constructor to schedule sampling at the given period. - * - *

Should not be overridden by sub-classes unless they want to change - * or disable the automatic sampling of state. - */ - protected void scheduleSampling(final long samplingPeriodMs) { - // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen sampled - // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case some - // states happen to occur at a similar period. - // The current implementation uses a fixed-rate timer with a period samplingPeriodMs as a - // trampoline to a one-shot random timer which fires with a random delay within - // samplingPeriodMs. - stateTimestampNs = System.nanoTime(); - invocationTriggerFuture = - executorService.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - long delay = rand.nextInt((int) samplingPeriodMs); - synchronized (StateSampler.this) { - if (invocationFuture != null) { - invocationFuture.cancel(false); - } - invocationFuture = - executorService.schedule( - new Runnable() { - @Override - public void run() { - StateSampler.this.run(); - } - }, - delay, - TimeUnit.MILLISECONDS); - } - } - }, - 0, - samplingPeriodMs, - TimeUnit.MILLISECONDS); - } - - public synchronized void run() { - long startTimestampNs = System.nanoTime(); - int state = currentState; - if (state != DO_NOT_SAMPLE) { - StateKind kind = null; - long elapsedMs = TimeUnit.NANOSECONDS.toMillis(startTimestampNs - stateTimestampNs); - kind = kindsByState.get(state); - countersByState.get(state).addValue(elapsedMs); - // Invoke all callbacks. - for (SamplingCallback c : callbacks) { - c.run(state, kind, elapsedMs); - } - } - stateTimestampNs = startTimestampNs; - } - - @Override - public synchronized void close() { - currentState = DO_NOT_SAMPLE; - if (invocationTriggerFuture != null) { - invocationTriggerFuture.cancel(false); - } - if (invocationFuture != null) { - invocationFuture.cancel(false); - } - } - - /** - * Returns the state associated with a name; creating a new state if - * necessary. Using states instead of state names during state - * transitions is done for efficiency. - * - * @name the name for the state - * @kind kind of the state, see {#code StateKind} - * @return the state associated with the state name - */ - public int stateForName(String name, StateKind kind) { - if (name.isEmpty()) { - return DO_NOT_SAMPLE; - } - - synchronized (this) { - Integer state = statesByName.get(name); - if (state == null) { - String counterName = prefix + name + "-msecs"; - Counter counter = counterSetMutator.addCounter( - Counter.longs(counterName, Counter.AggregationKind.SUM)); - state = countersByState.size(); - statesByName.put(name, state); - countersByState.add(counter); - kindsByState.put(state, kind); - } - StateKind originalKind = kindsByState.get(state); - if (originalKind != kind) { - throw new IllegalArgumentException( - "for state named " + name - + ", requested kind " + kind + " different from the original kind " + originalKind); - } - return state; - } - } - - /** - * An internal class for representing StateSampler information - * typically used for debugging. - */ - public static class StateSamplerInfo { - public final String state; - public final Long transitionCount; - public final Long stateDurationMillis; - - public StateSamplerInfo(String state, Long transitionCount, - Long stateDurationMillis) { - this.state = state; - this.transitionCount = transitionCount; - this.stateDurationMillis = stateDurationMillis; - } - } - - /** - * Returns information about the current state of this state sampler - * into a {@link StateSamplerInfo} object, or null if sampling is - * not turned on. - * - * @return information about this state sampler or null if sampling is off - */ - public synchronized StateSamplerInfo getInfo() { - return currentState == DO_NOT_SAMPLE ? null - : new StateSamplerInfo(countersByState.get(currentState).getFlatName(), - stateTransitionCount, null); - } - - /** - * Returns the current state of this state sampler. - */ - public int getCurrentState() { - return currentState; - } - - /** - * Sets the current thread state. - * - * @param state the new state to transition to - * @return the previous state - */ - public int setState(int state) { - // Updates to stateTransitionCount are always done by the same - // thread, making the non-atomic volatile update below safe. The - // count is updated first to avoid incorrectly attributing - // stuckness occuring in an old state to the new state. - long previousStateTransitionCount = this.stateTransitionCount; - this.stateTransitionCount = previousStateTransitionCount + 1; - int previousState = currentState; - currentState = state; - return previousState; - } - - /** - * Sets the current thread state. - * - * @param name the name of the new state to transition to - * @param kind kind of the new state - * @return the previous state - */ - public int setState(String name, StateKind kind) { - return setState(stateForName(name, kind)); - } - - /** - * Returns an AutoCloseable {@link ScopedState} that will perform a - * state transition to the given state, and will automatically reset - * the state to the prior state upon closing. - * - * @param state the new state to transition to - * @return a {@link ScopedState} that automatically resets the state - * to the prior state - */ - public ScopedState scopedState(int state) { - return new ScopedState(this, setState(state)); - } - - /** - * Add a callback to the sampler. - * The callbacks will be executed sequentially upon {@link StateSampler#run}. - */ - public synchronized void addSamplingCallback(SamplingCallback callback) { - callbacks.add(callback); - } - - /** Get the counter prefix associated with this sampler. */ - public String getPrefix() { - return prefix; - } - - /** - * A nested class that is used to account for states and state - * transitions based on lexical scopes. - * - *

Thread-safe. - */ - public class ScopedState implements AutoCloseable { - private StateSampler sampler; - private int previousState; - - private ScopedState(StateSampler sampler, int previousState) { - this.sampler = sampler; - this.previousState = previousState; - } - - @Override - public void close() { - sampler.setState(previousState); - } - } - - /** - * Callbacks which supposed to be called sequentially upon {@link StateSampler#run}. - * They should be registered via {@link #addSamplingCallback}. - */ - public static interface SamplingCallback { - /** - * The entrance method of the callback, it is called in {@link StateSampler#run}, - * once per sample. This method should be thread safe. - * - * @param state The state of the StateSampler at the time of sample. - * @param kind The kind associated with the state, see {@link StateKind}. - * @param elapsedMs Milliseconds since last sample. - */ - public void run(int state, StateKind kind, long elapsedMs); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 50b83fda21f7..ee56954dbf08 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -160,7 +160,7 @@ public void getExecutionContextSameStepSameKeyState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null); + InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( @@ -176,7 +176,7 @@ public void getExecutionContextSameStepSameKeyState() { context.getExecutionContext(created.getProducingTransformInternal(), "foo"); assertThat( secondFooContext - .getOrCreateStepContext("s1", "s1", null) + .getOrCreateStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -192,7 +192,7 @@ public void getExecutionContextDifferentKeysIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getOrCreateStepContext("s1", "s1", null) + .getOrCreateStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -202,7 +202,7 @@ public void getExecutionContextDifferentKeysIndependentState() { assertThat(barContext, not(equalTo(fooContext))); assertThat( barContext - .getOrCreateStepContext("s1", "s1", null) + .getOrCreateStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -218,7 +218,7 @@ public void getExecutionContextDifferentStepsIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getOrCreateStepContext("s1", "s1", null) + .getOrCreateStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -227,7 +227,7 @@ public void getExecutionContextDifferentStepsIndependentState() { context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); assertThat( barContext - .getOrCreateStepContext("s1", "s1", null) + .getOrCreateStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -273,7 +273,7 @@ public void handleResultStoresState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); CopyOnAccessInMemoryStateInternals state = - fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); BagState bag = state.state(StateNamespaces.global(), intBag); bag.add(1); bag.add(2); @@ -293,7 +293,7 @@ public void handleResultStoresState() { context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); CopyOnAccessInMemoryStateInternals afterResultState = - afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index d21edd16357e..d5aa0daffd40 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -708,7 +708,7 @@ List>> runGABW( outputManager, outputTag, new ArrayList>(), - executionContext.getOrCreateStepContext("GABWStep", "GABWTransform", null), + executionContext.getOrCreateStepContext("GABWStep", "GABWTransform"), counters.getAddCounterMutator(), windowingStrategy); }