Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ boolean hasNoActiveWindows() {
return activeWindows.getActiveAndNewWindows().isEmpty();
}

/**
 * Exposes the trigger runner held by this instance.
 *
 * <p>Intended for tests only, e.g. to inspect the finished bits recorded for a window.
 */
@VisibleForTesting
TriggerStateMachineRunner<W> getTriggerRunner() {
  return this.triggerRunner;
}

/**
 * Exposes the context factory held by this instance.
 *
 * <p>Intended for tests only, e.g. to build a context for a window and reach its state.
 */
@VisibleForTesting
ReduceFnContextFactory<K, InputT, OutputT, W> getContextFactory() {
  return this.contextFactory;
}

private Set<W> windowsThatAreOpen(Collection<W> windows) {
Set<W> result = new HashSet<>();
for (W window : windows) {
Expand Down Expand Up @@ -603,6 +613,14 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

if (triggerRunner.isNew(directContext.state())) {
// Blindly clear state to ensure Windmill doesn't do unnecessary reads.
reduceFn.clearState(renamedContext);
paneInfoTracker.clear(directContext.state());
watermarkHold.setKnownEmpty(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
}

nonEmptyPanes.recordContent(renamedContext.state());
scheduleGarbageCollectionTimer(directContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,23 @@ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(EXTRA_HOLD_TAG).clear();
}

/**
 * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
 *
 * <p>Permit marking the watermark holds as empty locally, without necessarily clearing them in
 * the backend.
 *
 * <p>Emits a debug trace with the key, window and current input/output watermarks, then calls
 * {@code setKnownEmpty()} on both the element hold and the extra hold ({@code EXTRA_HOLD_TAG})
 * reachable through the given context's state.
 *
 * @param context the context whose key, window and state are used
 */
public void setKnownEmpty(ReduceFn<?, ?, ?, W>.Context context) {
WindowTracing.debug(
"WatermarkHold.setKnownEmpty: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
context.key(),
context.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
// Both hold cells are marked, mirroring the pair of tags this class manages.
context.state().access(elementHoldTag).setKnownEmpty();
context.state().access(EXTRA_HOLD_TAG).setKnownEmpty();
}

/** Return the current data hold, or null if none. Does not clear. For debugging only. */
public @Nullable Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
return context.state().access(elementHoldTag).read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core.triggers;

import java.util.BitSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. */
public class FinishedTriggersBitSet implements FinishedTriggers {
Expand Down Expand Up @@ -60,4 +61,17 @@ public void clearRecursively(ExecutableTriggerStateMachine trigger) {
public FinishedTriggersBitSet copy() {
return new FinishedTriggersBitSet((BitSet) bitSet.clone());
}

/**
 * Two instances are equal exactly when their underlying bit sets are equal; any other object
 * (including {@code null}) is unequal.
 */
@Override
public boolean equals(@Nullable Object obj) {
  if (obj instanceof FinishedTriggersBitSet) {
    FinishedTriggersBitSet other = (FinishedTriggersBitSet) obj;
    return bitSet.equals(other.bitSet);
  }
  return false;
}

/** Hashes by the underlying bit set, keeping this consistent with {@code equals}. */
@Override
public int hashCode() {
  return this.bitSet.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
}

@Nullable BitSet bitSet = state.read();
return bitSet == null
? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
: FinishedTriggersBitSet.fromBitSet(bitSet);
if (bitSet == null) {
return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
}

return FinishedTriggersBitSet.fromBitSet(bitSet);
}

private void clearFinishedBits(ValueState<BitSet> state) {
Expand All @@ -99,6 +101,16 @@ public boolean isClosed(StateAccessor<?> state) {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}

/**
 * Reports whether the window looks new, i.e. a finished set is needed but no finished bits are
 * currently stored for it.
 *
 * <p>When no finished set is needed this returns false without reading any state.
 */
public boolean isNew(StateAccessor<?> state) {
  if (!isFinishedSetNeeded()) {
    return false;
  }
  return state.access(FINISHED_BITS_TAG).read() == null;
}

/**
 * Returns the finished bits for the given state, as produced by {@code readFinishedBits}.
 *
 * <p>Intended for tests only.
 */
@VisibleForTesting
public BitSet getFinishedBits(StateAccessor<?> state) {
  ValueState<BitSet> finishedBitsState = state.access(FINISHED_BITS_TAG);
  return readFinishedBits(finishedBitsState).getBitSet();
}

public void prefetchIsClosed(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
Expand Down Expand Up @@ -187,12 +199,9 @@ private void persistFinishedSet(
}

ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
if (modifiedFinishedSet.getBitSet().isEmpty()) {
finishedSetState.clear();
} else {
finishedSetState.write(modifiedFinishedSet.getBitSet());
}
@Nullable BitSet currentBits = finishedSetState.read();
if (currentBits == null || !currentBits.equals(modifiedFinishedSet.getBitSet())) {
finishedSetState.write(modifiedFinishedSet.getBitSet());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
Expand Down Expand Up @@ -2343,4 +2345,50 @@ public interface TestOptions extends PipelineOptions {

void setValue(int value);
}

/**
 * Checks the "new window" handling for a fixed window with an element-count trigger: after the
 * first element the window must no longer be reported as new, and the second element must not
 * wipe out the first one.
 */
@Test
public void testNewWindowOptimization() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(AfterPane.elementCountAtLeast(2))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);

ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(strategy);

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));

// 1. First element for a new window.
tester.injectElements(TimestampedValue.of(1, new Instant(1)));

// Read back the finished bits recorded for the window after the first element.
// NOTE(review): createRunner() is invoked separately for each access below; presumably every
// runner it returns observes the same underlying state -- confirm against ReduceFnTester.
BitSet bitSet =
tester
.createRunner()
.getTriggerRunner()
.getFinishedBits(
tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state());

// We expect the bitset to be empty (the sentinel bit is no longer used).
assertTrue("Bitset should be empty", bitSet.isEmpty());
// And trigger not finished.
assertFalse("Trigger should not be finished", bitSet.get(0));

// And verify that it is no longer "new".
assertFalse(
"Window should no longer be new",
tester
.createRunner()
.getTriggerRunner()
.isNew(
tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state()));

// 2. Second element for the same window.
// We want to verify it doesn't clear the first element.
tester.injectElements(TimestampedValue.of(2, new Instant(2)));

// Extract output: a single pane holding both elements.
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class WindmillWatermarkHold extends WindmillState implements WatermarkHoldState {

// The encoded size of an Instant.
private static final int ENCODED_SIZE = 8;

Expand All @@ -46,6 +47,7 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol
private final String stateFamily;

private boolean cleared = false;
private boolean knownEmpty = false;
/**
* If non-{@literal null}, the known current hold value, or absent if we know there are no output
* watermark holds. If {@literal null}, the current hold value could depend on holds in Windmill
Expand Down Expand Up @@ -77,6 +79,13 @@ public void clear() {
localAdditions = null;
}

/**
 * Marks this hold as known-empty locally: sets the {@code knownEmpty} flag, drops any pending
 * local additions, and sets the cached value to absent.
 */
@Override
public void setKnownEmpty() {
  knownEmpty = true;
  // Anything added earlier in this work item is discarded.
  localAdditions = null;
  // An absent cached value means "known to have no hold".
  cachedValue = Optional.absent();
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public WindmillWatermarkHold readLater() {
Expand Down Expand Up @@ -133,7 +142,7 @@ public Future<Windmill.WorkItemCommitRequest> persist(

Future<Windmill.WorkItemCommitRequest> result;

if (!cleared && localAdditions == null) {
if (!knownEmpty && !cleared && localAdditions == null) {
// No changes, so no need to update Windmill and no need to cache any value.
return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
}
Expand Down Expand Up @@ -166,15 +175,19 @@ public Future<Windmill.WorkItemCommitRequest> persist(
} else if (!cleared && localAdditions != null) {
// Otherwise, we need to combine the local additions with the already persisted data
result = combineWithPersisted();
} else if (knownEmpty) {
result = Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
} else {
throw new IllegalStateException("Unreachable condition");
}

final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size();

return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
knownEmpty = false;
localAdditions = null;
if (cachedValue != null) {
cache.put(namespace, stateKey, WindmillWatermarkHold.this, estimatedByteSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3037,6 +3037,43 @@ public void testWatermarkClearBeforeRead() throws Exception {
Mockito.verifyNoMoreInteractions(mockReader);
}

/**
 * After {@code setKnownEmpty()}, reads and subsequent adds must be answered locally without
 * touching the reader.
 */
@Test
public void testWatermarkSetKnownEmptyBeforeRead() throws Exception {
  StateTag<WatermarkHoldState> holdTag =
      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
  WatermarkHoldState hold = underTest.state(NAMESPACE, holdTag);

  // Marked empty: the read yields no hold.
  hold.setKnownEmpty();
  assertThat(hold.read(), Matchers.nullValue());

  // A later add is visible on the next read.
  Instant addedHold = new Instant(300);
  hold.add(addedHold);
  assertThat(hold.read(), Matchers.equalTo(addedHold));

  // Shouldn't need to read from windmill because the value is already available.
  Mockito.verifyNoMoreInteractions(mockReader);
}

/**
 * An add followed by {@code setKnownEmpty()} must persist nothing: no watermark hold entries in
 * the commit and no reader interaction.
 */
@Test
public void testWatermarkSetKnownEmptyPersist() throws Exception {
  StateTag<WatermarkHoldState> holdTag =
      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
  WatermarkHoldState hold = underTest.state(NAMESPACE, holdTag);

  // The add is discarded by the subsequent setKnownEmpty().
  hold.add(new Instant(1000));
  hold.setKnownEmpty();

  Windmill.WorkItemCommitRequest.Builder commit = Windmill.WorkItemCommitRequest.newBuilder();
  underTest.persist(commit);

  // Should be a no-op, no reset, no adds.
  assertEquals(0, commit.getWatermarkHoldsCount());

  Mockito.verifyNoMoreInteractions(mockReader);
}

@Test
public void testWatermarkPersistEarliest() throws Exception {
StateTag<WatermarkHoldState> addr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ public interface WatermarkHoldState extends GroupingState<Instant, Instant> {

@Override
WatermarkHoldState readLater();

/**
 * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
 *
 * <p>Permits marking the state as empty locally, without necessarily clearing it in the backend.
 *
 * <p>This may be used by runners to optimize out unnecessary state reads.
 *
 * <p>The default implementation does nothing, so implementations that do not override this
 * method are unaffected by calls to it.
 */
@Internal
default void setKnownEmpty() {}
}
Loading