diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
deleted file mode 100644
index d45ca51f92826..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import javax.annotation.Nonnull;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Deployment descriptor for a single input channel instance.
- *
- * <p>Each input channel consumes a single subpartition. The index of the subpartition to consume
- * is part of the {@link InputGateDeploymentDescriptor} as it is the same for each input channel of
- * the respective input gate.
- *
- * @see InputChannel
- * @see SingleInputGate
- */
-public class InputChannelDeploymentDescriptor implements Serializable {
-
- private static final long serialVersionUID = 373711381640454080L;
-
- /** The ID of the partition the input channel is going to consume. */
- private final ResultPartitionID consumedPartitionId;
-
- /** The location of the partition the input channel is going to consume. */
- private final ResultPartitionLocation consumedPartitionLocation;
-
- public InputChannelDeploymentDescriptor(
- ResultPartitionID consumedPartitionId,
- ResultPartitionLocation consumedPartitionLocation) {
-
- this.consumedPartitionId = checkNotNull(consumedPartitionId);
- this.consumedPartitionLocation = checkNotNull(consumedPartitionLocation);
- }
-
- public ResultPartitionID getConsumedPartitionId() {
- return consumedPartitionId;
- }
-
- public ResultPartitionLocation getConsumedPartitionLocation() {
- return consumedPartitionLocation;
- }
-
- @Override
- public String toString() {
- return String.format("InputChannelDeploymentDescriptor [consumed partition id: %s, " +
- "consumed partition location: %s]",
- consumedPartitionId, consumedPartitionLocation);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates an input channel deployment descriptor for each partition.
- */
- public static List fromEdges(
- List edges,
- boolean allowLazyDeployment) {
- return edges.stream().map(edge -> fromEdgeAndValidate(allowLazyDeployment, edge)).collect(Collectors.toList());
- }
-
- @Nonnull
- private static InputChannelDeploymentDescriptor fromEdgeAndValidate(boolean allowLazyDeployment, ExecutionEdge edge) {
- final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor = fromEdge(edge);
-
- if (!allowLazyDeployment && inputChannelDeploymentDescriptor.getConsumedPartitionLocation().isUnknown()) {
- final IntermediateResultPartition consumedPartition = edge.getSource();
- final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
- final ExecutionState producerState = producer.getState();
-
- if (producerState == ExecutionState.CANCELING
- || producerState == ExecutionState.CANCELED
- || producerState == ExecutionState.FAILED) {
- String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
- "The producer is in state " + producerState + '.';
- throw new IllegalStateException(msg);
- } else {
- final LogicalSlot producerSlot = producer.getAssignedResource();
-
- String msg = String.format("Trying to eagerly schedule a task whose inputs " +
- "are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
- consumedPartition.getResultType(),
- consumedPartition.isConsumable(),
- producerState,
- producerSlot);
- throw new IllegalStateException(msg);
- }
- }
-
- return inputChannelDeploymentDescriptor;
- }
-
- @Nonnull
- public static InputChannelDeploymentDescriptor fromEdge(ExecutionEdge edge) {
- final IntermediateResultPartition consumedPartition = edge.getSource();
- final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
-
- final ExecutionState producerState = producer.getState();
- final LogicalSlot producerSlot = producer.getAssignedResource();
-
- final ResultPartitionLocation partitionLocation;
-
- // The producing task needs to be RUNNING or already FINISHED
- if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
- producerSlot != null &&
- (producerState == ExecutionState.RUNNING ||
- producerState == ExecutionState.FINISHED ||
- producerState == ExecutionState.SCHEDULED ||
- producerState == ExecutionState.DEPLOYING)) {
-
- final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
- final LogicalSlot consumerSlot = edge.getTarget().getCurrentAssignedResource();
-
- if (consumerSlot != null) {
- partitionLocation = createKnownResultPartitionLocation(consumerSlot.getTaskManagerLocation().getResourceID(), consumedPartition, partitionTaskManagerLocation);
- } else {
- throw new IllegalStateException("Cannot create an input channel descriptor for a consumer which has no slot assigned.");
- }
- }
- else {
- // The producing task might not have registered the partition yet
- partitionLocation = ResultPartitionLocation.createUnknown();
- }
-
- final ResultPartitionID consumedPartitionId = new ResultPartitionID(
- consumedPartition.getPartitionId(), producer.getAttemptId());
-
- return new InputChannelDeploymentDescriptor(consumedPartitionId, partitionLocation);
- }
-
- @Nonnull
- private static ResultPartitionLocation createKnownResultPartitionLocation(
- ResourceID consumerResourceId,
- IntermediateResultPartition consumedPartition,
- TaskManagerLocation partitionTaskManagerLocation) {
- ResultPartitionLocation partitionLocation;
- final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
- if (partitionTaskManager.equals(consumerResourceId)) {
- // Consuming task is deployed to the same TaskManager as the partition => local
- partitionLocation = ResultPartitionLocation.createLocal();
- }
- else {
- // Different instances => remote
- final ConnectionID connectionId = new ConnectionID(
- partitionTaskManagerLocation,
- consumedPartition.getIntermediateResult().getConnectionIndex());
-
- partitionLocation = ResultPartitionLocation.createRemote(connectionId);
- }
- return partitionLocation;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 8b0dbad7fd4b9..345a99d3c9c17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -22,11 +22,13 @@
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+
+import javax.annotation.Nonnegative;
import java.io.Serializable;
import java.util.Arrays;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -54,23 +56,20 @@ public class InputGateDeploymentDescriptor implements Serializable {
* The index of the consumed subpartition of each consumed partition. This index depends on the
* {@link DistributionPattern} and the subtask indices of the producing and consuming task.
*/
+ @Nonnegative
private final int consumedSubpartitionIndex;
/** An input channel for each consumed subpartition. */
- private final InputChannelDeploymentDescriptor[] inputChannels;
+ private final ShuffleDescriptor[] inputChannels;
+ /**
+ * Creates the deployment descriptor of a single input gate.
+ *
+ * @param consumedResultId ID of the consumed intermediate result, must not be null
+ * @param consumedPartitionType type of the consumed partitions, must not be null
+ * @param consumedSubpartitionIndex index of the subpartition consumed from each partition, must not be negative
+ * @param inputChannels one shuffle descriptor per input channel of the gate, must not be null
+ * @throws IllegalArgumentException if {@code consumedSubpartitionIndex} is negative
+ */
public InputGateDeploymentDescriptor(
IntermediateDataSetID consumedResultId,
ResultPartitionType consumedPartitionType,
- int consumedSubpartitionIndex,
- InputChannelDeploymentDescriptor[] inputChannels) {
-
+ @Nonnegative int consumedSubpartitionIndex,
+ ShuffleDescriptor[] inputChannels) {
this.consumedResultId = checkNotNull(consumedResultId);
this.consumedPartitionType = checkNotNull(consumedPartitionType);
-
- checkArgument(consumedSubpartitionIndex >= 0);
+ // @Nonnegative only documents the contract for static analysis, so the
+ // precondition is still enforced at runtime before the value is stored.
+ if (consumedSubpartitionIndex < 0) {
+ throw new IllegalArgumentException(
+ "The consumed subpartition index must not be negative: " + consumedSubpartitionIndex);
+ }
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
-
this.inputChannels = checkNotNull(inputChannels);
}
@@ -87,11 +86,12 @@ public ResultPartitionType getConsumedPartitionType() {
return consumedPartitionType;
}
+ /**
+ * Returns the index of the subpartition that is consumed from each consumed partition.
+ */
+ @Nonnegative
public int getConsumedSubpartitionIndex() {
return consumedSubpartitionIndex;
}
- public InputChannelDeploymentDescriptor[] getInputChannelDeploymentDescriptors() {
+ /**
+ * Returns the shuffle descriptors of this gate's input channels.
+ *
+ * <p>The internal array is returned as is, without a defensive copy.
+ */
+ public ShuffleDescriptor[] getShuffleDescriptors() {
return inputChannels;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2a3feb960de2a..064c9bdff173f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,16 +18,16 @@
package org.apache.flink.runtime.deployment;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import java.io.Serializable;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -39,99 +39,59 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
private static final long serialVersionUID = 6343547936086963705L;
- /** The ID of the result this partition belongs to. */
- private final IntermediateDataSetID resultId;
+ private final PartitionDescriptor partitionDescriptor;
- /** The ID of the partition. */
- private final IntermediateResultPartitionID partitionId;
+ private final ShuffleDescriptor shuffleDescriptor;
- /** The type of the partition. */
- private final ResultPartitionType partitionType;
-
- /** The number of subpartitions. */
- private final int numberOfSubpartitions;
-
- /** The maximum parallelism. */
private final int maxParallelism;
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;
+ /**
+ * Creates the deployment descriptor of a single produced result partition.
+ *
+ * @param partitionDescriptor descriptor the result/partition IDs, partition type and subpartition count are read from, must not be null
+ * @param shuffleDescriptor shuffle descriptor of the partition, must not be null
+ * @param maxParallelism maximum parallelism, validated by {@link KeyGroupRangeAssignment#checkParallelismPreconditions(int)}
+ * @param sendScheduleOrUpdateConsumersMessage whether the result partition should send scheduleOrUpdateConsumer messages
+ */
public ResultPartitionDeploymentDescriptor(
- IntermediateDataSetID resultId,
- IntermediateResultPartitionID partitionId,
- ResultPartitionType partitionType,
- int numberOfSubpartitions,
+ PartitionDescriptor partitionDescriptor,
+ ShuffleDescriptor shuffleDescriptor,
int maxParallelism,
- boolean lazyScheduling) {
-
- this.resultId = checkNotNull(resultId);
- this.partitionId = checkNotNull(partitionId);
- this.partitionType = checkNotNull(partitionType);
-
+ boolean sendScheduleOrUpdateConsumersMessage) {
+ this.partitionDescriptor = checkNotNull(partitionDescriptor);
+ this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
- checkArgument(numberOfSubpartitions >= 1);
- this.numberOfSubpartitions = numberOfSubpartitions;
this.maxParallelism = maxParallelism;
- this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
+ this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
}
+ /** Returns the ID of the intermediate result, as reported by the partition descriptor. */
public IntermediateDataSetID getResultId() {
- return resultId;
+ return partitionDescriptor.getResultId();
}
+ /** Returns the ID of the partition, as reported by the partition descriptor. */
public IntermediateResultPartitionID getPartitionId() {
- return partitionId;
+ return partitionDescriptor.getPartitionId();
}
+ /** Returns the type of the partition, as reported by the partition descriptor. */
public ResultPartitionType getPartitionType() {
- return partitionType;
+ return partitionDescriptor.getPartitionType();
}
+ /** Returns the number of subpartitions, as reported by the partition descriptor. */
public int getNumberOfSubpartitions() {
- return numberOfSubpartitions;
+ return partitionDescriptor.getNumberOfSubpartitions();
}
+ /** Returns the maximum parallelism passed to the constructor. */
public int getMaxParallelism() {
return maxParallelism;
}
+ /** Returns the shuffle descriptor of this partition. */
+ public ShuffleDescriptor getShuffleDescriptor() {
+ return shuffleDescriptor;
+ }
+
+ /** Returns whether the result partition should send scheduleOrUpdateConsumer messages. */
public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}
@Override
public String toString() {
- return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
- + "partition id: %s, partition type: %s]",
- resultId, partitionId, partitionType);
- }
-
- // ------------------------------------------------------------------------
-
- public static ResultPartitionDeploymentDescriptor from(
- IntermediateResultPartition partition, int maxParallelism, boolean lazyScheduling) {
-
- final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
- final IntermediateResultPartitionID partitionId = partition.getPartitionId();
- final ResultPartitionType partitionType = partition.getIntermediateResult().getResultType();
-
- // The produced data is partitioned among a number of subpartitions.
- //
- // If no consumers are known at this point, we use a single subpartition, otherwise we have
- // one for each consuming sub task.
- int numberOfSubpartitions = 1;
-
- if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
-
- if (partition.getConsumers().size() > 1) {
- throw new IllegalStateException("Currently, only a single consumer group per partition is supported.");
- }
-
- numberOfSubpartitions = partition.getConsumers().get(0).size();
- }
-
- return new ResultPartitionDeploymentDescriptor(
- resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism, lazyScheduling);
+ // Only the two nested descriptors are rendered; maxParallelism and the
+ // sendScheduleOrUpdateConsumersMessage flag are not part of the output.
+ return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
+ + "ShuffleDescriptor: %s]",
+ partitionDescriptor, shuffleDescriptor);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
deleted file mode 100644
index 78b65273cf6ee..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.deployment;
-
-import org.apache.flink.runtime.io.network.ConnectionID;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Location of a result partition from the perspective of the consuming task.
- *
- * The location indicates both the instance, on which the partition is produced and the state of
- * the producing task. There are three possibilities:
- *
- *
- * - Local: The partition is available at the same instance on which the
- * consuming task is (being) deployed and the producing task has registered the result partition.
- *
- * - Remote: The result partition is available at a different instance from the
- * one, on which the consuming task is (being) deployed and the producing task has registered the
- * result partition.
- *
- * - Unknown: The producing task has not yet registered the result partition.
- * When deploying the consuming task, the instance might be known or unknown. In any case, the
- * consuming task cannot request it yet. Instead, it will be updated at runtime after the
- * producing task is guaranteed to have registered the partition. A producing task is guaranteed
- * to have registered the partition after its state has switched to running.
- *
- */
-public class ResultPartitionLocation implements Serializable {
-
- private static final long serialVersionUID = -6354238166937194463L;
- /** The type of location for the result partition. */
- private final LocationType locationType;
-
- /** The connection ID of a remote result partition. */
- private final ConnectionID connectionId;
-
- private enum LocationType {
- LOCAL,
- REMOTE,
- UNKNOWN
- }
-
- private ResultPartitionLocation(LocationType locationType, ConnectionID connectionId) {
- this.locationType = checkNotNull(locationType);
- this.connectionId = connectionId;
- }
-
- public static ResultPartitionLocation createRemote(ConnectionID connectionId) {
- return new ResultPartitionLocation(LocationType.REMOTE, checkNotNull(connectionId));
- }
-
- public static ResultPartitionLocation createLocal() {
- return new ResultPartitionLocation(LocationType.LOCAL, null);
- }
-
- public static ResultPartitionLocation createUnknown() {
- return new ResultPartitionLocation(LocationType.UNKNOWN, null);
- }
-
- // ------------------------------------------------------------------------
-
- public boolean isLocal() {
- return locationType == LocationType.LOCAL;
- }
-
- public boolean isRemote() {
- return locationType == LocationType.REMOTE;
- }
-
- public boolean isUnknown() {
- return locationType == LocationType.UNKNOWN;
- }
-
- public ConnectionID getConnectionId() {
- return connectionId;
- }
-
- @Override
- public String toString() {
- return "ResultPartitionLocation [" + locationType + (isRemote() ? " [" + connectionId + "]]" : "]");
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
new file mode 100644
index 0000000000000..9bef21ad9ed76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ *
+ * <p>An instance is obtained through {@link #fromExecutionVertex(ExecutionVertex, int)} and captures
+ * the per-vertex data; {@link #createDeploymentDescriptor} then adds the slot specific data and
+ * derives one {@link InputGateDeploymentDescriptor} per array of input edges.
+ */
+public class TaskDeploymentDescriptorFactory {
+	private final ExecutionAttemptID executionId;
+	private final int attemptNumber;
+	private final MaybeOffloaded<JobInformation> serializedJobInformation;
+	private final MaybeOffloaded<TaskInformation> taskInfo;
+	private final JobID jobID;
+	/** Whether a consumed partition that is not available yet is described as unknown instead of failing. */
+	private final boolean allowUnknownPartitions;
+	private final int subtaskIndex;
+	/** Input edges of the vertex; each inner array yields one input gate. */
+	private final ExecutionEdge[][] inputEdges;
+
+	private TaskDeploymentDescriptorFactory(
+			ExecutionAttemptID executionId,
+			int attemptNumber,
+			MaybeOffloaded<JobInformation> serializedJobInformation,
+			MaybeOffloaded<TaskInformation> taskInfo,
+			JobID jobID,
+			boolean allowUnknownPartitions,
+			int subtaskIndex,
+			ExecutionEdge[][] inputEdges) {
+		this.executionId = executionId;
+		this.attemptNumber = attemptNumber;
+		this.serializedJobInformation = serializedJobInformation;
+		this.taskInfo = taskInfo;
+		this.jobID = jobID;
+		this.allowUnknownPartitions = allowUnknownPartitions;
+		this.subtaskIndex = subtaskIndex;
+		this.inputEdges = inputEdges;
+	}
+
+	/**
+	 * Assembles the {@link TaskDeploymentDescriptor} for the captured execution attempt.
+	 *
+	 * @param allocationID ID of the slot allocation the task is deployed to
+	 * @param targetSlotNumber number of the target slot
+	 * @param taskRestore state to restore, may be null
+	 * @param producedPartitions descriptors of the partitions produced by the task; copied into a new list
+	 * @return the deployment descriptor, including freshly created input gate descriptors
+	 * @throws IllegalStateException if a consumed partition is not available and unknown partitions are not allowed
+	 */
+	public TaskDeploymentDescriptor createDeploymentDescriptor(
+			AllocationID allocationID,
+			int targetSlotNumber,
+			@Nullable JobManagerTaskRestore taskRestore,
+			Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
+		return new TaskDeploymentDescriptor(
+			jobID,
+			serializedJobInformation,
+			taskInfo,
+			executionId,
+			allocationID,
+			subtaskIndex,
+			attemptNumber,
+			targetSlotNumber,
+			taskRestore,
+			new ArrayList<>(producedPartitions),
+			createInputGateDeploymentDescriptors());
+	}
+
+	/** Creates one input gate descriptor per array of input edges. */
+	private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors() {
+		List<InputGateDeploymentDescriptor> inputGates = new ArrayList<>(inputEdges.length);
+
+		for (ExecutionEdge[] edges : inputEdges) {
+			// If the produced partition has multiple consumers registered, we
+			// need to request the one matching our sub task index.
+			// TODO Refactor after removing the consumers from the intermediate result partitions
+			int numConsumerEdges = edges[0].getSource().getConsumers().get(0).size();
+
+			int queueToRequest = subtaskIndex % numConsumerEdges;
+
+			IntermediateResult consumedIntermediateResult = edges[0].getSource().getIntermediateResult();
+			IntermediateDataSetID resultId = consumedIntermediateResult.getId();
+			ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
+
+			inputGates.add(new InputGateDeploymentDescriptor(
+				resultId,
+				partitionType,
+				queueToRequest,
+				getConsumedPartitionShuffleDescriptors(edges)));
+		}
+
+		return inputGates;
+	}
+
+	/** Resolves the shuffle descriptor of the partition consumed through each of the given edges. */
+	private ShuffleDescriptor[] getConsumedPartitionShuffleDescriptors(ExecutionEdge[] edges) {
+		ShuffleDescriptor[] shuffleDescriptors = new ShuffleDescriptor[edges.length];
+		// Each edge is connected to a different result partition
+		for (int i = 0; i < edges.length; i++) {
+			shuffleDescriptors[i] =
+				getConsumedPartitionShuffleDescriptor(edges[i], allowUnknownPartitions);
+		}
+		return shuffleDescriptors;
+	}
+
+	/**
+	 * Creates a factory from the current execution attempt of the given vertex.
+	 *
+	 * <p>Unknown partitions are allowed if the schedule mode of the execution graph allows lazy deployment.
+	 *
+	 * @param executionVertex vertex whose current execution attempt is deployed
+	 * @param attemptNumber attempt number to put into the deployment descriptor
+	 * @return factory holding the per-vertex deployment data
+	 * @throws IOException declared by the signature; the throwing call is not visible in this class
+	 */
+	public static TaskDeploymentDescriptorFactory fromExecutionVertex(
+			ExecutionVertex executionVertex,
+			int attemptNumber) throws IOException {
+		ExecutionGraph executionGraph = executionVertex.getExecutionGraph();
+		return new TaskDeploymentDescriptorFactory(
+			executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+			attemptNumber,
+			getSerializedJobInformation(executionGraph),
+			getSerializedTaskInformation(executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
+			executionGraph.getJobID(),
+			executionGraph.getScheduleMode().allowLazyDeployment(),
+			executionVertex.getParallelSubtaskIndex(),
+			executionVertex.getAllInputEdges());
+	}
+
+	/** Wraps the job information of the graph as non-offloaded (left) or offloaded blob key (right). */
+	private static MaybeOffloaded<JobInformation> getSerializedJobInformation(ExecutionGraph executionGraph) {
+		Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey =
+			executionGraph.getJobInformationOrBlobKey();
+		if (jobInformationOrBlobKey.isLeft()) {
+			return new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
+		} else {
+			return new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
+		}
+	}
+
+	/** Wraps the task information as non-offloaded (left) or offloaded blob key (right). */
+	private static MaybeOffloaded<TaskInformation> getSerializedTaskInformation(
+			Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInfo) {
+		return taskInfo.isLeft() ?
+			new TaskDeploymentDescriptor.NonOffloaded<>(taskInfo.left()) :
+			new TaskDeploymentDescriptor.Offloaded<>(taskInfo.right());
+	}
+
+	/**
+	 * Resolves the shuffle descriptor of the partition consumed through the given edge.
+	 *
+	 * @param edge edge whose source partition is consumed
+	 * @param allowUnknownPartitions whether a not yet available partition is described by an
+	 *                               {@link UnknownShuffleDescriptor} instead of failing
+	 * @return the shuffle descriptor registered at the producer, or an unknown descriptor
+	 * @throws IllegalStateException if the partition is not available and unknown partitions are not allowed
+	 */
+	public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
+			ExecutionEdge edge,
+			boolean allowUnknownPartitions) {
+		IntermediateResultPartition consumedPartition = edge.getSource();
+		Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
+
+		ExecutionState producerState = producer.getState();
+		Optional<ResultPartitionDeploymentDescriptor> consumedPartitionDescriptor =
+			producer.getResultPartitionDeploymentDescriptor(consumedPartition.getPartitionId());
+
+		ResultPartitionID consumedPartitionId = new ResultPartitionID(
+			consumedPartition.getPartitionId(),
+			producer.getAttemptId());
+
+		return getConsumedPartitionShuffleDescriptor(
+			consumedPartitionId,
+			consumedPartition.getResultType(),
+			consumedPartition.isConsumable(),
+			producerState,
+			allowUnknownPartitions,
+			consumedPartitionDescriptor.orElse(null));
+	}
+
+	/**
+	 * Decides which shuffle descriptor describes a consumed partition.
+	 *
+	 * @param consumedPartitionId ID of the consumed partition, used for the unknown descriptor and error messages
+	 * @param resultPartitionType type of the consumed partition
+	 * @param isConsumable whether the partition is already consumable
+	 * @param producerState current state of the producing execution
+	 * @param allowUnknownPartitions whether an unavailable partition may be described as unknown
+	 * @param consumedPartitionDescriptor descriptor registered at the producer, or null if there is none
+	 * @return the registered shuffle descriptor, or an {@link UnknownShuffleDescriptor}
+	 * @throws IllegalStateException if the partition is not available and unknown partitions are not allowed
+	 */
+	@VisibleForTesting
+	static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
+			ResultPartitionID consumedPartitionId,
+			ResultPartitionType resultPartitionType,
+			boolean isConsumable,
+			ExecutionState producerState,
+			boolean allowUnknownPartitions,
+			@Nullable ResultPartitionDeploymentDescriptor consumedPartitionDescriptor) {
+		// The partition must be pipelined or consumable, its descriptor must be registered
+		// at the producer, and the producer must be SCHEDULED, DEPLOYING, RUNNING or FINISHED.
+		if ((resultPartitionType.isPipelined() || isConsumable) &&
+			consumedPartitionDescriptor != null &&
+			isProducerAvailable(producerState)) {
+			// partition is already registered
+			return consumedPartitionDescriptor.getShuffleDescriptor();
+		}
+		else if (allowUnknownPartitions) {
+			// The producing task might not have registered the partition yet
+			return new UnknownShuffleDescriptor(consumedPartitionId);
+		}
+		else {
+			// Throwing here (instead of inside the helper) lets the compiler see that
+			// this branch never completes normally, so no dummy return value is needed.
+			throw createConsumedPartitionUnavailableException(
+				consumedPartitionId,
+				resultPartitionType,
+				isConsumable,
+				producerState);
+		}
+	}
+
+	/** Builds the exception describing why a consumed partition cannot be used for deployment. */
+	private static IllegalStateException createConsumedPartitionUnavailableException(
+			ResultPartitionID consumedPartitionId,
+			ResultPartitionType resultPartitionType,
+			boolean isConsumable,
+			ExecutionState producerState) {
+		if (isProducerFailedOrCanceled(producerState)) {
+			String msg = "Trying to consume an input partition whose producer has been canceled or failed. " +
+				"The producer is in state " + producerState + ".";
+			return new IllegalStateException(msg);
+		}
+		else {
+			String msg = String.format("Trying to consume an input partition whose producer " +
+					"is not ready (result type: %s, partition consumable: %s, producer state: %s, partition id: %s).",
+				resultPartitionType,
+				isConsumable,
+				producerState,
+				consumedPartitionId);
+			return new IllegalStateException(msg);
+		}
+	}
+
+	/** Returns true for the producer states RUNNING, FINISHED, SCHEDULED and DEPLOYING. */
+	private static boolean isProducerAvailable(ExecutionState producerState) {
+		return producerState == ExecutionState.RUNNING ||
+			producerState == ExecutionState.FINISHED ||
+			producerState == ExecutionState.SCHEDULED ||
+			producerState == ExecutionState.DEPLOYING;
+	}
+
+	/** Returns true for the producer states CANCELING, CANCELED and FAILED. */
+	private static boolean isProducerFailedOrCanceled(ExecutionState producerState) {
+		return producerState == ExecutionState.CANCELING ||
+			producerState == ExecutionState.CANCELED ||
+			producerState == ExecutionState.FAILED;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index d744f60895089..48e2f87f1a949 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -34,10 +34,14 @@
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -49,6 +53,10 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -64,8 +72,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -75,6 +85,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
@@ -177,6 +188,8 @@ public class Execution implements AccessExecution, Archiveable producedPartitions;
+
// --------------------------------------------------------------------------------------------
/**
@@ -215,6 +228,7 @@ public Execution(
markTimestamp(CREATED, startTimestamp);
this.partitionInfos = new ArrayList<>(16);
+ this.producedPartitions = Collections.emptyMap();
this.terminalStateFuture = new CompletableFuture<>();
this.releaseFuture = new CompletableFuture<>();
this.taskManagerLocationFuture = new CompletableFuture<>();
@@ -268,6 +282,11 @@ public LogicalSlot getAssignedResource() {
return assignedResource;
}
+ /**
+ * Looks up the entry cached in {@code producedPartitions} for the given partition id.
+ *
+ * The cache starts out as an empty map and is replaced once
+ * {@code registerProducedPartitions} has completed, so lookups before that point
+ * yield an empty result.
+ *
+ * @param id id of the intermediate result partition to look up
+ * @return the cached value for {@code id}, or an empty {@link Optional} if the map
+ * holds no value for it
+ */
+ public Optional getResultPartitionDeploymentDescriptor(
+ IntermediateResultPartitionID id) {
+ return Optional.ofNullable(producedPartitions.get(id));
+ }
+
/**
* Tries to assign the given slot to the execution. The assignment works only if the
* Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
@@ -423,7 +442,7 @@ public CompletableFuture scheduleForExecution(
final ExecutionGraph executionGraph = vertex.getExecutionGraph();
final Time allocationTimeout = executionGraph.getAllocationTimeout();
try {
- final CompletableFuture allocationFuture = allocateAndAssignSlotForExecution(
+ final CompletableFuture allocationFuture = allocateResourcesForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
@@ -463,7 +482,13 @@ public CompletableFuture scheduleForExecution(
}
/**
- * Allocates and assigns a slot obtained from the slot provider to the execution.
+ * Allocates resources for the execution.
+ *
+ * Allocates the following resources:
+ *
+ * - a slot obtained from the slot provider
+ * - the produced partitions, by registering them with the {@link org.apache.flink.runtime.shuffle.ShuffleMaster}
+ *
*
* @param slotProvider to obtain a new slot from
* @param queued if the allocation can be queued
@@ -473,14 +498,40 @@ public CompletableFuture scheduleForExecution(
* @param allocationTimeout rpcTimeout for allocating a new slot
* @return Future which is completed with this execution once the slot has been assigned
* or with an exception if an error occurred.
- * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
*/
- public CompletableFuture allocateAndAssignSlotForExecution(
+ CompletableFuture allocateResourcesForExecution(
+ SlotProvider slotProvider,
+ boolean queued,
+ LocationPreferenceConstraint locationPreferenceConstraint,
+ @Nonnull Set allPreviousExecutionGraphAllocationIds,
+ Time allocationTimeout) {
+ // Two chained steps: first allocate and assign the slot, then (only if that future
+ // completes normally) register the produced partitions for the task manager location
+ // of the assigned slot. A failed slot allocation skips the registration step.
+ return allocateAndAssignSlotForExecution(
+ slotProvider,
+ queued,
+ locationPreferenceConstraint,
+ allPreviousExecutionGraphAllocationIds,
+ allocationTimeout)
+ .thenCompose(slot -> registerProducedPartitions(slot.getTaskManagerLocation()));
+ }
+
+ /**
+ * Allocates and assigns a slot obtained from the slot provider to the execution.
+ *
+ * @param slotProvider to obtain a new slot from
+ * @param queued if the allocation can be queued
+ * @param locationPreferenceConstraint constraint for the location preferences
+ * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
+ * Can be empty if the allocation ids are not required for scheduling.
+ * @param allocationTimeout rpcTimeout for allocating a new slot
+ * @return Future which is completed with the allocated slot once it has been assigned
+ * or with an exception if an error occurred.
+ */
+ private CompletableFuture allocateAndAssignSlotForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set allPreviousExecutionGraphAllocationIds,
- Time allocationTimeout) throws IllegalExecutionStateException {
+ Time allocationTimeout) {
checkNotNull(slotProvider);
@@ -551,7 +602,7 @@ public CompletableFuture allocateAndAssignSlotForExecution(
}
if (tryAssignResource(logicalSlot)) {
- return this;
+ return logicalSlot;
} else {
// release the slot
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
@@ -566,6 +617,68 @@ public CompletableFuture allocateAndAssignSlotForExecution(
}
}
+ /**
+ * Registers the partitions produced by this execution for the given location and
+ * caches the registration result in {@code producedPartitions}.
+ *
+ * Starts with {@code assertRunningInJobMasterMainThread()}. The actual registration is
+ * delegated to the static {@code registerProducedPartitions(vertex, location, attemptId)}
+ * overload; its result is stored through {@link FutureUtils#thenApplyAsyncIfNotDone},
+ * which is handed the job master main thread executor of the execution graph.
+ *
+ * @param location task manager location that is passed on to the static registration method
+ * @return future completed with this execution after {@code producedPartitions} has been set
+ */
+ @VisibleForTesting
+ CompletableFuture registerProducedPartitions(TaskManagerLocation location) {
+ assertRunningInJobMasterMainThread();
+
+ return FutureUtils.thenApplyAsyncIfNotDone(
+ registerProducedPartitions(vertex, location, attemptId),
+ vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
+ producedPartitionsCache -> {
+ // the field is written inside the callback, i.e. only after registration has finished
+ producedPartitions = producedPartitionsCache;
+ return this;
+ });
+ }
+
+ @VisibleForTesting
+ static CompletableFuture