From 0cf6a382fa9b42b7ba15889205fab67bf97d5f73 Mon Sep 17 00:00:00 2001 From: "zhuzhu.zz" Date: Thu, 6 Dec 2018 19:37:05 +0800 Subject: [PATCH 1/6] [FLINK-10945] Add an InputDependencyConstraint to avoid resource deadlocks for finite stream jobs when resources are limited --- .../flink/api/common/ExecutionConfig.java | 27 ++ .../api/common/InputDependencyConstraint.java | 35 +++ .../InputChannelDeploymentDescriptor.java | 6 +- .../runtime/executiongraph/Execution.java | 49 ++-- .../executiongraph/ExecutionGraph.java | 12 + .../executiongraph/ExecutionGraphBuilder.java | 7 + .../executiongraph/ExecutionVertex.java | 52 ++++ .../executiongraph/IntermediateResult.java | 12 +- .../IntermediateResultPartition.java | 25 +- .../InputChannelDeploymentDescriptorTest.java | 2 + .../ExecutionVertexInputConstraintTest.java | 235 ++++++++++++++++++ .../IntermediateResultPartitionTest.java | 98 ++++++++ 12 files changed, 526 insertions(+), 34 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3b3132b3d11c6..6def1dfa488a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable { - try { - final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); - consumerVertex.scheduleForExecution( - executionGraph.getSlotProvider(), - executionGraph.isQueuedSchedulingAllowed(), - LocationPreferenceConstraint.ANY, // there must be at least one known location - Collections.emptySet()); - } catch (Throwable t) { - consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + + // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling + if (consumerVertex.checkInputDependencyConstraints()) { + // When deploying a consuming task, its task deployment descriptor will contain all + // deployment information available at the respective time. It is possible that some + // of the partitions to be consumed have not been created yet. These are updated + // runtime via the update messages. + // + // TODO The current approach may send many update messages even though the consuming + // task has already been deployed with all necessary information. We have to check + // whether this is a problem and fix it, if it is. 
+ CompletableFuture.supplyAsync( + () -> { + try { + final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); + consumerVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, // there must be at least one known location + Collections.emptySet()); + } catch (Throwable t) { + consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + "vertex " + consumerVertex, t)); - } + } - return null; - }, - executor); + return null; + }, + executor); + } // double check to resolve race conditions if (consumerVertex.getExecutionState() == RUNNING) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 56315e07146d5..e89409f032c5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph { * from results than need to be materialized. */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; + /** The input dependency constraint to schedule tasks. */ + private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY; + // ------ Execution status and progress. 
These values are volatile, and accessed under the lock ------- private final AtomicInteger verticesFinished; @@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() { return scheduleMode; } + public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) { + this.inputDependencyConstraint = inputDependencyConstraint; + } + + public InputDependencyConstraint getInputDependencyConstraint() { + return inputDependencyConstraint; + } + public Time getAllocationTimeout() { return allocationTimeout; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index f1a861d2ca141..c0d877dcb2832 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph( executionGraph.setScheduleMode(jobGraph.getScheduleMode()); executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); + try { + executionGraph.setInputDependencyConstraint( + jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint()); + } catch (IOException | ClassNotFoundException e) { + throw new JobException("Fail to deserialize execution config.", e); + } + try { executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a0747296c5399..22e02e2b13d9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; @@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { if (partition.getIntermediateResult().getResultType().isPipelined()) { // Schedule or update receivers of this partition + partition.markDataProduced(); execution.scheduleOrUpdateConsumers(partition.getConsumers()); } else { @@ -726,6 +728,56 @@ List finishAllBlockingPartitions() { } } + /** + * Check whether the InputDependencyConstraint is satisfied for this vertex. + * + * @return whether the input constraint is satisfied + */ + public boolean checkInputDependencyConstraints() { + if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) { + // InputDependencyConstraint == ANY + for (int i = 0; i < inputEdges.length; i++) { + if (isInputConsumable(i)) { + return true; + } + } + return false; + } else { + // InputDependencyConstraint == ALL + for (int i = 0; i < inputEdges.length; i++) { + if (!isInputConsumable(i)) { + return false; + } + } + return true; + } + } + + /** + * An input is consumable when + * 1. the source result is PIPELINED and one of the result partition has produced data. + * 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED). 
+ * + * @return whether the input is consumable + */ + public boolean isInputConsumable(int inputNumber) { + IntermediateResult result = jobVertex.getInputs().get(inputNumber); + + if (result.getResultType().isPipelined()) { + // For PIPELINED result, the input is consumable if any result partition has produced records or is finished + for (ExecutionEdge edge : inputEdges[inputNumber]) { + if (edge.getSource().hasDataProduced()) { + return true; + } + } + } else { + // For BLOCKING result, the input is consumable if all the partitions in the result are finished + return result.areAllPartitionsFinished(); + } + + return false; + } + // -------------------------------------------------------------------------------------------- // Notifications from the Execution Attempt // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 313272cf86bfb..6e1d9ba69fbd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -156,19 +156,17 @@ public int getConnectionIndex() { void resetForNewExecution() { this.numberOfRunningProducers.set(numParallelProducers); + for (IntermediateResultPartition partition : partitions) { + partition.resetForNewExecution(); + } } int decrementNumberOfRunningProducersAndGetRemaining() { return numberOfRunningProducers.decrementAndGet(); } - boolean isConsumable() { - if (resultType.isPipelined()) { - return true; - } - else { - return numberOfRunningProducers.get() == 0; - } + boolean areAllPartitionsFinished() { + return numberOfRunningProducers.get() == 0; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 124ceb2b6bcd0..56fb137eb5949 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -36,6 +36,11 @@ public class IntermediateResultPartition { private List> consumers; + /** + * Whether this partition has data produced. For pipelined result only. 
+ */ + private boolean dataProduced = false; + public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) { this.totalResult = totalResult; this.producer = producer; @@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() { return partitionId; } - ResultPartitionType getResultType() { + public ResultPartitionType getResultType() { return totalResult.getResultType(); } @@ -68,8 +73,24 @@ public List> getConsumers() { return consumers; } + public void markDataProduced() { + dataProduced = true; + } + + public boolean hasDataProduced() { + return dataProduced; + } + public boolean isConsumable() { - return totalResult.isConsumable(); + if (getResultType().isPipelined()) { + return dataProduced; + } else { + return totalResult.areAllPartitionsFinished(); + } + } + + void resetForNewExecution() { + dataProduced = false; } int addConsumerGroup() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java index 6aa36b70ab889..4f7417f27fd1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc private static IntermediateResultPartition mockPartition(ExecutionVertex producer) { IntermediateResultPartition partition = mock(IntermediateResultPartition.class); + when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED); when(partition.isConsumable()).thenReturn(true); IntermediateResult result = mock(IntermediateResult.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java new file mode 100644 index 0000000000000..8d131a0cb304b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the input dependency constraint of {@link ExecutionVertex}.
+ */ +public class ExecutionVertexInputConstraintTest extends TestLogger { + + @Test + public void testInputConsumable() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs not consumable on init + assertFalse(ev31.isInputConsumable(0)); + assertFalse(ev31.isInputConsumable(1)); + + // One pipelined input consumable on data produced + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertTrue(ev31.isInputConsumable(0)); + + // The blocking input not consumable if only one partition is FINISHED + ev21.getCurrentExecutionAttempt().markFinished(); + assertFalse(ev31.isInputConsumable(1)); + + // The blocking input consumable if all partitions are FINISHED + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + + // Inputs not consumable after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.isInputConsumable(0)); + assertFalse(ev31.isInputConsumable(1)); + } + + @Test + public void testInputConstraintANY() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + eg.setInputDependencyConstraint(InputDependencyConstraint.ANY); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs constraint not satisfied on init + 
assertFalse(ev31.checkInputDependencyConstraints()); + + // Input1 consumable satisfies the constraint + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertTrue(ev31.checkInputDependencyConstraints()); + + // Inputs constraint not satisfied after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input2 consumable satisfies the constraint + waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L); + waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L); + ev21.getCurrentExecutionAttempt().markFinished(); + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + } + + @Test + public void testInputConstraintALL() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + eg.setInputDependencyConstraint(InputDependencyConstraint.ALL); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // Inputs constraint not satisfied on init + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input1 consumable does not satisfy the constraint + IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next(); + ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), + ev11.getCurrentExecutionAttempt().getAttemptId())); + assertFalse(ev31.checkInputDependencyConstraints()); + + // Input2 consumable satisfies the constraint + ev21.getCurrentExecutionAttempt().markFinished(); + ev22.getCurrentExecutionAttempt().markFinished(); + assertTrue(ev31.isInputConsumable(1)); + + // Inputs constraint not satisfied after failover + ev11.fail(new Exception()); + waitUntilJobRestarted(eg); + assertFalse(ev31.checkInputDependencyConstraints()); + } + + private static ExecutionGraph createExecutionGraph(List ordered) throws Exception { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + + final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20); + + ExecutionGraph eg = new ExecutionGraph( + new DummyJobInformation( + jobId, + jobName), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + new FixedDelayRestartStrategy(1, 0), + new RestartAllStrategy.Factory(), + slotProvider); + + eg.attachJobGraph(ordered); + + 
return eg; + } + + private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception { + waitForAllExecutionsPredicate(eg, + isInExecutionState(ExecutionState.CANCELING) + .or(isInExecutionState(ExecutionState.CANCELED)) + .or(isInExecutionState(ExecutionState.FAILED)) + .or(isInExecutionState(ExecutionState.FINISHED)), + 2000L); + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) { + ev.getCurrentExecutionAttempt().cancelingComplete(); + } + } + waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java new file mode 100644 index 0000000000000..2cd00617d9021 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link IntermediateResultPartition}. 
+ */ +public class IntermediateResultPartitionTest extends TestLogger { + + @Test + public void testPipelinedPartitionConsumable() throws Exception { + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED); + ExecutionVertex vertex1 = + new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + ExecutionVertex vertex2 = + new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); + + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Partition 1 consumable after data are produced + partition1.markDataProduced(); + assertTrue(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } + + @Test + public void testBlockingPartitionConsumable() throws Exception { + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING); + ExecutionVertex vertex1 = + new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); + ExecutionVertex vertex2 = + new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); + + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if only one partition is FINISHED + partition1.markFinished(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Consumable after all partitions are FINISHED + partition2.markFinished(); + assertTrue(partition1.isConsumable()); + assertTrue(partition2.isConsumable()); + + // Not consumable if failover happens + result.resetForNewExecution(); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + } +} From a6371b9b91e0c60655761e6949ee962d00017c85 Mon Sep 17 00:00:00 2001 From: "zhuzhu.zz" Date: Tue, 11 Dec 2018 23:12:28 +0800 Subject: [PATCH 2/6] Minor changes for comments --- .../runtime/executiongraph/Execution.java | 56 ++++++++++--------- .../executiongraph/ExecutionVertex.java | 27 +++------ .../IntermediateResultPartition.java | 16 +++--- .../ExecutionVertexInputConstraintTest.java | 2 + .../IntermediateResultPartitionTest.java | 2 +- 5 files changed, 48 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 59f76502ed683..9e1112d6e5ba5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -718,6 +718,26 @@ else if (current == CREATED || current == SCHEDULED) { } } + private void scheduleConsumer(ExecutionVertex 
consumerVertex) { + CompletableFuture.supplyAsync( + () -> { + try { + final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); + consumerVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, // there must be at least one known location + Collections.emptySet()); + } catch (Throwable t) { + consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + + "vertex " + consumerVertex, t)); + } + + return null; + }, + executor); + } + void scheduleOrUpdateConsumers(List> allConsumers) { final int numConsumers = allConsumers.size(); @@ -747,33 +767,17 @@ else if (numConsumers == 0) { consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge( partition, partitionExecution)); - // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling + // When deploying a consuming task, its task deployment descriptor will contain all + // deployment information available at the respective time. It is possible that some + // of the partitions to be consumed have not been created yet. These are updated + // runtime via the update messages. + // + // TODO The current approach may send many update messages even though the consuming + // task has already been deployed with all necessary information. We have to check + // whether this is a problem and fix it, if it is. if (consumerVertex.checkInputDependencyConstraints()) { - // When deploying a consuming task, its task deployment descriptor will contain all - // deployment information available at the respective time. It is possible that some - // of the partitions to be consumed have not been created yet. These are updated - // runtime via the update messages. - // - // TODO The current approach may send many update messages even though the consuming - // task has already been deployed with all necessary information. We have to check - // whether this is a problem and fix it, if it is. 
- CompletableFuture.supplyAsync( - () -> { - try { - final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); - consumerVertex.scheduleForExecution( - executionGraph.getSlotProvider(), - executionGraph.isQueuedSchedulingAllowed(), - LocationPreferenceConstraint.ANY, // there must be at least one known location - Collections.emptySet()); - } catch (Throwable t) { - consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + - "vertex " + consumerVertex, t)); - } - - return null; - }, - executor); + // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise skip the scheduling + scheduleConsumer(consumerVertex); } // double check to resolve race conditions diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 22e02e2b13d9f..c98f9d3e69d93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -71,6 +72,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; @@ -687,7 +689,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { if (partition.getIntermediateResult().getResultType().isPipelined()) { // Schedule or update receivers of this partition - partition.markDataProduced(); + partition.markSomePipelinedDataProduced(); execution.scheduleOrUpdateConsumers(partition.getConsumers()); } else { @@ -736,20 +738,10 @@ List finishAllBlockingPartitions() { public boolean checkInputDependencyConstraints() { if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) { // InputDependencyConstraint == ANY - for (int i = 0; i < inputEdges.length; i++) { - if (isInputConsumable(i)) { - return true; - } - } - return false; + return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable); } else { // InputDependencyConstraint == ALL - for (int i = 0; i < inputEdges.length; i++) { - if (!isInputConsumable(i)) { - return false; - } - } - return true; + return IntStream.range(0, inputEdges.length).allMatch(this::isInputConsumable); } } @@ -765,17 +757,12 @@ public boolean isInputConsumable(int inputNumber) { if (result.getResultType().isPipelined()) { // For PIPELINED result, the input is consumable if any result partition has produced records or is finished - for (ExecutionEdge edge : inputEdges[inputNumber]) { - if (edge.getSource().hasDataProduced()) { - return true; - } - } + return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch( + IntermediateResultPartition::isSomePipelinedDataProduced); } else { // For BLOCKING result, the input is consumable if all the partitions in the result are finished return result.areAllPartitionsFinished(); } - - return false; } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 56fb137eb5949..c9d0223af3414 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -37,9 +37,9 @@ public class IntermediateResultPartition { private List> consumers; /** - * Whether this partition has data produced. For pipelined result only. + * Whether this partition has produced some data. For pipelined result only. */ - private boolean dataProduced = false; + private boolean isSomePipelinedDataProduced = false; public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) { this.totalResult = totalResult; @@ -73,24 +73,24 @@ public List> getConsumers() { return consumers; } - public void markDataProduced() { - dataProduced = true; + public void markSomePipelinedDataProduced() { + isSomePipelinedDataProduced = true; } - public boolean hasDataProduced() { - return dataProduced; + public boolean isSomePipelinedDataProduced() { + return isSomePipelinedDataProduced; } public boolean isConsumable() { if (getResultType().isPipelined()) { - return dataProduced; + return isSomePipelinedDataProduced; } else { return totalResult.areAllPartitionsFinished(); } } void resetForNewExecution() { - dataProduced = false; + isSomePipelinedDataProduced = false; } int addConsumerGroup() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java index 8d131a0cb304b..411fe55281159 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java @@ -85,6 +85,8 @@ public void testInputConsumable() throws Exception { ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), ev11.getCurrentExecutionAttempt().getAttemptId())); assertTrue(ev31.isInputConsumable(0)); + // Input0 of ev32 is not consumable. It consumes the same PIPELINED result with ev31 but not the same partition + assertFalse(ev32.isInputConsumable(0)); // The blocking input not consumable if only one partition is FINISHED ev21.getCurrentExecutionAttempt().markFinished(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index 2cd00617d9021..f40dc12231d41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -53,7 +53,7 @@ public void testPipelinedPartitionConsumable() throws Exception { assertFalse(partition2.isConsumable()); // Partition 1 consumable after data are produced - partition1.markDataProduced(); + partition1.markSomePipelinedDataProduced(); assertTrue(partition1.isConsumable()); assertFalse(partition2.isConsumable()); From 374735c826c56c25999374e04afbe6f759efc265 Mon Sep 17 00:00:00 2001 From: "zhuzhu.zz" Date: Fri, 14 Dec 2018 14:40:20 +0800 Subject: [PATCH 3/6] 1. 
Add a shortcut for scheduling input constraint check when InputDependencyConstraint == ANY 2. Fixes for tests --- .../runtime/executiongraph/Execution.java | 4 +- .../ExecutionVertexInputConstraintTest.java | 110 +++++++----------- .../IntermediateResultPartitionTest.java | 33 +++--- 3 files changed, 62 insertions(+), 85 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 9e1112d6e5ba5..ede9674c0ef44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; +import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; @@ -775,7 +776,8 @@ else if (numConsumers == 0) { // TODO The current approach may send many update messages even though the consuming // task has already been deployed with all necessary information. We have to check // whether this is a problem and fix it, if it is. - if (consumerVertex.checkInputDependencyConstraints()) { + if (consumerVertex.getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY || + consumerVertex.checkInputDependencyConstraints()) { // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise skip the scheduling scheduleConsumer(consumerVertex); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java index 411fe55281159..92ad710521171 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java @@ -53,26 +53,13 @@ public class ExecutionVertexInputConstraintTest extends TestLogger { @Test public void testInputConsumable() throws Exception { - JobVertex v1 = new JobVertex("vertex1"); - JobVertex v2 = new JobVertex("vertex2"); - JobVertex v3 = new JobVertex("vertex3"); - v1.setParallelism(2); - v2.setParallelism(2); - v3.setParallelism(2); - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - v3.setInvokableClass(AbstractInvokable.class); - v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - List ordered = Arrays.asList(v1, v2, v3); - ExecutionGraph eg = createExecutionGraph(ordered); - - ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; - ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; - ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; - ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; - ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; - ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + List vertices = createOrderedVertices(); + ExecutionGraph eg = createExecutionGraph(vertices, 
InputDependencyConstraint.ALL); + ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[1]; eg.scheduleForExecution(); @@ -105,27 +92,12 @@ public void testInputConsumable() throws Exception { @Test public void testInputConstraintANY() throws Exception { - JobVertex v1 = new JobVertex("vertex1"); - JobVertex v2 = new JobVertex("vertex2"); - JobVertex v3 = new JobVertex("vertex3"); - v1.setParallelism(2); - v2.setParallelism(2); - v3.setParallelism(2); - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - v3.setInvokableClass(AbstractInvokable.class); - v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - List ordered = Arrays.asList(v1, v2, v3); - ExecutionGraph eg = createExecutionGraph(ordered); - eg.setInputDependencyConstraint(InputDependencyConstraint.ANY); - - ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; - ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; - ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; - ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; - ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; - ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + List vertices = createOrderedVertices(); + ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ANY); + ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0]; eg.scheduleForExecution(); @@ -148,32 +120,17 @@ public void testInputConstraintANY() throws Exception { waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L); ev21.getCurrentExecutionAttempt().markFinished(); ev22.getCurrentExecutionAttempt().markFinished(); - assertTrue(ev31.isInputConsumable(1)); + assertTrue(ev31.checkInputDependencyConstraints()); } @Test public void testInputConstraintALL() throws Exception { - JobVertex v1 = new JobVertex("vertex1"); - JobVertex v2 = new JobVertex("vertex2"); - JobVertex v3 = new JobVertex("vertex3"); - v1.setParallelism(2); - v2.setParallelism(2); - v3.setParallelism(2); - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - v3.setInvokableClass(AbstractInvokable.class); - v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - List ordered = Arrays.asList(v1, v2, v3); - ExecutionGraph eg = createExecutionGraph(ordered); - eg.setInputDependencyConstraint(InputDependencyConstraint.ALL); - - ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; - ExecutionVertex ev12 = 
eg.getJobVertex(v1.getID()).getTaskVertices()[1]; - ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; - ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; - ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; - ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + List vertices = createOrderedVertices(); + ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ALL); + ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0]; eg.scheduleForExecution(); @@ -189,7 +146,7 @@ public void testInputConstraintALL() throws Exception { // Input2 consumable satisfies the constraint ev21.getCurrentExecutionAttempt().markFinished(); ev22.getCurrentExecutionAttempt().markFinished(); - assertTrue(ev31.isInputConsumable(1)); + assertTrue(ev31.checkInputDependencyConstraints()); // Inputs constraint not satisfied after failover ev11.fail(new Exception()); @@ -197,10 +154,27 @@ public void testInputConstraintALL() throws Exception { assertFalse(ev31.checkInputDependencyConstraints()); } - private static ExecutionGraph createExecutionGraph(List ordered) throws Exception { + private static List createOrderedVertices() { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + return Arrays.asList(v1, v2, v3); + } + + private static ExecutionGraph createExecutionGraph( + List orderedVertices, + InputDependencyConstraint inputDependencyConstraint) throws Exception { + final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; - final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20); ExecutionGraph eg = new ExecutionGraph( @@ -213,8 +187,8 @@ private static ExecutionGraph createExecutionGraph(List ordered) thro new FixedDelayRestartStrategy(1, 0), new RestartAllStrategy.Factory(), slotProvider); - - eg.attachJobGraph(ordered); + eg.attachJobGraph(orderedVertices); + eg.setInputDependencyConstraint(inputDependencyConstraint); return eg; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index f40dc12231d41..5c94101f49041 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -37,14 +37,7 @@ public class IntermediateResultPartitionTest extends TestLogger { @Test public void testPipelinedPartitionConsumable() throws Exception { - ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); - IntermediateResult result = 
- new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED); - ExecutionVertex vertex1 = - new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); - ExecutionVertex vertex2 = - new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); - + IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2); IntermediateResultPartition partition1 = result.getPartitions()[0]; IntermediateResultPartition partition2 = result.getPartitions()[1]; @@ -65,14 +58,7 @@ public void testPipelinedPartitionConsumable() throws Exception { @Test public void testBlockingPartitionConsumable() throws Exception { - ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); - IntermediateResult result = - new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING); - ExecutionVertex vertex1 = - new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1)); - ExecutionVertex vertex2 = - new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1)); - + IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2); IntermediateResultPartition partition1 = result.getPartitions()[0]; IntermediateResultPartition partition2 = result.getPartitions()[1]; @@ -95,4 +81,19 @@ public void testBlockingPartitionConsumable() throws Exception { assertFalse(partition1.isConsumable()); assertFalse(partition2.isConsumable()); } + + private static IntermediateResult createResult( + ResultPartitionType resultPartitionType, + int producerCount) throws Exception { + + ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()); + IntermediateResult result = + new IntermediateResult(new IntermediateDataSetID(), jobVertex, producerCount, resultPartitionType); + for (int i = 0; i < producerCount; i++) { + // Generate result partition in the result + new ExecutionVertex(jobVertex, i, new IntermediateResult[]{result}, Time.minutes(1)); + } + + return result; + } } From c7a33ee1fbb9332c001ebf559ee537a77337f9a9 Mon Sep 17 00:00:00 2001 From: "zhuzhu.zz" Date: Mon, 17 Dec 2018 11:24:17 +0800 Subject: [PATCH 4/6] Refine comments --- .../org/apache/flink/runtime/executiongraph/Execution.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index ede9674c0ef44..89103d5aa9dfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -776,9 +776,14 @@ else if (numConsumers == 0) { // TODO The current approach may send many update messages even though the consuming // task has already been deployed with all necessary information. We have to check // whether this is a problem and fix it, if it is. + + // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise skip the scheduling. + // A shortcut of input constraint check is added for InputDependencyConstraint.ANY since + // at least one of the consumer vertex's inputs is consumable here. This is to avoid the + // O(N) complexity introduced by input constraint check for InputDependencyConstraint.ANY, + // as we do not want the default scheduling performance to be affected. 
if (consumerVertex.getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY || consumerVertex.checkInputDependencyConstraints()) { - // Schedule the consumer vertex if its inputs constraint is satisfied, otherwise skip the scheduling scheduleConsumer(consumerVertex); } From 2bd5173b7e08725fff3fb2f698451021cfb1c946 Mon Sep 17 00:00:00 2001 From: "zhuzhu.zz" Date: Wed, 9 Jan 2019 23:17:51 +0800 Subject: [PATCH 5/6] 1. Move InputDependencyConstraint to JobVertex (but not make it configurable through API yet) 2. Refine input consumable checks --- .../flink/api/common/ExecutionConfig.java | 22 ++++++++------- .../plantranslate/JobGraphGenerator.java | 1 + .../runtime/executiongraph/Execution.java | 2 +- .../executiongraph/ExecutionGraph.java | 12 --------- .../executiongraph/ExecutionGraphBuilder.java | 7 ----- .../executiongraph/ExecutionJobVertex.java | 5 ++++ .../executiongraph/ExecutionVertex.java | 27 +++++++++---------- .../IntermediateResultPartition.java | 18 ++++++------- .../flink/runtime/jobgraph/JobVertex.java | 12 +++++++++ .../ExecutionVertexInputConstraintTest.java | 5 +++- .../IntermediateResultPartitionTest.java | 2 +- .../api/graph/StreamingJobGraphGenerator.java | 3 +++ 12 files changed, 59 insertions(+), 57 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 6def1dfa488a0..fa842d4a960c5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -159,8 +159,8 @@ public class ExecutionConfig implements Serializable, Archiveable getInputs() { return inputs; } + public InputDependencyConstraint getInputDependencyConstraint() { + return getJobVertex().getInputDependencyConstraint(); + } + public Either, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException { // only one thread should offload the task information, so let's also let only one thread // serialize the task information! 
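
Note: with this patch the constraint now lives on the JobVertex and is read through
ExecutionJobVertex/ExecutionVertex, as the getters above show. Below is a minimal sketch of how a
hand-built job graph would use the new per-vertex setter; it only uses classes that the tests in
this series already exercise, and the vertex names and the BLOCKING wiring are illustrative, not
part of the patch:

    import org.apache.flink.api.common.InputDependencyConstraint;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

    public class InputConstraintWiringSketch {
        public static void main(String[] args) {
            JobVertex producer = new JobVertex("producer");
            JobVertex consumer = new JobVertex("consumer");
            producer.setInvokableClass(AbstractInvokable.class);
            consumer.setInvokableClass(AbstractInvokable.class);

            // Wire the consumer behind a BLOCKING result. With ALL, the consumer is
            // scheduled only once every one of its inputs is consumable; with the
            // default ANY, one consumable input suffices.
            consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
            consumer.setInputDependencyConstraint(InputDependencyConstraint.ALL);
        }
    }
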
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c98f9d3e69d93..11e84e7ed75f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -343,6 +343,10 @@ public Map getProduc
 		return resultPartitions;
 	}
 
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return getJobVertex().getInputDependencyConstraint();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------
@@ -689,7 +693,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 
 		if (partition.getIntermediateResult().getResultType().isPipelined()) {
 			// Schedule or update receivers of this partition
-			partition.markSomePipelinedDataProduced();
+			partition.markDataProduced();
 			execution.scheduleOrUpdateConsumers(partition.getConsumers());
 		}
 		else {
@@ -736,7 +740,7 @@ List finishAllBlockingPartitions() {
 	 * @return whether the input constraint is satisfied
 	 */
 	public boolean checkInputDependencyConstraints() {
-		if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+		if (getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
 			// InputDependencyConstraint == ANY
 			return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
 		} else {
@@ -746,23 +750,16 @@ public boolean checkInputDependencyConstraints() {
 	}
 
 	/**
-	 * An input is consumable when
-	 * 1. the source result is PIPELINED and one of the result partition has produced data.
-	 * 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED).
+	 * Get whether an input of the vertex is consumable.
+	 * An input is consumable when any partition in it is consumable.
+	 *
+	 * Note that a BLOCKING result partition is only consumable when all partitions in the result are FINISHED.
 	 *
 	 * @return whether the input is consumable
 	 */
 	public boolean isInputConsumable(int inputNumber) {
-		IntermediateResult result = jobVertex.getInputs().get(inputNumber);
-
-		if (result.getResultType().isPipelined()) {
-			// For PIPELINED result, the input is consumable if any result partition has produced records or is finished
-			return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
-				IntermediateResultPartition::isSomePipelinedDataProduced);
-		} else {
-			// For BLOCKING result, the input is consumable if all the partitions in the result are finished
-			return result.areAllPartitionsFinished();
-		}
+		return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
+			IntermediateResultPartition::isConsumable);
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index c9d0223af3414..d4e85cf11c10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -37,9 +37,9 @@ public class IntermediateResultPartition {
 	private List<List<ExecutionEdge>> consumers;
 
 	/**
-	 * Whether this partition has produced some data. For pipelined result only.
+	 * Whether this partition has produced some data.
 	 */
-	private boolean isSomePipelinedDataProduced = false;
+	private boolean hasDataProduced = false;
 
 	public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
 		this.totalResult = totalResult;
@@ -73,24 +73,20 @@ public List<List<ExecutionEdge>> getConsumers() {
 		return consumers;
 	}
 
-	public void markSomePipelinedDataProduced() {
-		isSomePipelinedDataProduced = true;
-	}
-
-	public boolean isSomePipelinedDataProduced() {
-		return isSomePipelinedDataProduced;
+	public void markDataProduced() {
+		hasDataProduced = true;
 	}
 
 	public boolean isConsumable() {
 		if (getResultType().isPipelined()) {
-			return isSomePipelinedDataProduced;
+			return hasDataProduced;
 		} else {
 			return totalResult.areAllPartitionsFinished();
 		}
 	}
 
 	void resetForNewExecution() {
-		isSomePipelinedDataProduced = false;
+		hasDataProduced = false;
 	}
 
 	int addConsumerGroup() {
@@ -115,6 +111,8 @@ boolean markFinished() {
 			throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
 		}
 
+		hasDataProduced = true;
+
 		final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();
 
 		if (refCnt == 0) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 1fe95ebf01054..c78d707029b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -112,6 +113,9 @@ public class JobVertex implements java.io.Serializable {
 	 * to be included in the JSON plan */
 	private String resultOptimizerProperties;
 
+	/** The input dependency constraint to schedule this vertex. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -557,6 +561,14 @@ public void setResultOptimizerProperties(String resultOptimizerProperties) {
 		this.resultOptimizerProperties = resultOptimizerProperties;
 	}
 
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
index 92ad710521171..3c7354adc2f8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -177,6 +177,10 @@ private static ExecutionGraph createExecutionGraph(
 		final String jobName = "Test Job Sample Name";
 		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
 
+		for (JobVertex vertex : orderedVertices) {
+			vertex.setInputDependencyConstraint(inputDependencyConstraint);
+		}
+
 		ExecutionGraph eg = new ExecutionGraph(
 			new DummyJobInformation(
 				jobId,
@@ -188,7 +192,6 @@ private static ExecutionGraph createExecutionGraph(
 			new RestartAllStrategy.Factory(),
 			slotProvider);
 		eg.attachJobGraph(orderedVertices);
-		eg.setInputDependencyConstraint(inputDependencyConstraint);
 
 		return eg;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index 5c94101f49041..e20c0bd7efac2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -46,7 +46,7 @@ public void testPipelinedPartitionConsumable() throws Exception {
 		assertFalse(partition2.isConsumable());
 
 		// Partition 1 consumable after data are produced
-		partition1.markSomePipelinedDataProduced();
+		partition1.markDataProduced();
 		assertTrue(partition1.isConsumable());
 		assertFalse(partition2.isConsumable());
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 69213024975e5..bc0377b79320f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -404,6 +404,9 @@ private StreamConfig createJobVertex(
 			LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
 		}
 
+		// TODO: inherit InputDependencyConstraint from the head operator
+		jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
+
 		jobVertices.put(streamNodeId, jobVertex);
 		builtVertices.add(streamNodeId);
 		jobGraph.addVertex(jobVertex);

From 8c2ce553f38a0c230733b134e355575f59fd6dcb Mon Sep 17 00:00:00 2001
From: "zhuzhu.zz"
Date: Wed, 16 Jan 2019 12:02:00 +0800
Subject: [PATCH 6/6] Fix for comments

---
 .../flink/runtime/executiongraph/ExecutionVertex.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 11e84e7ed75f3..1217217333504 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -691,9 +691,10 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 			throw new IllegalStateException("Unknown partition " + partitionId + ".");
 		}
 
+		partition.markDataProduced();
+
 		if (partition.getIntermediateResult().getResultType().isPipelined()) {
 			// Schedule or update receivers of this partition
-			partition.markDataProduced();
 			execution.scheduleOrUpdateConsumers(partition.getConsumers());
 		}
 		else {
@@ -739,7 +740,7 @@ List finishAllBlockingPartitions() {
 	 *
 	 * @return whether the input constraint is satisfied
 	 */
-	public boolean checkInputDependencyConstraints() {
+	boolean checkInputDependencyConstraints() {
 		if (getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
 			// InputDependencyConstraint == ANY
 			return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
 		} else {
@@ -757,7 +758,7 @@ public boolean checkInputDependencyConstraints() {
 	 *
 	 * @return whether the input is consumable
 	 */
-	public boolean isInputConsumable(int inputNumber) {
+	boolean isInputConsumable(int inputNumber) {
 		return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
 			IntermediateResultPartition::isConsumable);
 	}
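
---

Note on the consumability semantics above: after these patches, a PIPELINED partition becomes consumable as soon as it has produced any data, while a BLOCKING partition becomes consumable only once every partition of its result has finished (markFinished() now also sets hasDataProduced). A minimal, self-contained sketch of that rule follows; ResultKind, PartitionModel and ConsumabilityDemo are hypothetical stand-ins for illustration, not the real flink-runtime classes.

import java.util.Arrays;
import java.util.List;

enum ResultKind { PIPELINED, BLOCKING }

class PartitionModel {
	final ResultKind kind;
	boolean hasDataProduced;  // set when the producer emits its first record or finishes
	boolean isFinished;       // set when the producing task reaches FINISHED

	PartitionModel(ResultKind kind) {
		this.kind = kind;
	}

	// Mirrors the shape of IntermediateResultPartition.isConsumable():
	// PIPELINED needs any data; BLOCKING waits for the whole result to finish.
	boolean isConsumable(List<PartitionModel> allPartitionsOfResult) {
		if (kind == ResultKind.PIPELINED) {
			return hasDataProduced;
		} else {
			return allPartitionsOfResult.stream().allMatch(p -> p.isFinished);
		}
	}
}

class ConsumabilityDemo {
	public static void main(String[] args) {
		PartitionModel p1 = new PartitionModel(ResultKind.BLOCKING);
		PartitionModel p2 = new PartitionModel(ResultKind.BLOCKING);
		List<PartitionModel> result = Arrays.asList(p1, p2);

		p1.isFinished = true;
		System.out.println(p1.isConsumable(result)); // false: p2 is still running
		p2.isFinished = true;
		System.out.println(p1.isConsumable(result)); // true: the whole result is finished
	}
}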
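With ANY, a vertex can be scheduled as soon as any one of its inputs is consumable; with ALL, scheduling is postponed until every input is consumable, which is how the series avoids resource deadlocks for finite stream jobs on limited slots. To try this end to end, a job would set the constraint on its ExecutionConfig. The sketch below assumes a setter mirroring the getDefaultInputDependencyConstraint() accessor used in StreamingJobGraphGenerator above; the exact setter name is an assumption, not confirmed by these hunks.

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InputConstraintExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Postpone scheduling of each vertex until all of its inputs are consumable.
		// Setter name assumed; only getDefaultInputDependencyConstraint() appears
		// in the hunks above.
		env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);

		env.fromElements(1, 2, 3)
			.map(i -> i * 2)
			.print();

		env.execute("input-dependency-constraint-example");
	}
}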