@azagrebin
Contributor

@azagrebin azagrebin commented May 7, 2019

What is the purpose of the change

This PR introduces the ShuffleMaster interface. JobMaster and ExecutionGraph use ShuffleMaster to register resources for the produced partitions of intermediate results. This is part of the overall effort to make the shuffle implementation pluggable.

Brief change log

  • Introduce ShuffleMaster interface
  • Introduce ProducerDescriptor to describe producer for ShuffleMaster
  • Introduce PartitionDescriptor to describe partition for ShuffleMaster
  • Introduce ShuffleDescriptor: partition handle produced by ShuffleMaster
    to hand over to task and its local shuffle service
  • Introduce UnknownShuffleDescriptor for eager consumer scheduling
    when the producer is not yet known
  • Register produced partitions when execution gets slot
  • Refactor TaskDeploymentDescriptor creation into a TaskDeploymentDescriptorFactory
  • Refactor ResultPartitionDeploymentDescriptor to consist of
    PartitionDescriptor and ShuffleDeploymentDescriptor
  • Refactor InputGateDeploymentDescriptor and PartitionInfo to use ShuffleDescriptor
    instead of InputChannelDeploymentDescriptor
  • Introduce NettyShuffleMaster and NettyShuffleDescriptor implementations
    for existing shuffle service based on Netty communication and local files
  • Refactor SingleInputGate.create and updateInputChannel to determine partition location based on
    consumer resource id and producer id from NettyShuffleDescriptor
  • Adjust tests

Verifying this change

unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented May 7, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit f7014b7 (Wed Aug 07 16:30:56 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Contributor Author

@flinkbot attention @tillrohrmann @zhijiangW

@tillrohrmann tillrohrmann self-assigned this May 7, 2019
@azagrebin azagrebin force-pushed the FLINK-11391-az branch 3 times, most recently from d9129e5 to 1c84635 Compare May 9, 2019 07:41
}

@Override
public String toString() {
Contributor

I am not sure whether the remaining fields, numberOfSubpartitions and maxParallelism, should also be added here.

return maxParallelism;
}

public int getConnectionIndex() {
Contributor

Currently this could be package-private. I am not sure whether it will be used outside of the package in the future.

Contributor Author

It could potentially be used outside of this package in other shuffle implementations. I would leave the methods public here because they are then basically part of the shuffle API.

Contributor

I would recommend following the conservative approach Zhijiang proposed and make things only public if they are really needed. Decreasing visibility is always much harder than increasing it.


return new PartitionShuffleDescriptor(
resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism,
partition.getIntermediateResult().getConnectionIndex());
Contributor

I am not sure whether it is worth defining a connectionIndex variable up front for partition.getIntermediateResult().getConnectionIndex(), to stay consistent with the other variables.

}

@Nonnull
public ShuffleDeploymentDescriptor getShuffleDeploymentDescriptor() {
Contributor

package private

private final ConnectionID producerConnection;

@Nonnull
private final ResultPartitionID resultPartitionID;
Contributor

Maybe better to name it resultPartitionId, consistent with producerResourceId.

import java.util.List;
import java.util.Map;

/** Factory of {@link TaskDeploymentDescriptor} to deploy {@link Execution}. */
Contributor

The single-line comment format /** */ seems to be used mainly for fields, not for classes. I am not entirely sure about this, though.

Contributor

@zhijiangW zhijiangW left a comment

Thanks for opening this PR, it looks really good @azagrebin.

I just left some minor inline comments and have two concerns so far. I have not finished the whole review and will continue with it.

  1. The class PartitionInfo seems a bit redundant and might be replaced completely by InputGateDeploymentDescriptor (IGDD), because the two carry mostly the same information. Only ResultPartitionType and consumedSubpartitionIndex in IGDD seem unnecessary for PartitionInfo now, but ResultPartitionType might also make sense in PartitionInfo if we support determining the type dynamically in the future. The current TaskExecutor#updatePartitions could also work with IGDD after a small change. It would also be consistent to use the same structure for deploying and updating.

  2. I am not sure that using instanceof in SingleInputGate to check the concrete type of ShuffleDeploymentDescriptor and cast it is a good approach. Another option is to define some methods in the ShuffleDeploymentDescriptor interface, such as boolean isUnknown(), getResultPartitionID(), and getConnectionID(), to avoid this.
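
To make the two options in point 2 concrete, here is a minimal sketch. All types are simplified, hypothetical stand-ins (SketchDescriptor, SketchKnownDescriptor, SketchUnknownDescriptor, SketchChannelResolver) and not the PR's actual classes; a plain String stands in for the partition id and producer address.

```java
/** Hypothetical, simplified stand-in for a shuffle descriptor; not the PR's actual interface. */
interface SketchDescriptor {
    String getResultPartitionId();

    /** Interface-method option: the descriptor reports whether its producer is still unknown. */
    boolean isUnknown();
}

/** Illustrative descriptor for a partition whose producer location is known. */
final class SketchKnownDescriptor implements SketchDescriptor {
    private final String resultPartitionId;
    private final String producerAddress;

    SketchKnownDescriptor(String resultPartitionId, String producerAddress) {
        this.resultPartitionId = resultPartitionId;
        this.producerAddress = producerAddress;
    }

    @Override
    public String getResultPartitionId() {
        return resultPartitionId;
    }

    @Override
    public boolean isUnknown() {
        return false;
    }

    String getProducerAddress() {
        return producerAddress;
    }
}

/** Illustrative descriptor for a partition whose producer has not been deployed yet. */
final class SketchUnknownDescriptor implements SketchDescriptor {
    private final String resultPartitionId;

    SketchUnknownDescriptor(String resultPartitionId) {
        this.resultPartitionId = resultPartitionId;
    }

    @Override
    public String getResultPartitionId() {
        return resultPartitionId;
    }

    @Override
    public boolean isUnknown() {
        return true;
    }
}

final class SketchChannelResolver {
    /** instanceof option: the consumer inspects the concrete type and casts. */
    static String describeViaInstanceof(SketchDescriptor descriptor) {
        if (descriptor instanceof SketchUnknownDescriptor) {
            return "unknown:" + descriptor.getResultPartitionId();
        }
        if (descriptor instanceof SketchKnownDescriptor) {
            SketchKnownDescriptor known = (SketchKnownDescriptor) descriptor;
            return "known:" + known.getResultPartitionId() + "@" + known.getProducerAddress();
        }
        throw new IllegalArgumentException("Unsupported descriptor type: " + descriptor.getClass());
    }

    /** Interface-method option: no cast, but every implementation must provide isUnknown(). */
    static String describeViaInterfaceMethod(SketchDescriptor descriptor) {
        return (descriptor.isUnknown() ? "unknown:" : "known:") + descriptor.getResultPartitionId();
    }
}
```

The instanceof variant can reach implementation-specific data such as the producer address, while the interface-method variant only sees what the shared interface exposes.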

@rmetzger rmetzger requested a review from tillrohrmann May 11, 2019 11:43
@azagrebin
Contributor Author

Thanks for the review @zhijiangW ! I have addressed smaller comments.

  1. True, PartitionInfo looks similar to IGDD. The difference at the moment is that PartitionInfo represents an update of only one (but any) gate channel, while IGDD has a consistent view of all gate channels. I am not sure how semantically sound it is to treat IGDD as a single-channel update. I will think about it.

  2. In general, I do not see a problem with instanceof. What is the concern here? If we add more methods like isUnknown(), all SDDs will have to implement them. Of course, we can make them inherit from a base class, but it will clutter their code. Also, a channel being unknown is more of a general scheduling concept that applies when the producer is absent.

@zhijiangW
Contributor

zhijiangW commented May 15, 2019

Thanks for the confirmation @azagrebin .

  1. Yes, I have not thought through the changes caused by PartitionInfo covering a single channel and IGDD covering all channels. Just from the perspective of the RPC call taskManagerGateway.updatePartitions(partitionInfos), the parameter is a collection of PartitionInfo, which is similar to the array of channels in IGDD. Maybe IGDD should cache the ICDDs internally and replace the array with a collection. It might involve more refactoring, and I will also consider it further.

  2. Functionally, the current approach is fine. But in one of my own PRs I was advised not to use instanceof and to introduce the interface method ChannelSelector#isBroadcast instead, because instanceof feels hacky rather than like a proper solution. I am not sure whether it is generally discouraged or just a personal preference. I think you could confirm this approach with others. :)

BTW, I have not finished the whole review yet. I will continue with it later.

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for opening this PR @azagrebin. I gave it a first pass mainly having minor comments (many style comments which do not affect correctness). I will give it another pass to look in detail at the abstractions. I will post comments regarding the abstractions next.

@Nonnull ResultPartitionType consumedPartitionType,
@Nonnegative int consumedSubpartitionIndex,
@Nonnull ShuffleDeploymentDescriptor[] inputChannels,
@Nonnull ResourceID consumerResourceId) {
Contributor

Flink's code base uses double indentation to distinguish the parameters from the function body.

}

public static TaskDeploymentDescriptorFactory fromExecutionVertex(
ExecutionVertex executionVertex, ExecutionAttemptID executionId, int attemptNumber) {
Contributor

If we break the parameter list, then I would suggest to break all parameters.

JobID jobID,
boolean lazyScheduling,
int subtaskIndex,
ExecutionEdge[][] inputEdges) {
Contributor

It's not super consistent in the Flink code base but newer code tries to indent parameters lists an additional level to distinguish them from the method body.

this.inputEdges = inputEdges;
}

public static TaskDeploymentDescriptorFactory fromExecutionVertex(
Contributor

Usually static functions go to the bottom of the class. The order is roughly

Static fields;
Fields;
Constructors;
Methods;
Static functions;


/**
* Creates a task deployment descriptor to deploy a subtask to the given target slot.
*/
Contributor

These JavaDocs don't add much information.

}

private TaskDeploymentDescriptor createReceiver(
DefaultShuffleDeploymentDescriptor sdd, ResourceID location) throws IOException {
Contributor

formatting


PartitionShuffleDescriptor psd = new PartitionShuffleDescriptor(
new IntermediateDataSetID(), sdd.getResultPartitionID().getPartitionId(),
ResultPartitionType.PIPELINED, 1, 1, 0);
Contributor

formatting

}

public static InputGateDeploymentDescriptor createInputGateDeploymentDescriptor(
ShuffleDeploymentDescriptor sdd, ResourceID consumerLocation) {
Contributor

line breaks

ResultPartitionType.PIPELINED_BOUNDED,
channel,
channelDescriptors);
channelDescriptors, localLocation);
Contributor

line break

}
}

private static ShuffleDeploymentDescriptor createLocalSdd(ResultPartitionID resultPartitionID, ResourceID location) {
Contributor

Could ShuffleTestUtils#createSddWithLocalConnection be used here?

Contributor Author

we can also use NettyShuffleDescriptorBuilder here.

Contributor

@tillrohrmann tillrohrmann left a comment

I like the abstractions you've put in place @azagrebin. Good work! I've had a couple of minor comments.

/**
* Default implementation of {@link ShuffleDeploymentDescriptor} for {@link DefaultShuffleMaster}.
*/
public class DefaultShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
Contributor

Serial version UID is missing. I recommend activating IntelliJ's inspections, which will mark this as an error.
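
For illustration, a minimal sketch of what the requested field looks like on a serializable descriptor. SketchSerializableDescriptor and SketchSerializationRoundTrip are hypothetical names, and the single String field is an assumption made only to keep the example small; the round trip uses plain Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical serializable descriptor; the name and field are illustrative only. */
final class SketchSerializableDescriptor implements Serializable {
    // Without an explicit value, the runtime derives the UID from the class structure,
    // so unrelated class changes can make previously serialized instances unreadable.
    private static final long serialVersionUID = 1L;

    private final String partitionId;

    SketchSerializableDescriptor(String partitionId) {
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

final class SketchSerializationRoundTrip {
    /** Serializes the value to a byte array and reads it back as a new instance. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```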

/**
* Partition producer descriptor for {@link ShuffleMaster} to obtain {@link ShuffleDeploymentDescriptor}.
*/
public class ProducerShuffleDescriptor {
Contributor

We could rename this class into ProducerDescriptor because it is already part of the shuffle package.

/**
* Partition descriptor for {@link ShuffleMaster} to obtain {@link ShuffleDeploymentDescriptor}.
*/
public class PartitionShuffleDescriptor implements Serializable {
Contributor

We could do the same here with PartitionShuffleDescriptor --> PartitionDescriptor

* Intermediate result partition registry to use in {@link org.apache.flink.runtime.jobmaster.JobMaster}.
*/
public interface ShuffleMaster {
CompletableFuture<ShuffleDeploymentDescriptor> registerPartitionWithProducer(
Contributor

We could think about introducing a T extends ShuffleDeploymentDescriptor and then letting the specific implementations define T. This could avoid type casting when testing the implementations.
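
A minimal sketch of this suggestion, under simplified assumptions: the Sketch* types are hypothetical stand-ins, and the method takes a String id and an int port instead of the PR's descriptor arguments. The point is only that a caller of the concrete master gets the concrete descriptor type back without a cast.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the shuffle descriptor interface. */
interface SketchShuffleDescriptor {
    String getResultPartitionId();
}

/** Hypothetical Netty-style descriptor with one implementation-specific field. */
final class SketchNettyDescriptor implements SketchShuffleDescriptor {
    private final String resultPartitionId;
    private final int dataPort;

    SketchNettyDescriptor(String resultPartitionId, int dataPort) {
        this.resultPartitionId = resultPartitionId;
        this.dataPort = dataPort;
    }

    @Override
    public String getResultPartitionId() {
        return resultPartitionId;
    }

    int getDataPort() {
        return dataPort;
    }
}

/** Hypothetical generic master: each implementation fixes its own descriptor type T. */
interface SketchShuffleMaster<T extends SketchShuffleDescriptor> {
    CompletableFuture<T> registerPartitionWithProducer(String resultPartitionId, int dataPort);
}

final class SketchNettyShuffleMaster implements SketchShuffleMaster<SketchNettyDescriptor> {
    @Override
    public CompletableFuture<SketchNettyDescriptor> registerPartitionWithProducer(
            String resultPartitionId, int dataPort) {
        // Nothing needs to be allocated in this sketch, so the future completes immediately.
        return CompletableFuture.completedFuture(
                new SketchNettyDescriptor(resultPartitionId, dataPort));
    }
}
```

A test can then call `new SketchNettyShuffleMaster().registerPartitionWithProducer("p1", 6121).join().getDataPort()` directly.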

this.intermediateDataSetID = Preconditions.checkNotNull(intermediateResultPartitionID);
this.inputChannelDeploymentDescriptor = Preconditions.checkNotNull(inputChannelDeploymentDescriptor);
@Nonnull
private final ResourceID consumerResourceID;
Contributor

I think this information should come from the TaskExecutor where it is used to update the channels.

/**
* Default {@link ShuffleMaster} for netty and local file based shuffle implementation.
*/
public class DefaultShuffleMaster implements ShuffleMaster {
Contributor

Shall we name this class and all related classes directly NettyShuffleMaster?

Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions =
producer.getProducedPartitions();
Preconditions.checkArgument(checkInputReady(consumedPartition.getPartitionId(), producedPartitions),
Contributor

checkState would fit better here I think.
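
As a reminder of the distinction, a minimal sketch: checkArgument signals a bad value passed by the caller (IllegalArgumentException), while checkState signals that the object is not yet in a state where the call is valid (IllegalStateException). SketchPreconditions is a local stand-in written for this example, and SketchProducer is a hypothetical class.

```java
/** Local stand-in for a precondition helper, written only for this example. */
final class SketchPreconditions {
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}

/** Hypothetical producer whose partitions must be registered before they can be consumed. */
final class SketchProducer {
    private boolean partitionsRegistered;

    void markPartitionsRegistered() {
        partitionsRegistered = true;
    }

    String describeInput(int subpartitionIndex) {
        // The caller controls this value, so a violation is an argument error.
        SketchPreconditions.checkArgument(
                subpartitionIndex >= 0, "subpartitionIndex must be non-negative");
        // Readiness depends on the producer's state, not on anything the caller passed in.
        SketchPreconditions.checkState(
                partitionsRegistered, "produced partitions are not registered yet");
        return "subpartition-" + subpartitionIndex;
    }
}
```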


public Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> getProducedPartitions() {
return producedPartitions;
}
Contributor

We could hide this implementation detail by offering an Optional<ResultPartitionDeploymentDescriptor> getProducedPartition(IntermediateResultPartitionID intermediateResultPartitionId) method.

Contributor

The return type could even be Optional<ShuffleDeploymentDescriptor> if I'm not mistaken.
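
A minimal sketch of the Optional-based accessor suggested here, with String standing in for both the partition id and the descriptor. SketchExecution and its methods are hypothetical; the map stays private and callers only see a lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical execution that keeps its produced-partition map private. */
final class SketchExecution {
    private final Map<String, String> producedPartitions = new HashMap<>();

    void registerProducedPartition(String partitionId, String descriptor) {
        producedPartitions.put(partitionId, descriptor);
    }

    /** Returns the descriptor for the given partition, or empty if it is not registered. */
    Optional<String> getProducedPartition(String partitionId) {
        return Optional.ofNullable(producedPartitions.get(partitionId));
    }
}
```

Callers then handle the missing case explicitly, for example with `getProducedPartition(id).orElseThrow(...)`, instead of checking the map themselves.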

/**
* Unknown {@link ShuffleDeploymentDescriptor}.
*/
public final class UnknownShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
Contributor

Missing serial version UID

@Nonnull
private final ResourceID producerResourceId;

/** The address to use to request the remote partition. */
Contributor

For external shuffle service implementations, this is not necessarily true.

@rmetzger rmetzger requested a review from tillrohrmann May 22, 2019 19:51
@azagrebin azagrebin force-pushed the FLINK-11391-az branch 2 times, most recently from 8a6079d to cdf56b1 Compare May 27, 2019 09:46
* Intermediate result partition registry to use in {@link org.apache.flink.runtime.jobmaster.JobMaster}.
*/
public interface ShuffleMaster<T extends ShuffleDescriptor> {
CompletableFuture<T> registerPartitionWithProducer(
Contributor

missing javadoc, in particular for the return value

Contributor Author

I will add a description of the class type parameter to the class-level Javadoc. I am not sure a method comment will add much value. The method name already says what is happening, unless you think some specific detail should be mentioned.

Contributor

I think interface methods should get JavaDocs especially if they are public API. Since we want to offer external shuffle service implementations this is the case.

@Nonnegative int consumedSubpartitionIndex,
ShuffleDescriptor[] inputChannels,
ResourceID consumerLocation) {
this.consumedResultId = consumedResultId;
Contributor

keep checkNotNull for these parameters?

* Interface for shuffle deployment descriptor of result partition resource.
*/
public interface ShuffleDescriptor extends Serializable {
ResultPartitionID getResultPartitionID();
Contributor

Add empty line before this method?

* <p>It can be used e.g. to compare with partition producer {@link ResourceID} in
* {@link ProducerDescriptor} to determine producer/consumer co-location.
*/
private final ResourceID consumerLocation;
Contributor

consumerLocation -> consumerResourceId? In ProducerDescriptor and NettyShuffleDescriptor we also use the name producerResourceId.

Contributor Author

actually I think it is better to rename it in ProducerDescriptor & NettyShuffleDescriptor

* Intermediate result partition registry to use in {@link org.apache.flink.runtime.jobmaster.JobMaster}.
*/
public interface ShuffleMaster<T extends ShuffleDescriptor> {
CompletableFuture<T> registerPartitionWithProducer(
Contributor

empty line before method?

*/
private final int dataPort;

public ProducerDescriptor(
Contributor

@zhijiangW zhijiangW May 31, 2019

ShuffleDescriptor remoteSdd = createRemoteWithIdAndLocation(
remoteResultPartitionId.getPartitionId(),
ResourceID.generate());
inputGate.updateInputChannel(localLocation, new PartitionInfo(new IntermediateDataSetID(), remoteSdd));
Contributor

@zhijiangW zhijiangW Jun 3, 2019

ditto: reuse createPartitionInfo()

ShuffleDescriptor sdd = createRemoteWithIdAndLocation(
resultPartitionId.getPartitionId(),
ResourceID.generate());
inputGate.updateInputChannel(ResourceID.generate(), new PartitionInfo(new IntermediateDataSetID(), sdd));
Contributor

@zhijiangW zhijiangW Jun 3, 2019

could reuse createPartitionInfo(resultPartitionId, ResourceID.generate()) to create PartitionInfo here.

}

public static NettyShuffleDescriptor createRemoteWithIdAndLocation(
IntermediateResultPartitionID partitionId,
Contributor

add one more indentation for parameters

Collection<InputGateDeploymentDescriptor> inputGates) throws Exception {
Collection<ResultPartitionDeploymentDescriptor> resultPartitions,
Collection<InputGateDeploymentDescriptor> inputGates) throws Exception {
String errorMessage = "Network buffer pool has already been destroyed.";
Contributor

The changes to this method are not necessary.


private TaskDeploymentDescriptor createReceiver(
NettyShuffleDescriptor shuffleDescriptor,
ResourceID location) throws IOException {
Contributor

I guess location is not needed as a parameter; it could be created directly inside the method.

Contributor Author

Some tests rely on it being the same as in other components outside of this method.

slotProvider,
new NoRestartStrategy(),
jobVertex);
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
Contributor

Why is this change needed?

public void testSendCancelAndReceiveFail() throws Exception {
final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();

graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
Contributor

ditto

Contributor

@zhijiangW zhijiangW left a comment

Thanks for the updates @azagrebin !

I have finished the whole review. Sorry for the interruptions in reviewing over the past few days.

@azagrebin
Contributor Author

Thanks for the review @zhijiangW ! I have addressed the comments.

@azagrebin azagrebin force-pushed the FLINK-11391-az branch 5 times, most recently from 34a400a to 0b09ef0 Compare June 4, 2019 12:10
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for addressing my comments @azagrebin. I had a couple of additional comments. Once these are resolved I think we are good to merge this PR :-)

private final ShuffleDescriptor[] inputChannels;

/**
* {@link ResourceID} of partition consume to identify its location.
Contributor

typo: consume -> consumer

* <p>It can be used e.g. to compare with partition producer {@link ResourceID} in
* {@link ProducerDescriptor} to determine producer/consumer co-location.
*/
private final ResourceID consumerLocation;
Contributor

Why do we need this field? I thought that the InputGateDeploymentDescriptor is being used to create an InputGate on the TaskExecutor. If this is the case, then the information about the TaskExecutor's ResourceID should already be there. No need to transmit this additional information.

}

public InputChannelDeploymentDescriptor[] getInputChannelDeploymentDescriptors() {
public ShuffleDescriptor[] getInputChannelDescriptors() {
Contributor

Better to call getShuffleDescriptors

return inputChannels;
}

public ResourceID getConsumerLocation() {
Contributor

It looks as if this method + the field were introduced to make the creation of the InputGate a bit more convenient. I'm wondering whether this is not the wrong place to get this information from. I think it should come from the TaskExecutor.
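
A minimal sketch of the alternative described here, under simplified assumptions: the descriptor carries only producer-side information, and the consumer-side component compares it against the resource id it already owns. SketchGateDescriptor and SketchTaskExecutor are hypothetical names, and String stands in for ResourceID.

```java
/** Hypothetical gate descriptor that carries only producer-side information. */
final class SketchGateDescriptor {
    private final String producerResourceId;

    SketchGateDescriptor(String producerResourceId) {
        this.producerResourceId = producerResourceId;
    }

    String getProducerResourceId() {
        return producerResourceId;
    }
}

/** Hypothetical consumer-side component that already knows its own resource id. */
final class SketchTaskExecutor {
    private final String ownResourceId;

    SketchTaskExecutor(String ownResourceId) {
        this.ownResourceId = ownResourceId;
    }

    /** A channel is local when the producer runs on this same executor. */
    boolean isLocalChannel(SketchGateDescriptor descriptor) {
        return ownResourceId.equals(descriptor.getProducerResourceId());
    }
}
```

With this shape, the consumer's resource id never has to be transmitted inside the deployment descriptor.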

new TaskDeploymentDescriptor.Offloaded<>(taskInfo.right());
}

private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors(ResourceID location) {
Contributor

Here we are mixing static methods with instance methods. It feels a bit weird given that we use this function in createDeploymentDescriptor.

* Intermediate result partition registry to use in {@link org.apache.flink.runtime.jobmaster.JobMaster}.
*/
public interface ShuffleMaster<T extends ShuffleDescriptor> {
CompletableFuture<T> registerPartitionWithProducer(
Contributor

I think interface methods should get JavaDocs especially if they are public API. Since we want to offer external shuffle service implementations this is the case.

/**
* Tests for the {@link ResultPartitionDeploymentDescriptor}.
*/
public class ResultPartitionDeploymentDescriptorTest {
Contributor

Missing extends TestLogger

createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);

assertThat(copy.getShuffleDescriptor(), instanceOf(UnknownShuffleDescriptor.class));
UnknownShuffleDescriptor copySdd = (UnknownShuffleDescriptor) copy.getShuffleDescriptor();
Contributor

why is this cast needed?

Contributor Author

leftover, will remove

}

private TaskDeploymentDescriptor createSender(
NettyShuffleDescriptor shuffleDeploymentDescriptor,
Contributor

Naming of this parameter is not consistent with createReceiver.

/**
* Builder to mock {@link NettyShuffleDescriptor} in tests.
*/
public class NettyShuffleDescriptorBuilder {
Contributor

Nice :-)

@azagrebin azagrebin force-pushed the FLINK-11391-az branch 2 times, most recently from 6a40912 to 4820219 Compare June 4, 2019 18:10
@rmetzger rmetzger requested a review from tillrohrmann June 5, 2019 08:47
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for addressing my comments @azagrebin. LGTM. Merging this PR once Travis gives green light.

@azagrebin
Contributor Author

Thanks for the review @tillrohrmann @zhijiangW !

@rmetzger rmetzger requested a review from tillrohrmann June 5, 2019 11:55
tillrohrmann pushed a commit to azagrebin/flink that referenced this pull request Jun 5, 2019
Introduce PartitionLocation in NettyShuffleDescriptor and NettyShuffleDescriptorBuilder for tests

Add ShuffleDescriptor.getResultPartitionID and isUnknown

Use NettyShuffleDescriptorBuilder in StreamNetworkBenchmarkEnvironment

Introduce ShuffleUtils.applyWithShuffleTypeCheck to isolate input channel shuffle descriptor 'known' check and cast

This closes apache#8362.
@tillrohrmann
Contributor

Failing test case seems to be unrelated. Merging this PR now.
