Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

import javax.annotation.Nonnegative;

import java.io.Serializable;
import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -54,23 +56,20 @@ public class InputGateDeploymentDescriptor implements Serializable {
* The index of the consumed subpartition of each consumed partition. This index depends on the
* {@link DistributionPattern} and the subtask indices of the producing and consuming task.
*/
@Nonnegative
private final int consumedSubpartitionIndex;

/** An input channel for each consumed subpartition. */
private final InputChannelDeploymentDescriptor[] inputChannels;
private final ShuffleDescriptor[] inputChannels;

public InputGateDeploymentDescriptor(
IntermediateDataSetID consumedResultId,
ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
InputChannelDeploymentDescriptor[] inputChannels) {

@Nonnegative int consumedSubpartitionIndex,
ShuffleDescriptor[] inputChannels) {
this.consumedResultId = checkNotNull(consumedResultId);
this.consumedPartitionType = checkNotNull(consumedPartitionType);

checkArgument(consumedSubpartitionIndex >= 0);
this.consumedSubpartitionIndex = consumedSubpartitionIndex;

this.inputChannels = checkNotNull(inputChannels);
}

Expand All @@ -87,11 +86,12 @@ public ResultPartitionType getConsumedPartitionType() {
return consumedPartitionType;
}

@Nonnegative
public int getConsumedSubpartitionIndex() {
return consumedSubpartitionIndex;
}

public InputChannelDeploymentDescriptor[] getInputChannelDeploymentDescriptors() {
public ShuffleDescriptor[] getShuffleDescriptors() {
return inputChannels;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

package org.apache.flink.runtime.deployment;

import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -39,99 +39,59 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = 6343547936086963705L;

/** The ID of the result this partition belongs to. */
private final IntermediateDataSetID resultId;
private final PartitionDescriptor partitionDescriptor;

/** The ID of the partition. */
private final IntermediateResultPartitionID partitionId;
private final ShuffleDescriptor shuffleDescriptor;

/** The type of the partition. */
private final ResultPartitionType partitionType;

/** The number of subpartitions. */
private final int numberOfSubpartitions;

/** The maximum parallelism. */
private final int maxParallelism;

/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;

public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
int maxParallelism,
boolean lazyScheduling) {

this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);

boolean sendScheduleOrUpdateConsumersMessage) {
this.partitionDescriptor = checkNotNull(partitionDescriptor);
this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
this.maxParallelism = maxParallelism;
this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
}

public IntermediateDataSetID getResultId() {
return resultId;
return partitionDescriptor.getResultId();
}

public IntermediateResultPartitionID getPartitionId() {
return partitionId;
return partitionDescriptor.getPartitionId();
}

public ResultPartitionType getPartitionType() {
return partitionType;
return partitionDescriptor.getPartitionType();
}

public int getNumberOfSubpartitions() {
return numberOfSubpartitions;
return partitionDescriptor.getNumberOfSubpartitions();
}

public int getMaxParallelism() {
return maxParallelism;
}

public ShuffleDescriptor getShuffleDescriptor() {
return shuffleDescriptor;
}

public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}

@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
+ "partition id: %s, partition type: %s]",
resultId, partitionId, partitionType);
}

// ------------------------------------------------------------------------

public static ResultPartitionDeploymentDescriptor from(
IntermediateResultPartition partition, int maxParallelism, boolean lazyScheduling) {

final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
final IntermediateResultPartitionID partitionId = partition.getPartitionId();
final ResultPartitionType partitionType = partition.getIntermediateResult().getResultType();

// The produced data is partitioned among a number of subpartitions.
//
// If no consumers are known at this point, we use a single subpartition, otherwise we have
// one for each consuming sub task.
int numberOfSubpartitions = 1;

if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {

if (partition.getConsumers().size() > 1) {
throw new IllegalStateException("Currently, only a single consumer group per partition is supported.");
}

numberOfSubpartitions = partition.getConsumers().get(0).size();
}

return new ResultPartitionDeploymentDescriptor(
resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism, lazyScheduling);
return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
+ "ShuffleDescriptor: %s]",
partitionDescriptor, shuffleDescriptor);
}
}
Loading