Conversation

@zhuzhurk
Contributor

@zhuzhurk zhuzhurk commented Dec 6, 2018

…locks in LAZY_FROM_SOURCES scheduling when resources are limited

What is the purpose of the change

This PR adds a job config InputDependencyConstraint, which helps to avoid resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited, as described in FLINK-10945.

Brief change log

  • Add InputDependencyConstraint to ExecutionConfig
  • Adjust the isConsumable interface in IntermediateResultPartition to match the actual definition of when data becomes consumable
  • Change the current lazy scheduling logic (in Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the InputDependencyConstraint is satisfied (an interface ExecutionVertex.checkInputDependencyConstraints is added to serve this purpose)
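The constraint check described above can be illustrated with a minimal, hypothetical sketch; the names mirror the PR (InputDependencyConstraint, checkInputDependencyConstraints), but this is a simplification for illustration, not the actual Flink implementation.

```java
import java.util.List;

public class InputConstraintSketch {

    enum InputDependencyConstraint { ANY, ALL }

    // Each element represents one input of the consumer vertex:
    // true if that input's intermediate result is consumable.
    static boolean checkInputDependencyConstraints(
            InputDependencyConstraint constraint, List<Boolean> inputsConsumable) {
        if (inputsConsumable.isEmpty()) {
            return true; // source vertices have no inputs to wait for
        }
        if (constraint == InputDependencyConstraint.ANY) {
            return inputsConsumable.stream().anyMatch(b -> b);
        } else {
            return inputsConsumable.stream().allMatch(b -> b);
        }
    }

    public static void main(String[] args) {
        // With ANY, one consumable input is enough to schedule the consumer.
        System.out.println(checkInputDependencyConstraints(
                InputDependencyConstraint.ANY, List.of(true, false)));  // true
        // With ALL, every input must be consumable first.
        System.out.println(checkInputDependencyConstraints(
                InputDependencyConstraint.ALL, List.of(true, false)));  // false
    }
}
```

With limited slots, ALL prevents a multi-input consumer from occupying a slot while some of its producers still need slots to run, which is how the deadlock is avoided.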

Verifying this change

This change added tests and can be verified as follows:

  • Added IntermediateResultPartitionTest to validate IntermediateResultPartition changes
  • Added ExecutionVertexInputConstraintTest to validate the constraint check logic in ExecutionVertex

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

…locks for finite stream jobs when resources are limited
Contributor

@azagrebin azagrebin left a comment


Thanks for your contribution @zhuzhurk !
I have left some comments.

Contributor

@azagrebin azagrebin left a comment


Thanks @zhuzhurk! I also added a couple of comments for the tests.

return null;
},
executor);
if (consumerVertex.checkInputDependencyConstraints()) {
Contributor

I think the TODO comment belongs to the first line of the new scheduleConsumer method

Contributor Author

@zhuzhurk zhuzhurk Dec 14, 2018

From my understanding, the TODO comment is related to the "consumerState == CREATED" section in scheduleOrUpdateConsumers, which invokes cachePartitionInfo first and then schedules the vertex. The cachePartitionInfo action is needed to avoid a deployment race, at the cost of redundant partition infos being sent to the task, which is the concern described in the TODO comment.

So far the redundant partition infos are not a big problem, but I think we can optimize this later. One possible solution in my mind is to remove known partition infos from the cache when creating the InputChannelDeploymentDescriptor.
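The optimization mentioned above could look roughly like the following sketch: forget cached partition infos once they are covered by the deployment descriptor, so they are not redundantly re-sent to the task. All names here are made up for illustration; this is not the actual Flink code.

```java
import java.util.HashSet;
import java.util.Set;

public class PartitionInfoCacheSketch {

    private final Set<String> cachedPartitionInfos = new HashSet<>();

    // Called from the scheduleOrUpdateConsumers path to avoid a deployment race.
    void cachePartitionInfo(String partitionId) {
        cachedPartitionInfos.add(partitionId);
    }

    // Called when the deployment descriptor is built: partitions already
    // included there need no later update, so drop them from the cache.
    void onDeploymentDescriptorCreated(Set<String> includedPartitions) {
        cachedPartitionInfos.removeAll(includedPartitions);
    }

    Set<String> pendingPartitionUpdates() {
        return cachedPartitionInfos;
    }

    public static void main(String[] args) {
        PartitionInfoCacheSketch cache = new PartitionInfoCacheSketch();
        cache.cachePartitionInfo("p1");
        cache.cachePartitionInfo("p2");
        cache.onDeploymentDescriptorCreated(Set.of("p1"));
        System.out.println(cache.pendingPartitionUpdates());  // [p2]
    }
}
```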

Contributor

@azagrebin azagrebin left a comment

Thanks for addressing the comments @zhuzhurk

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for your contribution @zhuzhurk. The changes look very good. I had some minor comments which we could address before merging.

public boolean checkInputDependencyConstraints() {
if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
// InputDependencyConstraint == ANY
return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
Contributor

Moving isInputConsumable into IntermediateResult would allow getting rid of the IntStream indirection.
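The suggestion above could look roughly like this sketch: let IntermediateResult answer consumability itself, so ExecutionVertex can iterate its input results directly instead of indexing input edges through an IntStream. Simplified, illustrative names; not the actual Flink code.

```java
import java.util.List;

public class ConsumableCheckSketch {

    static class IntermediateResult {
        private final List<Boolean> partitionsConsumable;

        IntermediateResult(List<Boolean> partitionsConsumable) {
            this.partitionsConsumable = partitionsConsumable;
        }

        // Simplified: the result is consumable once all partitions are
        // consumable (for a blocking result, all producers have finished).
        boolean isConsumable() {
            return partitionsConsumable.stream().allMatch(b -> b);
        }
    }

    // The ANY check now iterates results directly, no index-based IntStream.
    static boolean anyInputConsumable(List<IntermediateResult> inputs) {
        return inputs.stream().anyMatch(IntermediateResult::isConsumable);
    }

    public static void main(String[] args) {
        IntermediateResult finished = new IntermediateResult(List.of(true, true));
        IntermediateResult pending = new IntermediateResult(List.of(true, false));
        System.out.println(anyInputConsumable(List.of(pending, finished)));  // true
    }
}
```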

* @return whether the input constraint is satisfied
*/
public boolean checkInputDependencyConstraints() {
if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
Contributor

Shouldn't the InputDependencyConstraint rather be a value of the ExecutionJobVertex than the ExecutionGraph? I guess it should be configurable for each operator individually.

Contributor Author

@zhuzhurk zhuzhurk Jan 9, 2019

I've moved InputDependencyConstraint to JobVertex, and the job-wide default value can be configured in ExecutionConfig. But I haven't made it configurable through the DataSet/DataStream API yet.

I agree we should make the constraint configurable for each operator. But I'm not quite sure whether we should support it in the DataSet API now or later in the stream/batch unified StreamGraph/Transformation API. Could you share your suggestion?

In our production experience, a job-wide configured input constraint satisfies most users, together with BATCH_FORCED execution mode, to ensure a batch job can finish with limited resources.
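The per-vertex configuration discussed here could look roughly like this sketch: each vertex may carry its own constraint, falling back to the job-wide default when none is set. The names are illustrative only, not the actual Flink API.

```java
public class VertexConstraintSketch {

    enum InputDependencyConstraint { ANY, ALL }

    static class JobVertexSketch {
        private InputDependencyConstraint constraint; // null = use the job-wide default

        void setInputDependencyConstraint(InputDependencyConstraint c) {
            this.constraint = c;
        }

        InputDependencyConstraint getInputDependencyConstraint(InputDependencyConstraint jobDefault) {
            return constraint != null ? constraint : jobDefault;
        }
    }

    public static void main(String[] args) {
        JobVertexSketch source = new JobVertexSketch();
        JobVertexSketch join = new JobVertexSketch();
        join.setInputDependencyConstraint(InputDependencyConstraint.ALL);

        // Job-wide default is ANY; only the join vertex overrides it.
        System.out.println(source.getInputDependencyConstraint(InputDependencyConstraint.ANY)); // ANY
        System.out.println(join.getInputDependencyConstraint(InputDependencyConstraint.ANY));   // ALL
    }
}
```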


if (partition.getIntermediateResult().getResultType().isPipelined()) {
// Schedule or update receivers of this partition
partition.markSomePipelinedDataProduced();
Contributor

Could this be generalized by having a partition#markDataProduced and then calling it for all intermediate results, independent of the type?

Contributor Author

Sure.
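The generalization agreed here could look roughly like this sketch: a single markDataProduced() on the partition regardless of result type, with consumability derived from the type (pipelined: some data produced; blocking: fully produced). Illustrative names, not the actual Flink code.

```java
public class PartitionConsumableSketch {

    enum ResultType { PIPELINED, BLOCKING }

    static class Partition {
        private final ResultType type;
        private boolean hasDataProduced;
        private boolean isFinished;

        Partition(ResultType type) { this.type = type; }

        void markDataProduced() { hasDataProduced = true; }

        void markFinished() {
            isFinished = true;
            hasDataProduced = true;
        }

        boolean isConsumable() {
            // pipelined partitions become consumable on first data,
            // blocking partitions only once fully produced
            return type == ResultType.PIPELINED ? hasDataProduced : isFinished;
        }
    }

    public static void main(String[] args) {
        Partition pipelined = new Partition(ResultType.PIPELINED);
        Partition blocking = new Partition(ResultType.BLOCKING);
        pipelined.markDataProduced();
        blocking.markDataProduced();
        System.out.println(pipelined.isConsumable());  // true
        System.out.println(blocking.isConsumable());   // false
        blocking.markFinished();
        System.out.println(blocking.isConsumable());   // true
    }
}
```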


try {
executionGraph.setInputDependencyConstraint(
jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
Contributor

I understand that this was the easiest way to get the InputDependencyConstraint into the ExecutionGraph, but I think it should be part of the JobVertex, because it is not a global setting but rather controls how each vertex is scheduled.

Contributor Author

Good suggestion. I've moved InputDependencyConstraint to JobVertex.

…gurable through API yet)

2.  Refine input consumable checks
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for the fixup @zhuzhurk. My last comment is whether markDataProduced should always be called in the scheduleOrUpdate method, independent of the result type. Once we have resolved this comment, the PR is ready to be merged.

@tillrohrmann
Contributor

Thanks for addressing my comments @zhuzhurk. Looks really good now. Merging this PR now.

tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Jan 16, 2019
…locks for finite stream jobs when resources are limited

This commit adds a job config InputDependencyConstraint, which helps to avoid
resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited.

The InputDependencyConstraint controls across multiple inputs when consumers are
scheduled. Currently it supports ANY and ALL. ANY means that any input intermediate
result partition must be consumable and ALL means that all input intermediate result
partitions (from all inputs) need to be consumable in order to schedule the consumer task.

This closes apache#7255.
@asfgit asfgit closed this in 171a3b1 Jan 16, 2019
@zhuzhurk
Contributor Author

Thanks Andrey (@azagrebin) and Till (@tillrohrmann) for the review.

tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
…locks for finite stream jobs when resources are limited

@zhuzhurk zhuzhurk deleted the zhuzh_master branch April 17, 2019 07:17