@@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;

/** The default input dependency constraint to schedule tasks. */
private InputDependencyConstraint defaultInputDependencyConstraint = InputDependencyConstraint.ANY;

// ------------------------------- User code values --------------------------------------------

private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}

/**
* Sets the default input dependency constraint for vertex scheduling. It indicates when a task
* should be scheduled, considering the status of its inputs.
*
* The default constraint is {@link InputDependencyConstraint#ANY}.
*
* @param inputDependencyConstraint The input dependency constraint.
*/
public void setDefaultInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
this.defaultInputDependencyConstraint = inputDependencyConstraint;
}

/**
* Gets the default input dependency constraint for vertex scheduling. It indicates when a task
* should be scheduled, considering the status of its inputs.
*
* The default constraint is {@link InputDependencyConstraint#ANY}.
*
* @return The input dependency constraint of this job.
*/
public InputDependencyConstraint getDefaultInputDependencyConstraint() {
return defaultInputDependencyConstraint;
}

/**
* Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
* In some cases this might be preferable. For example, when using interfaces
Expand Down Expand Up @@ -918,7 +945,8 @@ public boolean equals(Object obj) {
registeredKryoTypes.equals(other.registeredKryoTypes) &&
registeredPojoTypes.equals(other.registeredPojoTypes) &&
taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
useSnapshotCompression == other.useSnapshotCompression;
useSnapshotCompression == other.useSnapshotCompression &&
defaultInputDependencyConstraint == other.defaultInputDependencyConstraint;

} else {
return false;
Expand Down Expand Up @@ -946,7 +974,8 @@ public int hashCode() {
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis,
useSnapshotCompression);
useSnapshotCompression,
defaultInputDependencyConstraint);
}

public boolean canEqual(Object obj) {
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

/**
* This constraint indicates when a task should be scheduled, considering the status of its inputs.
*/
public enum InputDependencyConstraint {

/**
* Schedule the task if any input is consumable.
*/
ANY,

/**
* Schedule the task if all the inputs are consumable.
*/
ALL
}
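As a quick illustration for reviewers, a minimal usage sketch of the configuration API added above. The ExecutionEnvironment setup around the new calls is illustrative only and not part of this change; only setDefaultInputDependencyConstraint / getDefaultInputDependencyConstraint and the InputDependencyConstraint enum come from this diff.

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.java.ExecutionEnvironment;

public class InputConstraintUsage {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Require that ALL inputs of a task are consumable before the task is scheduled.
        env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);

        // If nothing is set, the constraint keeps its default value, ANY.
        InputDependencyConstraint constraint = env.getConfig().getDefaultInputDependencyConstraint();
        System.out.println("Default input dependency constraint: " + constraint);
    }
}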
@@ -246,6 +246,7 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {

// add vertices to the graph
for (JobVertex vertex : this.vertices.values()) {
vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint());
graph.addVertex(vertex);
}

@@ -101,7 +101,8 @@ public static InputChannelDeploymentDescriptor[] fromEdges(
final ResultPartitionLocation partitionLocation;

// The producing task needs to be RUNNING or already FINISHED
if (consumedPartition.isConsumable() && producerSlot != null &&
if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
producerSlot != null &&
(producerState == ExecutionState.RUNNING ||
producerState == ExecutionState.FINISHED ||
producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
}
else {
String msg = String.format("Trying to eagerly schedule a task whose inputs " +
"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
"are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
consumedPartition.getResultType(),
consumedPartition.isConsumable(),
producerState,
producerSlot);
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
@@ -718,6 +719,26 @@ else if (current == CREATED || current == SCHEDULED) {
}
}

private void scheduleConsumer(ExecutionVertex consumerVertex) {
CompletableFuture.supplyAsync(
() -> {
try {
final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
consumerVertex.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
LocationPreferenceConstraint.ANY, // there must be at least one known location
Collections.emptySet());
} catch (Throwable t) {
consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
"vertex " + consumerVertex, t));
}

return null;
},
executor);
}

void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
final int numConsumers = allConsumers.size();

@@ -755,23 +776,16 @@ else if (numConsumers == 0) {
// TODO The current approach may send many update messages even though the consuming
// task has already been deployed with all necessary information. We have to check
// whether this is a problem and fix it, if it is.
CompletableFuture.supplyAsync(
() -> {
try {
final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
consumerVertex.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
LocationPreferenceConstraint.ANY, // there must be at least one known location
Collections.emptySet());
} catch (Throwable t) {
consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
"vertex " + consumerVertex, t));
}

return null;
},
executor);
// Schedule the consumer vertex if its input constraint is satisfied, otherwise skip the scheduling.
// A shortcut is taken for InputDependencyConstraint.ANY, since at least one of the consumer
// vertex's inputs is known to be consumable at this point. This avoids the O(N) complexity of the
// full input constraint check in the ANY case, so that the default scheduling performance is not affected.
if (consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY ||
consumerVertex.checkInputDependencyConstraints()) {
scheduleConsumer(consumerVertex);
}

// double check to resolve race conditions
if (consumerVertex.getExecutionState() == RUNNING) {
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -366,6 +367,10 @@ public List<IntermediateResult> getInputs() {
return inputs;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return getJobVertex().getInputDependencyConstraint();
}

public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
// only one thread should offload the task information, so let's also let only one thread
// serialize the task information!
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
@@ -61,6 +62,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -70,6 +72,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;

@@ -340,6 +343,10 @@ public Map<IntermediateResultPartitionID, IntermediateResultPartition> getProduc
return resultPartitions;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return getJobVertex().getInputDependencyConstraint();
}

// --------------------------------------------------------------------------------------------
// Graph building
// --------------------------------------------------------------------------------------------
@@ -684,6 +691,8 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
throw new IllegalStateException("Unknown partition " + partitionId + ".");
}

partition.markDataProduced();

if (partition.getIntermediateResult().getResultType().isPipelined()) {
// Schedule or update receivers of this partition
execution.scheduleOrUpdateConsumers(partition.getConsumers());
@@ -726,6 +735,34 @@ List<IntermediateResultPartition> finishAllBlockingPartitions() {
}
}

/**
* Check whether the InputDependencyConstraint is satisfied for this vertex.
*
* @return whether the input constraint is satisfied
*/
boolean checkInputDependencyConstraints() {
if (getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
// InputDependencyConstraint == ANY
return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
[Review comment from a contributor]: Moving isInputConsumable into IntermediateResult would allow getting rid of the indirection of the IntStream.
} else {
// InputDependencyConstraint == ALL
return IntStream.range(0, inputEdges.length).allMatch(this::isInputConsumable);
}
}

/**
* Get whether an input of the vertex is consumable.
* An input is consumable when any partition in it is consumable.
*
* Note that a BLOCKING result partition is only consumable when all partitions in the result are FINISHED.
*
* @param inputNumber the index of the input to check
* @return whether the input is consumable
*/
boolean isInputConsumable(int inputNumber) {
return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
IntermediateResultPartition::isConsumable);
}

// --------------------------------------------------------------------------------------------
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
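To make the ANY/ALL semantics of the new check concrete, a self-contained sketch that mirrors the stream-based decision in checkInputDependencyConstraints() above. All names here are hypothetical and live outside Flink; only the anyMatch/allMatch logic corresponds to the change.

import java.util.stream.IntStream;

// Standalone illustration of the ANY/ALL input constraint check.
public class InputConstraintCheckSketch {

    enum Constraint { ANY, ALL }

    static boolean inputsSatisfy(Constraint constraint, boolean[] inputConsumable) {
        if (constraint == Constraint.ANY) {
            // ANY: scheduling may proceed as soon as at least one input is consumable.
            return IntStream.range(0, inputConsumable.length).anyMatch(i -> inputConsumable[i]);
        } else {
            // ALL: scheduling has to wait until every input is consumable.
            return IntStream.range(0, inputConsumable.length).allMatch(i -> inputConsumable[i]);
        }
    }

    public static void main(String[] args) {
        boolean[] inputs = {true, false};
        System.out.println(inputsSatisfy(Constraint.ANY, inputs)); // true
        System.out.println(inputsSatisfy(Constraint.ALL, inputs)); // false
    }
}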
@@ -156,19 +156,17 @@ public int getConnectionIndex() {

void resetForNewExecution() {
this.numberOfRunningProducers.set(numParallelProducers);
for (IntermediateResultPartition partition : partitions) {
partition.resetForNewExecution();
}
}

int decrementNumberOfRunningProducersAndGetRemaining() {
return numberOfRunningProducers.decrementAndGet();
}

boolean isConsumable() {
if (resultType.isPipelined()) {
return true;
}
else {
return numberOfRunningProducers.get() == 0;
}
boolean areAllPartitionsFinished() {
return numberOfRunningProducers.get() == 0;
}

@Override
@@ -36,6 +36,11 @@ public class IntermediateResultPartition {

private List<List<ExecutionEdge>> consumers;

/**
* Whether this partition has produced some data.
*/
private boolean hasDataProduced = false;

public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
this.totalResult = totalResult;
this.producer = producer;
@@ -60,16 +65,28 @@ public IntermediateResultPartitionID getPartitionId() {
return partitionId;
}

ResultPartitionType getResultType() {
public ResultPartitionType getResultType() {
return totalResult.getResultType();
}

public List<List<ExecutionEdge>> getConsumers() {
return consumers;
}

public void markDataProduced() {
hasDataProduced = true;
}

public boolean isConsumable() {
return totalResult.isConsumable();
if (getResultType().isPipelined()) {
return hasDataProduced;
} else {
return totalResult.areAllPartitionsFinished();
}
}

void resetForNewExecution() {
hasDataProduced = false;
}

int addConsumerGroup() {
Expand All @@ -94,6 +111,8 @@ boolean markFinished() {
throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
}

hasDataProduced = true;

final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();

if (refCnt == 0) {
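Finally, a standalone sketch of the consumability rule that the reworked isConsumable() encodes: a pipelined partition becomes consumable once it has produced data, while a blocking partition is consumable only once all producers of its result have finished. The types and fields below are hypothetical and not Flink API; only the decision logic mirrors IntermediateResultPartition#isConsumable.

// Standalone sketch of the partition consumability rule after this change.
public class ConsumabilitySketch {

    enum ResultType { PIPELINED, BLOCKING }

    static boolean isConsumable(ResultType type, boolean hasDataProduced, int runningProducers) {
        if (type == ResultType.PIPELINED) {
            // Pipelined partitions become consumable as soon as they have produced some data.
            return hasDataProduced;
        } else {
            // Blocking partitions are consumable only when every producer of the result has finished.
            return runningProducers == 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(isConsumable(ResultType.PIPELINED, true, 3)); // true
        System.out.println(isConsumable(ResultType.BLOCKING, true, 1));  // false
        System.out.println(isConsumable(ResultType.BLOCKING, true, 0));  // true
    }
}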