Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
* Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
* and any splits are honored.
*
* <p>The Queue storing available evaluators must enforce a happens-before relationship for
* elements being added to the queue to accesses after it, to ensure that updates performed to the
* state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but
* an arbitrary Queue implementation does not, so the concrete type is used explicitly.
*/
private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
private final ConcurrentMap<
EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>>
sourceEvaluators = new ConcurrentHashMap<>();

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -83,8 +89,8 @@ private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQu
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@SuppressWarnings("unchecked")
Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
(Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
(ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
Expand All @@ -97,7 +103,8 @@ private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQu
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
evaluatorQueue =
(ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
}
}
return evaluatorQueue;
Expand All @@ -117,7 +124,7 @@ private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluat
private static final int ARBITRARY_MAX_ELEMENTS = 10;
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
/**
* The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
* source as derived from {@link #transform} due to splitting.
Expand All @@ -129,7 +136,7 @@ public UnboundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext,
UnboundedSource<OutputT, ?> source,
Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
this.transform = transform;
this.evaluationContext = evaluationContext;
this.evaluatorQueue = evaluatorQueue;
Expand Down