diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index cba9007cc6..ee8f02f2c5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -44,8 +44,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted. * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused * and any splits are honored. + * + *

The Queue storing available evaluators must enforce a happens-before relationship for + * elements being added to the queue to accesses after it, to ensure that updates performed to the + * state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but + * an arbitrary Queue implementation does not, so the concrete type is used explicitly. */ - private final ConcurrentMap>> + private final ConcurrentMap< + EvaluatorKey, ConcurrentLinkedQueue>> sourceEvaluators = new ConcurrentHashMap<>(); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -81,8 +87,8 @@ private Queue> getTransformEvaluatorQu // Pipeline#run). EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); @SuppressWarnings("unchecked") - Queue> evaluatorQueue = - (Queue>) sourceEvaluators.get(key); + ConcurrentLinkedQueue> evaluatorQueue = + (ConcurrentLinkedQueue>) sourceEvaluators.get(key); if (evaluatorQueue == null) { evaluatorQueue = new ConcurrentLinkedQueue<>(); if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { @@ -95,7 +101,8 @@ private Queue> getTransformEvaluatorQu evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us - evaluatorQueue = (Queue>) sourceEvaluators.get(key); + evaluatorQueue = + (ConcurrentLinkedQueue>) sourceEvaluators.get(key); } } return evaluatorQueue; @@ -115,7 +122,7 @@ private static class UnboundedReadEvaluator implements TransformEvaluat private static final int ARBITRARY_MAX_ELEMENTS = 10; private final AppliedPTransform, Unbounded> transform; private final InProcessEvaluationContext evaluationContext; - private final Queue> evaluatorQueue; + private final ConcurrentLinkedQueue> evaluatorQueue; /** * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same * source as derived from {@link #transform} due to splitting. @@ -127,7 +134,7 @@ public UnboundedReadEvaluator( AppliedPTransform, Unbounded> transform, InProcessEvaluationContext evaluationContext, UnboundedSource source, - Queue> evaluatorQueue) { + ConcurrentLinkedQueue> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; this.evaluatorQueue = evaluatorQueue;