diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index caec1fc7c13d..f034e2fc0a14 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -20,7 +20,6 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; import com.google.cloud.dataflow.sdk.io.Read.Bounded; -import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -62,8 +61,7 @@ public TransformEvaluator forApplication( private TransformEvaluator getTransformEvaluator( final AppliedPTransform, Bounded> transform, - final InProcessEvaluationContext evaluationContext) - throws IOException { + final InProcessEvaluationContext evaluationContext) { BoundedReadEvaluator evaluator = getTransformEvaluatorQueue(transform, evaluationContext).poll(); if (evaluator == null) { @@ -93,8 +91,9 @@ private Queue> getTransformEvaluatorQueu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + BoundedSource source = transform.getTransform().getSource(); BoundedReadEvaluator evaluator = - new BoundedReadEvaluator(transform, evaluationContext); + new BoundedReadEvaluator(transform, evaluationContext, source); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -116,13 +115,19 @@ private Queue> getTransformEvaluatorQueu private static class BoundedReadEvaluator implements TransformEvaluator { private final AppliedPTransform, Bounded> transform; private final InProcessEvaluationContext evaluationContext; - private boolean contentsRemaining; + /** + * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same + * as the source derived from {@link #transform} due to splitting. + */ + private BoundedSource source; public BoundedReadEvaluator( AppliedPTransform, Bounded> transform, - InProcessEvaluationContext evaluationContext) { + InProcessEvaluationContext evaluationContext, + BoundedSource source) { this.transform = transform; this.evaluationContext = evaluationContext; + this.source = source; } @Override @@ -130,12 +135,9 @@ public void processElement(WindowedValue element) {} @Override public InProcessTransformResult finishBundle() throws IOException { - try (final Reader reader = - transform - .getTransform() - .getSource() - .createReader(evaluationContext.getPipelineOptions());) { - contentsRemaining = reader.start(); + try (final BoundedReader reader = + source.createReader(evaluationContext.getPipelineOptions());) { + boolean contentsRemaining = reader.start(); UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); while (contentsRemaining) { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index fa162903c25a..0f2e4f487948 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -90,8 +90,10 @@ private Queue> getTransformEvaluatorQu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + UnboundedSource source = transform.getTransform().getSource(); UnboundedReadEvaluator evaluator = - new UnboundedReadEvaluator(transform, evaluationContext, evaluatorQueue); + new UnboundedReadEvaluator( + transform, evaluationContext, source, evaluatorQueue); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -116,15 +118,22 @@ private static class UnboundedReadEvaluator implements TransformEvaluat private final AppliedPTransform, Unbounded> transform; private final InProcessEvaluationContext evaluationContext; private final Queue> evaluatorQueue; + /** + * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same + * source as derived from {@link #transform} due to splitting. + */ + private final UnboundedSource source; private CheckpointMark checkpointMark; public UnboundedReadEvaluator( AppliedPTransform, Unbounded> transform, InProcessEvaluationContext evaluationContext, + UnboundedSource source, Queue> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; this.evaluatorQueue = evaluatorQueue; + this.source = source; this.checkpointMark = null; } @@ -135,8 +144,7 @@ public void processElement(WindowedValue element) {} public InProcessTransformResult finishBundle() throws IOException { UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); try (UnboundedReader reader = - createReader( - transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) { + createReader(source, evaluationContext.getPipelineOptions());) { int numElements = 0; if (reader.start()) { do {