diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11c6..fa842d4a960c5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+	private void scheduleConsumer(ExecutionVertex consumerVertex) {
+		CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+					consumerVertex.scheduleForExecution(
+						executionGraph.getSlotProvider(),
+						executionGraph.isQueuedSchedulingAllowed(),
+						LocationPreferenceConstraint.ANY, // there must be at least one known location
+						Collections.emptySet());
+				} catch (Throwable t) {
+					consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+							"vertex " + consumerVertex, t));
+				}
+
+				return null;
+			},
+			executor);
+	}
+
 	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
 		final int numConsumers = allConsumers.size();

@@ -755,23 +776,16 @@ else if (numConsumers == 0) {
 					// TODO The current approach may send many update messages even though the consuming
 					// task has already been deployed with all necessary information. We have to check
 					// whether this is a problem and fix it, if it is.
-					CompletableFuture.supplyAsync(
-						() -> {
-							try {
-								final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-								consumerVertex.scheduleForExecution(
-									executionGraph.getSlotProvider(),
-									executionGraph.isQueuedSchedulingAllowed(),
-									LocationPreferenceConstraint.ANY, // there must be at least one known location
-									Collections.emptySet());
-							} catch (Throwable t) {
-								consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
-										"vertex " + consumerVertex, t));
-							}
-							return null;
-						},
-						executor);
+					// Schedule the consumer vertex if its input constraint is satisfied; otherwise skip the scheduling.
+					// A shortcut is taken for InputDependencyConstraint.ANY: at least one of the consumer
+					// vertex's inputs is known to be consumable at this point, so the O(N) input constraint
+					// check can be skipped and the default scheduling performance is not affected.
+					if (consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY ||
+							consumerVertex.checkInputDependencyConstraints()) {
+						scheduleConsumer(consumerVertex);
+					}

 					// double check to resolve race conditions
 					if (consumerVertex.getExecutionState() == RUNNING) {
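The gating logic above, restated as a self-contained sketch (the class and method names here are invented for illustration; the real code is the Execution.java hunk above). The ANY case short-circuits because this code path only runs when some input of the consumer has just become consumable:

    import java.util.function.IntPredicate;
    import java.util.stream.IntStream;

    final class SchedulingGateSketch {
        enum Constraint { ANY, ALL }

        static boolean shouldSchedule(Constraint constraint, int numInputs, IntPredicate inputConsumable) {
            // ANY: the notification that triggered this call already proves one input is consumable,
            // so the O(numInputs) scan is skipped entirely.
            return constraint == Constraint.ANY
                || IntStream.range(0, numInputs).allMatch(inputConsumable);
        }

        public static void main(String[] args) {
            IntPredicate consumable = i -> i == 0; // only input 0 is consumable
            System.out.println(shouldSchedule(Constraint.ANY, 2, consumable)); // true
            System.out.println(shouldSchedule(Constraint.ALL, 2, consumable)); // false
        }
    }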
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ee73505097675..94ee45d65927d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -366,6 +367,10 @@ public List<IntermediateResult> getInputs() {
 		return inputs;
 	}

+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return getJobVertex().getInputDependencyConstraint();
+	}
+
 	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
 		// only one thread should offload the task information, so let's also let only one thread
 		// serialize the task information!
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a0747296c5399..1217217333504 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -61,6 +62,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -70,6 +72,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;

 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
@@ -340,6 +343,10 @@ public Map<IntermediateResultPartitionID, IntermediateResultPartition> getProducedPartitions() {
 		return resultPartitions;
 	}

+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return getJobVertex().getInputDependencyConstraint();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------
@@ -684,6 +691,8 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 			throw new IllegalStateException("Unknown partition " + partitionId + ".");
 		}

+		partition.markDataProduced();
+
 		if (partition.getIntermediateResult().getResultType().isPipelined()) {
 			// Schedule or update receivers of this partition
 			execution.scheduleOrUpdateConsumers(partition.getConsumers());
@@ -726,6 +735,34 @@ List<IntermediateResultPartition> finishAllBlockingPartitions() {
 		}
 	}

+	/**
+	 * Checks whether the InputDependencyConstraint is satisfied for this vertex.
+	 *
+	 * @return whether the input constraint is satisfied
+	 */
+	boolean checkInputDependencyConstraints() {
+		if (getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+			// InputDependencyConstraint == ANY
+			return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
+		} else {
+			// InputDependencyConstraint == ALL
+			return IntStream.range(0, inputEdges.length).allMatch(this::isInputConsumable);
+		}
+	}
+
+	/**
+	 * Gets whether an input of the vertex is consumable.
+	 * An input is consumable when any partition in it is consumable.
+	 *
+	 * Note that a BLOCKING result partition is only consumable when all partitions in the result are FINISHED.
+	 *
+	 * @param inputNumber the number of the input to check
+	 * @return whether the input is consumable
+	 */
+	boolean isInputConsumable(int inputNumber) {
+		return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
+				IntermediateResultPartition::isConsumable);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
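For readers who prefer plain loops over IntStream, the following is an equivalent, self-contained sketch of the checkInputDependencyConstraints()/isInputConsumable() semantics, including the zero-input edge case (anyMatch over an empty range is false, allMatch is true). The class name is invented:

    final class ConstraintCheckSketch {
        static boolean satisfied(boolean anyConstraint, boolean[] inputConsumable) {
            for (boolean consumable : inputConsumable) {
                if (anyConstraint && consumable) {
                    return true;   // ANY: one consumable input suffices
                }
                if (!anyConstraint && !consumable) {
                    return false;  // ALL: one non-consumable input fails the check
                }
            }
            return !anyConstraint; // no input decided: ANY -> false, ALL -> true
        }

        public static void main(String[] args) {
            System.out.println(satisfied(true, new boolean[] {false, true}));  // ANY -> true
            System.out.println(satisfied(false, new boolean[] {false, true})); // ALL -> false
        }
    }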
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272cf86bfb..6e1d9ba69fbd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public int getConnectionIndex() {

 	void resetForNewExecution() {
 		this.numberOfRunningProducers.set(numParallelProducers);
+		for (IntermediateResultPartition partition : partitions) {
+			partition.resetForNewExecution();
+		}
 	}

 	int decrementNumberOfRunningProducersAndGetRemaining() {
 		return numberOfRunningProducers.decrementAndGet();
 	}

-	boolean isConsumable() {
-		if (resultType.isPipelined()) {
-			return true;
-		}
-		else {
-			return numberOfRunningProducers.get() == 0;
-		}
+	boolean areAllPartitionsFinished() {
+		return numberOfRunningProducers.get() == 0;
 	}

 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2b6bcd0..d4e85cf11c10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@ public class IntermediateResultPartition {

 	private List<List<ExecutionEdge>> consumers;

+	/**
+	 * Whether this partition has produced some data.
+	 */
+	private boolean hasDataProduced = false;
+
 	public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
 		this.totalResult = totalResult;
 		this.producer = producer;
@@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() {
 		return partitionId;
 	}

-	ResultPartitionType getResultType() {
+	public ResultPartitionType getResultType() {
 		return totalResult.getResultType();
 	}

@@ -68,8 +73,20 @@ public List<List<ExecutionEdge>> getConsumers() {
 		return consumers;
 	}

+	public void markDataProduced() {
+		hasDataProduced = true;
+	}
+
 	public boolean isConsumable() {
-		return totalResult.isConsumable();
+		if (getResultType().isPipelined()) {
+			return hasDataProduced;
+		} else {
+			return totalResult.areAllPartitionsFinished();
+		}
+	}
+
+	void resetForNewExecution() {
+		hasDataProduced = false;
 	}

 	int addConsumerGroup() {
@@ -94,6 +111,8 @@ boolean markFinished() {
 			throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
 		}

+		hasDataProduced = true;
+
 		final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();

 		if (refCnt == 0) {
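The consumability rules after this change, condensed into a minimal runnable model (illustrative only; the real classes carry far more state, and the class and field names here are invented):

    final class PartitionConsumabilitySketch {
        final boolean pipelined;  // PIPELINED vs. BLOCKING result type
        boolean hasDataProduced;  // set by markDataProduced() / markFinished()
        int runningProducers;     // tracked on the enclosing IntermediateResult

        PartitionConsumabilitySketch(boolean pipelined, int producers) {
            this.pipelined = pipelined;
            this.runningProducers = producers;
        }

        boolean isConsumable() {
            // PIPELINED: consumable as soon as any data has been produced.
            // BLOCKING: consumable only when every producer of the result is FINISHED.
            return pipelined ? hasDataProduced : runningProducers == 0;
        }

        void resetForNewExecution() {
            hasDataProduced = false; // failover clears the produced-data state
        }

        public static void main(String[] args) {
            PartitionConsumabilitySketch blocking = new PartitionConsumabilitySketch(false, 2);
            blocking.hasDataProduced = true; // first producer finished
            blocking.runningProducers = 1;
            System.out.println(blocking.isConsumable()); // false: a producer is still running
            blocking.runningProducers = 0;   // second producer finished
            System.out.println(blocking.isConsumable()); // true
        }
    }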
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 1fe95ebf01054..c78d707029b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;

 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -112,6 +113,9 @@ public class JobVertex implements java.io.Serializable {
 	 * to be included in the JSON plan */
 	private String resultOptimizerProperties;

+	/** The input dependency constraint to schedule this vertex. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// --------------------------------------------------------------------------------------------

 	/**
@@ -557,6 +561,14 @@ public void setResultOptimizerProperties(String resultOptimizerProperties) {
 		this.resultOptimizerProperties = resultOptimizerProperties;
 	}

+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
 	// --------------------------------------------------------------------------------------------

 	@Override
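A usage sketch for the new JobVertex API (the vertex name is made up; the getter and setter are exactly the ones added above):

    import org.apache.flink.api.common.InputDependencyConstraint;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class InputConstraintExample {
        public static void main(String[] args) {
            JobVertex reducer = new JobVertex("reducer"); // hypothetical vertex
            // Defer scheduling until ALL inputs are consumable, e.g. when one input
            // is a BLOCKING result that must be fully produced first.
            reducer.setInputDependencyConstraint(InputDependencyConstraint.ALL);
            System.out.println(reducer.getInputDependencyConstraint()); // ALL
        }
    }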
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the inputs constraint for {@link ExecutionVertex}. + */ +public class ExecutionVertexInputConstraintTest extends TestLogger { + + @Test + public void testInputConsumable() throws Exception { + List vertices = createOrderedVertices(); + ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ALL); + ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs not consumable on init + assertFalse(ev31.isInputConsumable(0)); + assertFalse(ev31.isInputConsumable(1)); + + // One pipelined input consumable on data produced + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertTrue(ev31.isInputConsumable(0)); + // Input0 of ev32 is not consumable. 
+		// Input0 of ev32 is not consumable: it consumes the same PIPELINED result as ev31,
+		// but not the same partition
+		assertFalse(ev32.isInputConsumable(0));
+
+		// The blocking input is not consumable if only one partition is FINISHED
+		ev21.getCurrentExecutionAttempt().markFinished();
+		assertFalse(ev31.isInputConsumable(1));
+
+		// The blocking input is consumable once all partitions are FINISHED
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.isInputConsumable(1));
+
+		// Inputs are not consumable after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.isInputConsumable(0));
+		assertFalse(ev31.isInputConsumable(1));
+	}
+
+	@Test
+	public void testInputConstraintANY() throws Exception {
+		List<JobVertex> vertices = createOrderedVertices();
+		ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ANY);
+		ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
+
+		eg.scheduleForExecution();
+
+		// The input constraint is not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 becoming consumable satisfies the ANY constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertTrue(ev31.checkInputDependencyConstraints());
+
+		// The input constraint is not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 becoming consumable satisfies the ANY constraint
+		waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+		waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.checkInputDependencyConstraints());
+	}
+
+	@Test
+	public void testInputConstraintALL() throws Exception {
+		List<JobVertex> vertices = createOrderedVertices();
+		ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ALL);
+		ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
+
+		eg.scheduleForExecution();
+
+		// The input constraint is not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 alone being consumable does not satisfy the ALL constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 also becoming consumable satisfies the ALL constraint
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.checkInputDependencyConstraints());
+
+		// The input constraint is not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+	}
+
+	private static List<JobVertex> createOrderedVertices() {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		return Arrays.asList(v1, v2, v3);
+	}
+
+	private static ExecutionGraph createExecutionGraph(
+			List<JobVertex> orderedVertices,
+			InputDependencyConstraint inputDependencyConstraint) throws Exception {
+
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+		for (JobVertex vertex : orderedVertices) {
+			vertex.setInputDependencyConstraint(inputDependencyConstraint);
+		}
+
+		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new FixedDelayRestartStrategy(1, 0),
+			new RestartAllStrategy.Factory(),
+			slotProvider);
+		eg.attachJobGraph(orderedVertices);
+
+		return eg;
+	}
+
+	private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+		waitForAllExecutionsPredicate(eg,
+			isInExecutionState(ExecutionState.CANCELING)
+				.or(isInExecutionState(ExecutionState.CANCELED))
+				.or(isInExecutionState(ExecutionState.FAILED))
+				.or(isInExecutionState(ExecutionState.FINISHED)),
+			2000L);
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+				ev.getCurrentExecutionAttempt().cancelingComplete();
+			}
+		}
+
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+	}
+}
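For orientation, the topology built by createOrderedVertices() above, as used by all three tests (derived from the connect calls; rendered here as Java comments):

    // Topology of the test job:
    //
    //   v1 (parallelism 2) --POINTWISE / PIPELINED--> v3 (parallelism 2)
    //   v2 (parallelism 2) --ALL_TO_ALL / BLOCKING--> v3
    //
    // Each v3 subtask therefore sees input 0 as a single pipelined partition of v1
    // and input 1 as all blocking partitions of v2.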
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link IntermediateResultPartition}. + */ +public class IntermediateResultPartitionTest extends TestLogger { + + @Test + public void testPipelinedPartitionConsumable() throws Exception { + IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2); + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Partition 1 consumable after data are produced + partition1.markDataProduced(); + assertTrue(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } + + @Test + public void testBlockingPartitionConsumable() throws Exception { + IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2); + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if only one partition is FINISHED + partition1.markFinished(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Consumable after all partitions are FINISHED + partition2.markFinished(); + assertTrue(partition1.isConsumable()); + assertTrue(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } + + private static IntermediateResult createResult( + ResultPartitionType resultPartitionType, + int producerCount) throws Exception { + + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, producerCount, resultPartitionType); + for (int i = 0; i < producerCount; i++) { + // Generate result partition in the result + new ExecutionVertex(jobVertex, i, new IntermediateResult[]{result}, Time.minutes(1)); + } + + return result; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 69213024975e5..bc0377b79320f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -404,6 +404,9 @@ private StreamConfig createJobVertex( 
 			LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
 		}

+		// TODO: inherit InputDependencyConstraint from the head operator
+		jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
+
 		jobVertices.put(streamNodeId, jobVertex);
 		builtVertices.add(streamNodeId);
 		jobGraph.addVertex(jobVertex);
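For DataStream programs the constraint currently comes from the job-wide ExecutionConfig default, as the StreamingJobGraphGenerator change above shows. A usage sketch, assuming the matching setDefaultInputDependencyConstraint setter exists on ExecutionConfig (only the getter appears in this diff):

    import org.apache.flink.api.common.InputDependencyConstraint;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DefaultConstraintExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Until the TODO above (per-operator inheritance) is implemented,
            // every generated JobVertex inherits this job-wide default.
            env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
        }
    }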