diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 7a95c9f49bdc..14fb8e22c0e0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -46,8 +46,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
* Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
* and any splits are honored.
+ *
+ *
The Queue storing available evaluators must enforce a happens-before relationship for
+ * elements being added to the queue to accesses after it, to ensure that updates performed to the
+ * state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but
+ * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
*/
- private final ConcurrentMap>>
+ private final ConcurrentMap<
+ EvaluatorKey, ConcurrentLinkedQueue extends UnboundedReadEvaluator>>>
sourceEvaluators = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -83,8 +89,8 @@ private Queue> getTransformEvaluatorQu
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@SuppressWarnings("unchecked")
- Queue> evaluatorQueue =
- (Queue>) sourceEvaluators.get(key);
+ ConcurrentLinkedQueue> evaluatorQueue =
+ (ConcurrentLinkedQueue>) sourceEvaluators.get(key);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
@@ -97,7 +103,8 @@ private Queue> getTransformEvaluatorQu
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
- evaluatorQueue = (Queue>) sourceEvaluators.get(key);
+ evaluatorQueue =
+ (ConcurrentLinkedQueue>) sourceEvaluators.get(key);
}
}
return evaluatorQueue;
@@ -117,7 +124,7 @@ private static class UnboundedReadEvaluator implements TransformEvaluat
private static final int ARBITRARY_MAX_ELEMENTS = 10;
private final AppliedPTransform, PCollection, Unbounded> transform;
private final InProcessEvaluationContext evaluationContext;
- private final Queue> evaluatorQueue;
+ private final ConcurrentLinkedQueue> evaluatorQueue;
/**
* The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
* source as derived from {@link #transform} due to splitting.
@@ -129,7 +136,7 @@ public UnboundedReadEvaluator(
AppliedPTransform, PCollection, Unbounded> transform,
InProcessEvaluationContext evaluationContext,
UnboundedSource source,
- Queue> evaluatorQueue) {
+ ConcurrentLinkedQueue> evaluatorQueue) {
this.transform = transform;
this.evaluationContext = evaluationContext;
this.evaluatorQueue = evaluatorQueue;