diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index 6f68b7e42f..918a7f4a1c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -466,9 +466,6 @@ public synchronized Instant getEarliestTimerTimestamp() { } private synchronized void updateTimers(TimerUpdate update) { - for (TimerData completedTimer : update.completedTimers) { - pendingTimers.remove(completedTimer); - } Map> timerMap = timerMap(update.key); for (TimerData addedTimer : update.setTimers) { NavigableSet timerQueue = timerMap.get(addedTimer.getDomain()); @@ -476,6 +473,10 @@ private synchronized void updateTimers(TimerUpdate update) { timerQueue.add(addedTimer); } } + + for (TimerData completedTimer : update.completedTimers) { + pendingTimers.remove(completedTimer); + } for (TimerData deletedTimer : update.deletedTimers) { NavigableSet timerQueue = timerMap.get(deletedTimer.getDomain()); if (timerQueue != null) { @@ -829,10 +830,16 @@ private void refreshWatermarks(AppliedPTransform transform) { } /** - * Removes all of the completed Timers from the collection of pending timers, adds all new timers, - * and removes all deleted timers. Removes all elements consumed by the input bundle from the - * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced - * by the {@link PTransform} to the pending queue of each consumer. + * First adds all produced elements to the queue of pending elements for each consumer, then adds + * all pending timers to the collection of pending timers, then removes all completed and deleted + * timers from the collection of pending timers, then removes all completed elements from the + * pending queue of the transform. + * + *

It is required that all newly pending elements are added to the queue of pending elements + * for each consumer prior to the completed elements being removed, as doing otherwise could cause + * a Watermark to appear in a state in which the upstream (completed) element does not hold the + * watermark but the element it produced is not yet pending. This can cause the watermark to + * erroneously advance. */ private void updatePending( CommittedBundle input, @@ -840,17 +847,22 @@ private void updatePending( TimerUpdate timerUpdate, Iterable> outputs) { TransformWatermarks completedTransform = transformToWatermarks.get(transform); - completedTransform.updateTimers(timerUpdate); - if (input != null) { - completedTransform.removePending(input); - } + // Newly pending elements must be added before completed elements are removed, as the two + // do not share a Mutex within this call and thus can be interleaved with external calls to + // refresh. for (CommittedBundle bundle : outputs) { for (AppliedPTransform consumer : consumers.get(bundle.getPCollection())) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); } } + + completedTransform.updateTimers(timerUpdate); + if (input != null) { + completedTransform.removePending(input); + } + } /**