@@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig>
/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
private boolean failTaskOnCheckpointError = true;

/** The input dependency constraint to schedule tasks. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;

// ------------------------------- User code values --------------------------------------------

private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
return executionMode;
}

/**
* Sets the input dependency constraint for vertex scheduling. It determines when a
* task should be scheduled, depending on the status of its inputs.
*
* The default constraint is {@link InputDependencyConstraint#ANY}.
*
* @param inputDependencyConstraint The input dependency constraint.
*/
public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
this.inputDependencyConstraint = inputDependencyConstraint;
}

/**
* Gets the input dependency constraint for vertex scheduling. It determines when a
* task should be scheduled, depending on the status of its inputs.
*
* The default constraint is {@link InputDependencyConstraint#ANY}.
*
* @return The input dependency constraint of this job.
*/
public InputDependencyConstraint getInputDependencyConstraint() {
return inputDependencyConstraint;
}

/**
* Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
* In some cases this might be preferable. For example, when using interfaces
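For context, a job author opts into the stricter scheduling behavior through this new ExecutionConfig option. A minimal usage sketch (the job body is elided; ExecutionEnvironment#getConfig is the standard accessor):

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.java.ExecutionEnvironment;

public class InputConstraintExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ALL: schedule a vertex only once every one of its inputs is consumable.
        // The default, ANY, keeps the previous behavior of scheduling as soon as
        // any single input becomes consumable.
        env.getConfig().setInputDependencyConstraint(InputDependencyConstraint.ALL);
    }
}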
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

/**
* This constraint indicates when a task should be scheduled, depending on the status of its inputs.
*/
public enum InputDependencyConstraint {

/**
* Schedule the task if any input is consumable.
*/
ANY,

/**
* Schedule the task if all the inputs are consumable.
*/
ALL
}
@@ -101,7 +101,8 @@ public static InputChannelDeploymentDescriptor[] fromEdges(
final ResultPartitionLocation partitionLocation;

// The producing task needs to be RUNNING or already FINISHED
if (consumedPartition.isConsumable() && producerSlot != null &&
if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
producerSlot != null &&
(producerState == ExecutionState.RUNNING ||
producerState == ExecutionState.FINISHED ||
producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
}
else {
String msg = String.format("Trying to eagerly schedule a task whose inputs " +
"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
"are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
consumedPartition.getResultType(),
consumedPartition.isConsumable(),
producerState,
producerSlot);
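The relaxed check above can be read as a single predicate: a pipelined partition no longer needs to be marked consumable before its consumer is deployed, because its location is already known once the producer has a slot. A simplified standalone rendering (illustrative names; the diff cuts off any further accepted producer states after SCHEDULED):

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobmaster.LogicalSlot;

final class EagerDeploymentCheck {
    /** Illustrative rendering of the relaxed eligibility condition above. */
    static boolean inputLocationKnown(
            boolean pipelined,
            boolean consumable,
            LogicalSlot producerSlot,
            ExecutionState producerState) {
        return (pipelined || consumable)
            && producerSlot != null
            && (producerState == ExecutionState.RUNNING
                || producerState == ExecutionState.FINISHED
                || producerState == ExecutionState.SCHEDULED);
                // further accepted states are cut off in the diff above
    }
}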
@@ -747,31 +747,34 @@ else if (numConsumers == 0) {
consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
partition, partitionExecution));

// When deploying a consuming task, its task deployment descriptor will contain all
// deployment information available at the respective time. It is possible that some
// of the partitions to be consumed have not been created yet. These are updated
// at runtime via the update messages.
//
// TODO The current approach may send many update messages even though the consuming
// task has already been deployed with all necessary information. We have to check
// whether this is a problem and fix it, if it is.
CompletableFuture.supplyAsync(
() -> {
try {
final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
consumerVertex.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
LocationPreferenceConstraint.ANY, // there must be at least one known location
Collections.emptySet());
} catch (Throwable t) {
consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
// Schedule the consumer vertex if its input constraint is satisfied; otherwise postpone scheduling
if (consumerVertex.checkInputDependencyConstraints()) {
// When deploying a consuming task, its task deployment descriptor will contain all
// deployment information available at the respective time. It is possible that some
// of the partitions to be consumed have not been created yet. These are updated
// at runtime via the update messages.
//
// TODO The current approach may send many update messages even though the consuming
// task has already been deployed with all necessary information. We have to check
// whether this is a problem and fix it, if it is.
CompletableFuture.supplyAsync(
() -> {
try {
final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
consumerVertex.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
LocationPreferenceConstraint.ANY, // there must be at least one known location
Collections.emptySet());
} catch (Throwable t) {
consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
"vertex " + consumerVertex, t));
}
}

return null;
},
executor);
return null;
},
executor);
}

// double check to resolve race conditions
if (consumerVertex.getExecutionState() == RUNNING) {
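Two notes on the gated block above. First, when the constraint is not satisfied the vertex is simply skipped here; the check runs again the next time another of its input partitions reports data or finishes, so scheduling is postponed rather than lost. Second, the supplier's result is discarded, so CompletableFuture.runAsync would express the same intent without the dummy return null; shown only as an equivalent formulation, not as a change in this PR:

CompletableFuture.runAsync(
    () -> {
        try {
            final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
            consumerVertex.scheduleForExecution(
                executionGraph.getSlotProvider(),
                executionGraph.isQueuedSchedulingAllowed(),
                LocationPreferenceConstraint.ANY, // there must be at least one known location
                Collections.emptySet());
        } catch (Throwable t) {
            consumerVertex.fail(new IllegalStateException(
                "Could not schedule consumer vertex " + consumerVertex, t));
        }
    },
    executor);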
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
* from results that need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

/** The input dependency constraint to schedule tasks. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;

// ------ Execution status and progress. These values are volatile, and accessed under the lock -------

private final AtomicInteger verticesFinished;
@@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() {
return scheduleMode;
}

public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
this.inputDependencyConstraint = inputDependencyConstraint;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return inputDependencyConstraint;
}

public Time getAllocationTimeout() {
return allocationTimeout;
}
@@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

try {
executionGraph.setInputDependencyConstraint(
jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
} catch (IOException | ClassNotFoundException e) {
throw new JobException("Fail to deserialize execution config.", e);
}

try {
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
}
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
@@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

if (partition.getIntermediateResult().getResultType().isPipelined()) {
// Schedule or update receivers of this partition
partition.markDataProduced();
execution.scheduleOrUpdateConsumers(partition.getConsumers());
}
else {
@@ -726,6 +728,56 @@ List<IntermediateResultPartition> finishAllBlockingPartitions() {
}
}

/**
* Checks whether the InputDependencyConstraint is satisfied for this vertex.
*
* @return whether the input constraint is satisfied
*/
public boolean checkInputDependencyConstraints() {
if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
// InputDependencyConstraint == ANY
for (int i = 0; i < inputEdges.length; i++) {
if (isInputConsumable(i)) {
return true;
}
}
return false;
} else {
// InputDependencyConstraint == ALL
for (int i = 0; i < inputEdges.length; i++) {
if (!isInputConsumable(i)) {
return false;
}
}
return true;
}
}

/**
* An input is consumable when
* 1. the source result is PIPELINED and one of its result partitions has produced data, or
* 2. the source result is BLOCKING and is FINISHED (i.e. all of its partitions are FINISHED).
*
* @param inputNumber The number of the input to check
* @return whether the input is consumable
*/
public boolean isInputConsumable(int inputNumber) {
IntermediateResult result = jobVertex.getInputs().get(inputNumber);

if (result.getResultType().isPipelined()) {
// For a PIPELINED result, the input is consumable once any of its result partitions has produced data
for (ExecutionEdge edge : inputEdges[inputNumber]) {
if (edge.getSource().hasDataProduced()) {
return true;
}
}
} else {
// For a BLOCKING result, the input is consumable only when all partitions in the result are finished
return result.areAllPartitionsFinished();
}

return false;
}

// --------------------------------------------------------------------------------------------
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
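The ANY/ALL branches above amount to an existential versus a universal check over the input gates. A self-contained model of the same decision (hypothetical names, plain Java):

import java.util.function.IntPredicate;
import java.util.stream.IntStream;

final class InputConstraintModel {
    /**
     * Mirrors checkInputDependencyConstraints: ANY requires at least one
     * consumable input, ALL requires every input to be consumable.
     */
    static boolean satisfied(boolean requireAll, int numInputs, IntPredicate inputConsumable) {
        return requireAll
            ? IntStream.range(0, numInputs).allMatch(inputConsumable)
            : IntStream.range(0, numInputs).anyMatch(inputConsumable);
    }
}

Note the vacuous-truth edge case: with zero inputs, ALL is trivially satisfied while ANY is not. Source vertices have no input edges but are scheduled through a different path, so this asymmetry should not be observable in practice.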
@@ -156,19 +156,17 @@ public int getConnectionIndex() {

void resetForNewExecution() {
this.numberOfRunningProducers.set(numParallelProducers);
for (IntermediateResultPartition partition : partitions) {
partition.resetForNewExecution();
}
}

int decrementNumberOfRunningProducersAndGetRemaining() {
return numberOfRunningProducers.decrementAndGet();
}

boolean isConsumable() {
if (resultType.isPipelined()) {
return true;
}
else {
return numberOfRunningProducers.get() == 0;
}
boolean areAllPartitionsFinished() {
return numberOfRunningProducers.get() == 0;
}

@Override
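With isConsumable() gone from IntermediateResult, what remains here is plain producer counting. A toy model of that mechanism, using the same names as the diff (the comment marks where the real resetForNewExecution now also clears each partition's flag):

import java.util.concurrent.atomic.AtomicInteger;

final class BlockingResultTracker {
    private final int numParallelProducers;
    private final AtomicInteger numberOfRunningProducers;

    BlockingResultTracker(int numParallelProducers) {
        this.numParallelProducers = numParallelProducers;
        this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
    }

    int decrementNumberOfRunningProducersAndGetRemaining() {
        return numberOfRunningProducers.decrementAndGet();
    }

    void resetForNewExecution() {
        numberOfRunningProducers.set(numParallelProducers);
        // the real method now additionally resets every partition's dataProduced flag
    }

    boolean areAllPartitionsFinished() {
        return numberOfRunningProducers.get() == 0;
    }
}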
@@ -36,6 +36,11 @@ public class IntermediateResultPartition {

private List<List<ExecutionEdge>> consumers;

/**
* Whether this partition has produced data. Only relevant for pipelined results.
*/
private boolean dataProduced = false;

public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
this.totalResult = totalResult;
this.producer = producer;
@@ -60,16 +65,32 @@ public IntermediateResultPartitionID getPartitionId() {
return partitionId;
}

ResultPartitionType getResultType() {
public ResultPartitionType getResultType() {
return totalResult.getResultType();
}

public List<List<ExecutionEdge>> getConsumers() {
return consumers;
}

public void markDataProduced() {
dataProduced = true;
}

public boolean hasDataProduced() {
return dataProduced;
}

public boolean isConsumable() {
return totalResult.isConsumable();
if (getResultType().isPipelined()) {
return dataProduced;
} else {
return totalResult.areAllPartitionsFinished();
}
}

void resetForNewExecution() {
dataProduced = false;
}

int addConsumerGroup() {
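Taken together, the new flag gives a pipelined partition a small lifecycle: not consumable at creation, consumable after the first markDataProduced() (invoked from scheduleOrUpdateConsumers above), and not consumable again after resetForNewExecution() on a restart. A stripped-down model of just that state machine:

/** Stripped-down model of the pipelined consumability lifecycle added above. */
final class PipelinedPartitionModel {
    private boolean dataProduced = false;

    void markDataProduced()     { dataProduced = true; }   // first data is available
    void resetForNewExecution() { dataProduced = false; }  // cleared on failover restart
    boolean isConsumable()      { return dataProduced; }   // pipelined case only
}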
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc

private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
when(partition.isConsumable()).thenReturn(true);

IntermediateResult result = mock(IntermediateResult.class);