diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3b3132b3d11c6..6def1dfa488a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable { - try { - final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); - consumerVertex.scheduleForExecution( - executionGraph.getSlotProvider(), - executionGraph.isQueuedSchedulingAllowed(), - LocationPreferenceConstraint.ANY, // there must be at least one known location - Collections.emptySet()); - } catch (Throwable t) { - consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + + // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling + if (consumerVertex.checkInputDependencyConstraints()) { + // When deploying a consuming task, its task deployment descriptor will contain all + // deployment information available at the respective time. It is possible that some + // of the partitions to be consumed have not been created yet. These are updated + // runtime via the update messages. + // + // TODO The current approach may send many update messages even though the consuming + // task has already been deployed with all necessary information. We have to check + // whether this is a problem and fix it, if it is. + CompletableFuture.supplyAsync( + () -> { + try { + final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); + consumerVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, // there must be at least one known location + Collections.emptySet()); + } catch (Throwable t) { + consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + "vertex " + consumerVertex, t)); - } + } - return null; - }, - executor); + return null; + }, + executor); + } // double check to resolve race conditions if (consumerVertex.getExecutionState() == RUNNING) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 56315e07146d5..e89409f032c5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph { * from results than need to be materialized. */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; + /** The input dependency constraint to schedule tasks. */ + private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY; + // ------ Execution status and progress. These values are volatile, and accessed under the lock ------- private final AtomicInteger verticesFinished; @@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() { return scheduleMode; } + public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) { + this.inputDependencyConstraint = inputDependencyConstraint; + } + + public InputDependencyConstraint getInputDependencyConstraint() { + return inputDependencyConstraint; + } + public Time getAllocationTimeout() { return allocationTimeout; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index f1a861d2ca141..c0d877dcb2832 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph( executionGraph.setScheduleMode(jobGraph.getScheduleMode()); executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); + try { + executionGraph.setInputDependencyConstraint( + jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint()); + } catch (IOException | ClassNotFoundException e) { + throw new JobException("Fail to deserialize execution config.", e); + } + try { executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a0747296c5399..22e02e2b13d9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; @@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { if (partition.getIntermediateResult().getResultType().isPipelined()) { // Schedule or update receivers of this partition + partition.markDataProduced(); execution.scheduleOrUpdateConsumers(partition.getConsumers()); } else { @@ -726,6 +728,56 @@ List finishAllBlockingPartitions() { } } + /** + * Check whether the InputDependencyConstraint is satisfied for this vertex. + * + * @return whether the input constraint is satisfied + */ + public boolean checkInputDependencyConstraints() { + if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) { + // InputDependencyConstraint == ANY + for (int i = 0; i < inputEdges.length; i++) { + if (isInputConsumable(i)) { + return true; + } + } + return false; + } else { + // InputDependencyConstraint == ALL + for (int i = 0; i < inputEdges.length; i++) { + if (!isInputConsumable(i)) { + return false; + } + } + return true; + } + } + + /** + * An input is consumable when + * 1. the source result is PIPELINED and one of the result partition has produced data. + * 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED). + * + * @return whether the input is consumable + */ + public boolean isInputConsumable(int inputNumber) { + IntermediateResult result = jobVertex.getInputs().get(inputNumber); + + if (result.getResultType().isPipelined()) { + // For PIPELINED result, the input is consumable if any result partition has produced records or is finished + for (ExecutionEdge edge : inputEdges[inputNumber]) { + if (edge.getSource().hasDataProduced()) { + return true; + } + } + } else { + // For BLOCKING result, the input is consumable if all the partitions in the result are finished + return result.areAllPartitionsFinished(); + } + + return false; + } + // -------------------------------------------------------------------------------------------- // Notifications from the Execution Attempt // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 313272cf86bfb..6e1d9ba69fbd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -156,19 +156,17 @@ public int getConnectionIndex() { void resetForNewExecution() { this.numberOfRunningProducers.set(numParallelProducers); + for (IntermediateResultPartition partition : partitions) { + partition.resetForNewExecution(); + } } int decrementNumberOfRunningProducersAndGetRemaining() { return numberOfRunningProducers.decrementAndGet(); } - boolean isConsumable() { - if (resultType.isPipelined()) { - return true; - } - else { - return numberOfRunningProducers.get() == 0; - } + boolean areAllPartitionsFinished() { + return numberOfRunningProducers.get() == 0; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 124ceb2b6bcd0..56fb137eb5949 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -36,6 +36,11 @@ public class IntermediateResultPartition { private List> consumers; + /** + * Whether this partition has data produced. For pipelined result only. + */ + private boolean dataProduced = false; + public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) { this.totalResult = totalResult; this.producer = producer; @@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() { return partitionId; } - ResultPartitionType getResultType() { + public ResultPartitionType getResultType() { return totalResult.getResultType(); } @@ -68,8 +73,24 @@ public List> getConsumers() { return consumers; } + public void markDataProduced() { + dataProduced = true; + } + + public boolean hasDataProduced() { + return dataProduced; + } + public boolean isConsumable() { - return totalResult.isConsumable(); + if (getResultType().isPipelined()) { + return dataProduced; + } else { + return totalResult.areAllPartitionsFinished(); + } + } + + void resetForNewExecution() { + dataProduced = false; } int addConsumerGroup() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java index 6aa36b70ab889..4f7417f27fd1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc private static IntermediateResultPartition mockPartition(ExecutionVertex producer) { IntermediateResultPartition partition = mock(IntermediateResultPartition.class); + when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED); when(partition.isConsumable()).thenReturn(true); IntermediateResult result = mock(IntermediateResult.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java new file mode 100644 index 0000000000000..8d131a0cb304b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the inputs constraint for {@link ExecutionVertex}. + */ +public class ExecutionVertexInputConstraintTest extends TestLogger { + + @Test + public void testInputConsumable() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs not consumable on init + assertFalse(ev31.isInputConsumable(0)); + assertFalse(ev31.isInputConsumable(1)); + + // One pipelined input consumable on data produced + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertTrue(ev31.isInputConsumable(0)); + + // The blocking input not consumable if only one partition is FINISHED + ev21.getCurrentExecutionAttempt().markFinished(); + assertFalse(ev31.isInputConsumable(1)); + + // The blocking input consumable if all partitions are FINISHED + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + + // Inputs not consumable after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.isInputConsumable(0)); + assertFalse(ev31.isInputConsumable(1)); + } + + @Test + public void testInputConstraintANY() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + eg.setInputDependencyConstraint(InputDependencyConstraint.ANY); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs constraint not satisfied on init + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input1 consumable satisfies the constraint + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertTrue(ev31.checkInputDependencyConstraints()); + + // Inputs constraint not satisfied after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input2 consumable satisfies the constraint + waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L); + waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L); + ev21.getCurrentExecutionAttempt().markFinished(); + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + } + + @Test + public void testInputConstraintALL() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + eg.setInputDependencyConstraint(InputDependencyConstraint.ALL); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs constraint not satisfied on init + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input1 consumable does not satisfy the constraint + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input2 consumable satisfies the constraint + ev21.getCurrentExecutionAttempt().markFinished(); + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + + // Inputs constraint not satisfied after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.checkInputDependencyConstraints()); + } + + private static ExecutionGraph createExecutionGraph(List ordered) throws Exception { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + + final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20); + + ExecutionGraph eg = new ExecutionGraph( + new DummyJobInformation( + jobId, + jobName), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + new FixedDelayRestartStrategy(1, 0), + new RestartAllStrategy.Factory(), + slotProvider); + + eg.attachJobGraph(ordered); + + return eg; + } + + private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception { + waitForAllExecutionsPredicate(eg, + isInExecutionState(ExecutionState.CANCELING) + .or(isInExecutionState(ExecutionState.CANCELED)) + .or(isInExecutionState(ExecutionState.FAILED)) + .or(isInExecutionState(ExecutionState.FINISHED)), + 2000L); + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) { + ev.getCurrentExecutionAttempt().cancelingComplete(); + } + } + waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java new file mode 100644 index 0000000000000..2cd00617d9021 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link IntermediateResultPartition}. + */ +public class IntermediateResultPartitionTest extends TestLogger { + + @Test + public void testPipelinedPartitionConsumable() throws Exception { + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED); + ExecutionVertex vertex1 = + new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + ExecutionVertex vertex2 = + new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); + + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Partition 1 consumable after data are produced + partition1.markDataProduced(); + assertTrue(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } + + @Test + public void testBlockingPartitionConsumable() throws Exception { + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING); + ExecutionVertex vertex1 = + new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + ExecutionVertex vertex2 = + new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); + + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if only one partition is FINISHED + partition1.markFinished(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Consumable after all partitions are FINISHED + partition2.markFinished(); + assertTrue(partition1.isConsumable()); + assertTrue(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } +}