Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,17 @@ public synchronized Instant getEarliestTimerTimestamp() {
}

private synchronized void updateTimers(TimerUpdate update) {
for (TimerData completedTimer : update.completedTimers) {
pendingTimers.remove(completedTimer);
}
Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
for (TimerData addedTimer : update.setTimers) {
NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
if (timerQueue != null) {
timerQueue.add(addedTimer);
}
}

for (TimerData completedTimer : update.completedTimers) {
pendingTimers.remove(completedTimer);
}
for (TimerData deletedTimer : update.deletedTimers) {
NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
if (timerQueue != null) {
Expand Down Expand Up @@ -829,28 +830,39 @@ private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
}

/**
* Removes all of the completed Timers from the collection of pending timers, adds all new timers,
* and removes all deleted timers. Removes all elements consumed by the input bundle from the
* {@link PTransform PTransforms} collection of pending elements, and adds all elements produced
* by the {@link PTransform} to the pending queue of each consumer.
* First adds all produced elements to the queue of pending elements for each consumer, then adds
* all pending timers to the collection of pending timers, then removes all completed and deleted
* timers from the collection of pending timers, then removes all completed elements from the
* pending queue of the transform.
*
* <p>It is required that all newly pending elements are added to the queue of pending elements
* for each consumer prior to the completed elements being removed, as doing otherwise could cause
* a Watermark to appear in a state in which the upstream (completed) element does not hold the
* watermark but the element it produced is not yet pending. This can cause the watermark to
* erroneously advance.
*/
private void updatePending(
CommittedBundle<?> input,
AppliedPTransform<?, ?, ?> transform,
TimerUpdate timerUpdate,
Iterable<? extends CommittedBundle<?>> outputs) {
TransformWatermarks completedTransform = transformToWatermarks.get(transform);
completedTransform.updateTimers(timerUpdate);
if (input != null) {
completedTransform.removePending(input);
}

// Newly pending elements must be added before completed elements are removed, as the two
// do not share a Mutex within this call and thus can be interleaved with external calls to
// refresh.
for (CommittedBundle<?> bundle : outputs) {
for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
TransformWatermarks watermarks = transformToWatermarks.get(consumer);
watermarks.addPending(bundle);
}
}

completedTransform.updateTimers(timerUpdate);
if (input != null) {
completedTransform.removePending(input);
}

}

/**
Expand Down