
Conversation


@zhijiangW zhijiangW commented Feb 1, 2019

What is the purpose of the change

This is a sub-task for introducing the ShuffleMaster component on the JM side, based on the pluggable ShuffleManager architecture.

In this first step, we refactor the related information structures used during deployment. We introduce the PartitionShuffleDescriptor (PSD) to cover all the necessary info, which may come either directly from the ExecutionGraph or from registration by the TM/ShuffleService.

The ShuffleDeploymentDescriptor (SDD) is also introduced to cover only shuffle-specific info; it is created by the ShuffleMaster during registerPartitionProducer.

PSD and SDD are cached and used for generating the ResultPartitionDeploymentDescriptor (RPDD), InputGateDeploymentDescriptor (IGDD), InputChannelDeploymentDescriptor (ICDD), etc. during producer/consumer task deployments. The relationship between them is roughly PSD + SDD = RPDD/IGDD/ICDD.
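
A minimal sketch of that composition (the classes below are simplified stand-ins for the real descriptors, not the actual Flink code):

class PartitionShuffleDescriptor { /* info from the ExecutionGraph / TM registration */ }
class ShuffleDeploymentDescriptor { /* shuffle-specific info from the ShuffleMaster */ }

class ResultPartitionDeploymentDescriptor {
    private final PartitionShuffleDescriptor psd; // general deployment part
    private final ShuffleDeploymentDescriptor sdd; // shuffle-specific part

    ResultPartitionDeploymentDescriptor(
            PartitionShuffleDescriptor psd, ShuffleDeploymentDescriptor sdd) {
        this.psd = psd;
        this.sdd = sdd;
    }
}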

In addition, we remove the ResultPartitionLocation structure to separate the ConnectionID and LocationType info. The ConnectionID can be regarded as shuffle-specific info covered in PSD, SDD, and ICDD, while the LocationType is covered only in ICDD once both producer and consumer are deployed.

Notes:

  1. The DefaultShuffleMaster here is only for interacting with the related logic; the formal implementation will be done in a separate PR.

  2. We cannot yet rely on the scheduler's deployment sequence, which means a consumer may be deployed before its producer. So we cannot rely on the producer's PSD/SDD to generate the consumer's IGDD/ICDD; this part still relies on the ExecutionEdge.

Brief change log

  • Introduce the structure of PSD
  • Introduce the structure of SDD
  • Remove the ResultPartitionLocation
  • Refactor the related process of task deployment
  • Refactor the related process of scheduleOrUpdateConsumers

Verifying this change

The related tests will be added after review confirms that the current refactoring makes sense.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)


flinkbot commented Feb 1, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 48e3aac (Fri Aug 23 22:37:40 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details

The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier


@azagrebin azagrebin left a comment


Thanks @zhijiangW , I have left some comments for discussion.

PartitionShuffleDescriptor psd = PartitionShuffleDescriptor.from(targetSlot, executionId, partition, maxParallelism);

producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd));
getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(), psd);
Contributor

Would it work if the complete TaskDeploymentDescriptor were just cached as a volatile field in Execution? Maybe we would not need any of the three descriptor caches, what do you think?

Contributor Author

@zhijiangW zhijiangW Feb 12, 2019

From a functional aspect, caching the TaskDeploymentDescriptor might also make sense. But I have other concerns:

  1. The structure of the TDD is complicated and would take more memory if cached completely, including unnecessary fields such as serializedJobInformation and serializedTaskInformation.

  2. We might need to adjust the current collection structures of producedPartitions and inputGates in the TDD into map structures in order to find the required PSD/SDD directly for other usages.

  3. If we replace the current three descriptor caches, we might not need the PartialInputChannelDeploymentDescriptor class any more, if I understand correctly. But I wonder whether there are scenarios where, during consumer execution deployment, only some input channel descriptors are unknown. When sending partition infos we only want to send those unknown ones, so how can we distinguish them among all the cached producer TDDs? In other words, the currently cached partialInputChannelDeploymentDescriptors might be only a sub-collection of all the cached TDDs on the producer side.

Contributor

  1. (and 2) OK, I agree, let's cache it similarly to what we have now, or maybe we could already prepare a Map<IntermediateDataSetID, PartitionInfo> on the producer side (see the sketch after this list)?

  2. As I understand it, the whole concurrent caching was done because consumer deployment and scheduleOrUpdateConsumers could happen concurrently. Now that should no longer be the case and all state transitions should happen on the main thread of the Job Master, so any cache map could be just a HashMap.
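
For illustration, such a producer-side cache might look like this (a sketch only; IntermediateDataSetID and PartitionInfo are stubbed stand-ins for the Flink classes):

import java.util.HashMap;
import java.util.Map;

class IntermediateDataSetID { /* stand-in for the Flink id class */ }
class PartitionInfo { /* stand-in for the cached partition info */ }

class Execution {
    // a plain HashMap suffices once all state transitions happen on the
    // Job Master main thread, i.e. there is no concurrent access any more
    private final Map<IntermediateDataSetID, PartitionInfo> partitionInfoCache =
        new HashMap<>();
}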

As far as I see, there are 2 cases:

  • deploy the consumer: here we rely on the partition.isConsumable flag to decide whether the input channel SDD/location is known at that moment (i.e. whether scheduleOrUpdateConsumers has already happened).
  • scheduleOrUpdateConsumers:
    • first of all, the partition.isConsumable flag has already been set to true;
    • in the CREATED and SCHEDULED states we do not have to do anything, because the TDD has not been created yet; when it is created in deploy(), it will already see partition.isConsumable = true and populate the known locations;
    • if some consumers are DEPLOYING or RUNNING, their TDDs have already been sent with unknown SDDs/locations, so they have to be updated by sendUpdatePartitionInfoRpcCall using the cached PartitionInfos; the update message will be applied in the Task after the deploy message.

Currently in master, we call sendPartitionInfos in several places to resolve the earlier race conditions. Now we basically do not need it, nor partialInputChannelDeploymentDescriptors.
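
A sketch of those two cases (stand-in types, reduced to the states discussed above):

class PartitionInfo { /* stand-in for the cached partition info */ }

enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING }

class ConsumerUpdateSketch {
    void onScheduleOrUpdateConsumers(ExecutionState consumerState, PartitionInfo cachedInfo) {
        switch (consumerState) {
            case CREATED:
            case SCHEDULED:
                // nothing to do: the TDD is created later in deploy() and will
                // already see partition.isConsumable == true, so the known
                // locations are populated there
                break;
            case DEPLOYING:
            case RUNNING:
                // the TDD was already sent with unknown SDDs/locations; push
                // the cached PartitionInfo via an update RPC, which the Task
                // applies after the deploy message
                sendUpdatePartitionInfoRpcCall(cachedInfo);
                break;
        }
    }

    void sendUpdatePartitionInfoRpcCall(PartitionInfo info) {
        // RPC stub for the sketch
    }
}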

What do you think?

Contributor Author

Yes, I agree. We can make this process simpler than before: remove the PartialInputChannelDeploymentDescriptor class and cache the PartitionInfo directly for consumers in DEPLOYING or RUNNING state, to send the update during scheduleOrUpdateConsumers.

private final LocationType locationType;

/** The connection to use to request the remote partition. */
private final Optional<ConnectionID> connectionId;
Contributor

I thought we would just have a ShuffleDeploymentDescriptor here instead of the ConnectionID; the SDD also contains the ConnectionID. If the location is unknown (LocationType.Unknown), the SDD field could just be a special singleton implementation of ShuffleDeploymentDescriptor, an UnknownShuffleDeploymentDescriptor. Or is that coming later?
Also in ResultPartitionDeploymentDescriptor.

Contributor Author

I also considered using a ShuffleDeploymentDescriptor here to replace the ConnectionID. But there are two implementation concerns:

  1. In eager scheduling mode, once all required slots are received we cannot assume the deployment sequence strictly follows the topology, which means a consumer execution might be deployed earlier than its producer. So in InputChannelDeploymentDescriptor#fromEdges we might not be able to get the cached SDD directly from the producer execution, although we can generate a ConnectionID based on other info. Otherwise we would have to guarantee that deployment goes from producer to consumer, or generate the producer's SDD while deploying the consumer in InputChannelDeploymentDescriptor#fromEdges.

  2. I thought of introducing an UnknownShuffleDeploymentDescriptor before, but semantically it is a bit redundant with LocationType.Unknown. In addition, there seem to be no specific usages such as instanceof UnknownShuffleDeploymentDescriptor in other processes. By design the SDD should be generated by the ShuffleMaster, but this special UnknownShuffleDeploymentDescriptor would be generated only in the LocationType.Unknown case, which does not go via the ShuffleMaster.

Contributor

@azagrebin azagrebin Feb 13, 2019

OK, I think I see the problem now, thanks for the explanation. I will put my thoughts in a different order :)

During the design, I thought the ShuffleDeploymentDescriptor was supposed to contain shuffle-specific info generated by the ShuffleMaster as a central point, and used eventually by the ShuffleService in the producer and consumer Tasks to set up readers/writers.

An example could be some partition identification or connection inside an external shuffle system. The existing connection id/location is also an example of this for the existing netty stack, but it might not be relevant for other shuffle systems.

For example, let's say the partition is stored remotely (not in the producer), the batch job is restored, and the partition is already finished: we do not even need to deploy the producer, just connect the consumer to the existing 'done' external partition. Then the existing connection id does not make sense; the consumer needs some kind of internal shuffle id for the partition.

That is why I thought: PSD(ProducerResourceId,ProducerConnection,...) -> ShuffleMaster -> SDD(Internal) -> ICDD(SDD) -> Task -> ICDD,ConsumerResourceId -> ShuffleService -> InputGate -> read records.

I think the ShuffleService itself can even decide what to do with the ProducerResourceId/ConsumerResourceId and calculate the LocationType internally in the case of the existing netty stack. For other shuffle services the LocationType might not be relevant (e.g. an external partition); then maybe ICDD = SDD = PartitionInfo and we could keep only one of them, not sure.

I thought of UnknownShuffleDeploymentDescriptor as a replacement for LocationType.Unknown / ConnectionId = null, based on the above arguments. It is just a singleton stub to signal that the SDD will be updated later with sendUpdatePartitionInfoRpcCall in the case of lazy scheduling. True, it is not generated by the ShuffleMaster; what could be an alternative to this approach?

In the case of eager deployment (lazyScheduling = false), we can currently deploy the consumer as soon as the slot is assigned to the producer, even though the producer's deployment has not started yet, and we planned to generate the SDD during producer deployment. If we agree on point 2, it seems the consumer needs the SDD to consume, so it has to be known.

Thinking more about the ShuffleMaster interface: depending on its nature it might be an asynchronous API, e.g. registering with and talking to an external system. This means that ideally its partition registration method should return a CompletableFuture<SDD>.
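
The registration method's shape might then be roughly the following (a hypothetical sketch with stand-in descriptor types; the real interface was still to be defined):

import java.util.concurrent.CompletableFuture;

class PartitionShuffleDescriptor { /* stand-in */ }
class ShuffleDeploymentDescriptor { /* stand-in */ }

interface ShuffleMaster {
    // asynchronous by nature: registering may involve talking to an
    // external shuffle system, so the SDD arrives via a future
    CompletableFuture<ShuffleDeploymentDescriptor> registerPartitionProducer(
        PartitionShuffleDescriptor psd);
}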

Then the producer execution life cycle should be: created -> scheduled -> slot assigned -> register partition (get and cache the SDD) -> deploying (generate the TDD with the previously acquired SDD), everything happening on the main thread of the Job Master. In eager scheduling the consumer has to be deployed not after the producer's slot is assigned but after the partition is registered. In lazy scheduling we have sendUpdatePartitionInfoRpcCall to send the SDD later.

I would suggest we do the partition registration and SDD caching in allocateAndAssignSlotForExecution, right after slot assignment (needs a rebase on the latest master):

return FutureUtils.handleAsyncIfNotDone(..tryAssignResource..)
  .thenComposeAsync(
    slot -> {..ShuffleMaster.register(PSD), cache SDDs..}, 
    mainThreadExecutor);

Just maybe with the steps refactored into different functions :)

Contributor Author

@zhijiangW zhijiangW Feb 27, 2019

If I understand correctly, the above comments can be summarized into three points:

  1. The LocationType can be decided by the ShuffleService by comparing the producer's and consumer's ResourceIDs, and the consumer's ResourceID could be covered in the IGDD. Regarding ICDD = SDD = PartitionInfo, my only concern is the one differing field, the IntermediateDataSetID in PartitionInfo, which is needed to find the proper SingleInputGate for updating partition info on the Task side. I will think this through step by step.

  2. registerPartition and cachePartition are triggered in allocateAndAssignSlotForExecution and return a future; then we can be sure that, when deploying the consumer in eager mode, we can always get the corresponding registered/cached SDDs of the producers.

  3. UnknownShuffleDeploymentDescriptor should also be introduced for the lazy deployment mode, to indicate that an updated SDD will arrive later.

I agree with the above points, especially point 2, which solves my previous concern naturally. :)
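
For point 1, the locality decision could look roughly like this on the shuffle service side (a sketch; ResourceID is stubbed and the LOCAL/REMOTE naming is assumed):

class ResourceID { /* stand-in for Flink's ResourceID, compared by equals() */ }

enum LocationType { LOCAL, REMOTE }

class LocalitySketch {
    // the shuffle service derives locality itself by comparing the producer's
    // and the consumer's ResourceIDs, so no precomputed ResultPartitionLocation
    // needs to be shipped around
    static LocationType locationTypeOf(ResourceID producer, ResourceID consumer) {
        return producer.equals(consumer) ? LocationType.LOCAL : LocationType.REMOTE;
    }
}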

/**
 * Deployment descriptor for shuffle specific information.
 */
public class ShuffleDeploymentDescriptor implements Serializable {
Contributor

I think eventually it needs to be an interface, probably an empty one. This class could stay as the implementation for the default shuffle master. Also, the special UnknownShuffleDeploymentDescriptor could implement the interface.

Contributor Author

Wouldn't it then need an explicit getConnectionId method in the interface? The ICDD might see either an UnknownShuffleDeploymentDescriptor or a KnownShuffleDeploymentDescriptor, and it should provide a way of getting the ConnectionID if LocationType == Remote.

Contributor

True, for the existing netty shuffle it has to have more methods. Internally, I would suggest that the future NettyShuffleService cast the SDD to KnownNettySDD if it is not an UnknownSDD:

interface SDD { }

enum UnknownSDD implements SDD { INSTANCE; } // special singleton stub

class KnownNettySDD implements SDD { /* + ProducerResourceId, ProducerConnection, etc. */ }

// later:
class AnyOtherSDD implements SDD { /* other shuffle-specific identification */ }
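
Building on that hierarchy, a hypothetical consumer-side usage might be:

// usage sketch for the hierarchy above (field/method access is assumed)
void setupInputChannel(SDD sdd) {
    if (sdd == UnknownSDD.INSTANCE) {
        // location not known yet: wait for sendUpdatePartitionInfoRpcCall
    } else {
        KnownNettySDD nettySdd = (KnownNettySDD) sdd;
        // use the producer ResourceId / connection from nettySdd to decide
        // local vs. remote and set up the input channel reader
    }
}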

@zhijiangW
Contributor Author

@azagrebin, thanks for your reviews! :)
I was also a bit confused by the points you mentioned during implementation. I have left some thoughts, assuming I understood your suggestions correctly.

@pnowojski
Contributor

@azagrebin @zhijiangW is this PR still valid? Or was it subsumed by something else?


@azagrebin
Contributor

@pnowojski
AFAIK, it was actually subsumed by #8362, sorry for the confusion.
@zhijiangW please reopen if you have other thoughts.

@azagrebin azagrebin closed this Aug 23, 2019
@zhijiangW
Contributor Author

No problem, it should be closed; I had forgotten about it.

@zhijiangW zhijiangW deleted the FLINK-11391 branch June 10, 2020 10:13