SAMZA-1860: Modularize Join input validation in ExecutionPlanner #637
Conversation
This change breaks down the validation of partition counts of input and
intermediate streams participating in Join operations into 3 separate steps:
1. Grouping InputOperatorSpecs by the JoinOperatorSpecs of the Join
operations they participate in
2. Replacing InputOperatorSpecs with their corresponding StreamEdges
3. Verifying/Inferring partition counts of input/intermediate streams

This change covers stream-stream Joins only.
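A rough sketch of how the three steps fit together in ExecutionPlanner is below. Apart from validateJoinedStreamsGroupPartitions (which appears in the diff), the helper names and signatures are illustrative assumptions, not the exact code in this PR.

```java
// Sketch only; helper names other than validateJoinedStreamsGroupPartitions are assumed.
// Step 1: group InputOperatorSpecs by the JoinOperatorSpecs they feed into.
Multimap<JoinOperatorSpec, InputOperatorSpec> joinedInputOpSpecs =
    operatorSpecGraphAnalyzer.getJoinedInputOperatorSpecs(operatorSpecGraph);

// Step 2: replace each InputOperatorSpec with its corresponding StreamEdge,
// producing one JoinedStreamsGroup per Join operation.
List<JoinedStreamsGroup> joinedStreamsGroups = groupStreamEdgesByJoin(joinedInputOpSpecs, jobGraph);

// Step 3: verify agreement between joined input/intermediate streams,
// inferring partition counts of intermediate stream edges where needed.
joinedStreamsGroups.forEach(ExecutionPlanner::validateJoinedStreamsGroupPartitions);
```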
@bharathkk @vjagadish1989 Can you take a look at this?

cc @nickpan47, since this affects the ExecutionPlanner.
vjagadish1989 left a comment
Looks great, Ahmad! The planner is one of the complex pieces of the Samza code-base, thanks much for refactoring it!
// Verify agreement between joined input/intermediate streams.
// This may involve setting partition counts of intermediate stream edges.
joinedStreamsGroups.forEach(ExecutionPlanner::validateJoinedStreamsGroupPartitions);
It looks like validate is also "assigning" partition counts. Would it be cleaner to separate the computation of partition counts from their validation? As an example, the computation of partition counts depends on the order in which we process the StreamEdgeGroups, while validation may not.
Yes, validate is also assigning partition counts. It's a little difficult to separate the two operations at the moment. For groups containing only stream edges with known partition counts, we just need verification, which is the easy case. On the other hand, stream edge groups with a mix of set/unset partitions require a mix of verification and assignment.

I thought of deciding whether to do verification vs. assignment based on the JoinedStreamsGroupCategory of a JoinedStreamsGroup. The problems with that are:

- For a group with a mix of set/unset partitions, I could very easily need to do both verification and assignment within the same group, e.g. {e1 (8), e2 (?), e3 (8)}.
- JoinedStreamsGroupCategory is not really reliable once we start setting partition counts (which is another reason why StreamEdges are better off being immutable). For instance, by the time we process group #2 of {e1 (8), e2 (?)} and {e2 (?), e3 (8)}, it will only require verification even though its (stale) category will be SOME_PARTITION_COUNT_SET.

I think we can just change the verb from validate to something else that conveys the possibility of mutation. I'll try to come up with something, but I'm also open to suggestions.
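To make the mixed case concrete, here is a minimal self-contained sketch of verify-plus-assign over a group like {e1 (8), e2 (?), e3 (8)}. The Edge class and method names are hypothetical stand-ins, not the actual StreamEdge/ExecutionPlanner API:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for StreamEdge; -1 means "partition count not set yet".
class Edge {
  final String name;
  int partitions;
  Edge(String name, int partitions) { this.name = name; this.partitions = partitions; }
}

public class JoinGroupPartitionsSketch {
  // Verifies that edges with known partition counts agree, then assigns the
  // agreed count to edges whose partition count is still unset.
  static void verifyAndAssign(List<Edge> group) {
    int agreed = -1;
    for (Edge e : group) {
      if (e.partitions == -1) continue;           // unset; assigned below
      if (agreed == -1) agreed = e.partitions;    // first known count
      else if (agreed != e.partitions) {
        throw new IllegalStateException("Joined streams disagree on partition count");
      }
    }
    if (agreed == -1) {
      throw new IllegalStateException("No partition count known for this group yet");
    }
    for (Edge e : group) {
      if (e.partitions == -1) e.partitions = agreed; // the "assignment" part
    }
  }

  public static void main(String[] args) {
    // The {e1 (8), e2 (?), e3 (8)} example: e1/e3 are verified, e2 is assigned 8.
    List<Edge> group = Arrays.asList(new Edge("e1", 8), new Edge("e2", -1), new Edge("e3", 8));
    verifyAndAssign(group);
    group.forEach(e -> System.out.println(e.name + " -> " + e.partitions));
  }
}
```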
 * processing them in the above order (most constrained first) is guaranteed to
 * yield correct assignment of partition counts of e3 and e4 in a single scan.
 */
Collections.sort(joinedStreamsGroups, Comparator.comparing(JoinedStreamsGroup::getCategory));
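Since Comparator.comparing on an enum uses declaration order, this sort works when the JoinedStreamsGroupCategory constants are declared most constrained first, along the lines of the sketch below. Only SOME_PARTITION_COUNT_SET appears in the diff; the other constant names are assumptions:

```java
// Assumed declaration order: most constrained groups sort first, so the planner
// resolves fully-known groups before groups that still need counts inferred.
enum JoinedStreamsGroupCategory {
  ALL_PARTITION_COUNT_SET,   // every edge's partition count is already known
  SOME_PARTITION_COUNT_SET,  // mix of known and unknown counts
  NO_PARTITION_COUNT_SET     // no counts known yet
}
```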
Not necessarily in scope for this PR, but do you have a sense of what it would take to make StreamEdge immutable?
For example, it seems like the setPartitionCount method on StreamEdge could be replaced with a map of StreamEdge -> partitionCount.
I came to the same conclusion actually: making StreamEdge immutable can greatly improve the ExecutionPlanner. I didn't want to do it in this series of PRs to avoid scope creep though.
I think one idea we can explore is making partition count a ctor param and a readonly property of a StreamEdge. This would require deferring the creation of any StreamEdge until its partition count is known, which is probably not going to be difficult after this PR.
I'll take note of this and send a follow-up PR later on.
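A minimal sketch of the constructor-parameter idea; the class and field names are illustrative, not the actual StreamEdge:

```java
// Illustrative only: an edge whose partition count is fixed at construction,
// so there is no setPartitionCount and no partially-initialized state.
final class ImmutableStreamEdge {
  private final String streamId;
  private final int partitionCount;

  ImmutableStreamEdge(String streamId, int partitionCount) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("Partition count must be known and positive");
    }
    this.streamId = streamId;
    this.partitionCount = partitionCount;
  }

  String getStreamId() { return streamId; }
  int getPartitionCount() { return partitionCount; }
}
```

Until then, the interim suggestion of tracking counts in a Map<StreamEdge, Integer> during planning would at least keep the mutation out of StreamEdge itself.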
} else {
  category = JoinedStreamsGroupCategory.SOME_PARTITION_COUNT_SET;
}
The notion of a "category" looks like a detail of the JoinedStreamsGroup, which can be inferred from the other params. What do you think about moving the logic that determines "category" into the constructor of JoinedStreamsGroup?
A nice property is that we could avoid inconsistent object states. For example, with the current constructor JoinedStreamsGroup(groupId, streamEdges, category), one could create an instance of JoinedStreamsGroup whose streamEdges and category contradict each other.
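A sketch of that suggestion, deriving the category from the edges at construction time; the field names, the getPartitionCount accessor, and the ALL_/NO_ constants are assumptions for illustration:

```java
// Illustrative constructor: category is computed from streamEdges, so the two
// can never contradict each other at construction time.
JoinedStreamsGroup(String groupId, Set<StreamEdge> streamEdges) {
  this.groupId = groupId;
  this.streamEdges = streamEdges;
  long edgesWithCounts = streamEdges.stream()
      .filter(e -> e.getPartitionCount() > 0)
      .count();
  if (edgesWithCounts == streamEdges.size()) {
    this.category = JoinedStreamsGroupCategory.ALL_PARTITION_COUNT_SET;
  } else if (edgesWithCounts == 0) {
    this.category = JoinedStreamsGroupCategory.NO_PARTITION_COUNT_SET;
  } else {
    this.category = JoinedStreamsGroupCategory.SOME_PARTITION_COUNT_SET;
  }
}
```

As the response below notes, this trades an extra pass over the StreamEdges and still cannot prevent later inconsistency while StreamEdges remain mutable.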
I did consider this and I would have really liked to do so. However:

- Deciding the category in JoinedStreamsGroup requires one more (redundant) iteration over the StreamEdges.
- More importantly, there is no way to avoid inconsistent object states in JoinedStreamsGroup because StreamEdges are mutable. In fact, we already throw every JoinedStreamsGroup with intermediate streams into this inconsistent state once we start setting partition counts. This made me less inclined to incur the overhead of the extra iteration in #1.

Together, these 2 points made me prefer to keep JoinedStreamsGroup as a passive data object that contains zero logic. I thought this would help set readers' expectations that this object is just a way of organizing data w/o maintaining any invariants.
 */
private static <T> void traverse(T vertex, Consumer<T> visitor, Function<T, Iterable<? extends T>> getNextVertexes) {
  visitor.accept(vertex);
  for (T nextVertex : getNextVertexes.apply(vertex)) {
Question: could you have cycles anywhere in the traversal? If so, should this method guard against that? Alternatively, if the visitors are expected to track and avoid cycles, it would be worth documenting that.
I don't think there could be cycles in the OperatorSpecGraph, and if there could be then we never handled them.
A visitor's responsibility is strictly dictated by getNextVertexes, and since both are user-supplied, it's all up to the user. There are no general requirements on visitors.
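For illustration, here is how a caller could compose the helper, with a visited set inside getNextVertexes acting as a cycle guard if one were ever needed. The lambda bodies and the getRegisteredOperatorSpecs accessor are assumptions, not a prescription from this PR:

```java
// Illustrative caller-side use of traverse(vertex, visitor, getNextVertexes).
// The visited set makes getNextVertexes skip vertexes it has already expanded,
// which is one way a user-supplied function could guard against cycles.
Set<OperatorSpec> visited = new HashSet<>();
traverse(
    inputOpSpec,
    opSpec -> { /* e.g. record join <-> input associations here */ },
    opSpec -> opSpec.getRegisteredOperatorSpecs().stream()
        .filter(visited::add)
        .collect(Collectors.toList()));
```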
bharathkk left a comment
Thanks a lot for putting this together. It is definitely looking much better.
 * A utility class that encapsulates the logic for traversing an {@link OperatorSpecGraph} and building
 * associations between related {@link OperatorSpec}s.
 */
/* package private */ class OperatorSpecGraphAnalyzer {
Here are my thoughts on this class. I feel we don't have a pressing need for it to be generified yet. Can we start with a simple helper class that does the traversal and returns a mapping in one go? That way:
- we are still isolating the traversal logic
- it simplifies the review
- it simplifies testing

We can always refactor to extract the traversal part if we plan to introduce more visitors. Thoughts?
@bharathkk : We'll likely introduce more visitors for computing partition-counts for StreamTableJoin and side-inputs. Once we have those follow-up PRs, we can decide if the current implementation is overly general. If that is indeed the case, we can certainly revisit it.
Until then, it's probably efficient to leave this PR in its current state. What do you think?
If we're only doing stream-stream join then I agree. The reason I wrote it this way, though, is that I have another PR that will add one more visitor in order to support stream-table joins. I wanted to lay the groundwork for the upcoming change since I have already finished coding it.
@bharathkk @vjagadish1989 Would you prefer I send you the follow-up PRs now or is this PR good to go?
this PR should be good to go.
@ahmedahamid sorry to be late to the party. One comment for your future PR: OperatorSpecGraph is only available for high-level APIs. In the effort to unify the runtime support for both high-level and low-level APIs, ExecutionPlanner and the corresponding JobGraph/JobNode classes now only have access to ApplicationDescriptorImpl. Any need to traverse the graph should now start from ApplicationDescriptorImpl.getInputOperators() (see PR #642).
No worries. All the changes I have been making to the ExecutionPlanner only rely on InputOperatorSpecs.
approved
merged and submitted!
Set<StreamEdge> streamEdges = new HashSet<>();

for (InputOperatorSpec inputOpSpec : inputOpSpecs) {
  StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), streamConfig));
Just for the record, I have a strong concern here that we are potentially modifying the StreamEdge in jobGraph in a method called validateJoinInputStreamPartitions(). This also breaks the abstraction that createJobGraph() should have already created the StreamEdges and JobNodes needed based on a traversal of the operator DAG. Are we saying that even after createJobGraph() is called, JobGraph can be missing some StreamEdges? That should not be the case, since each StreamEdge should correspond to an explicit partitionBy() operator, or be explicitly defined as an input/output stream. Let's sync up on the purpose and use case of this code.
Let's take this comment to a separate PR. I realized that this is an existing pattern in ExecutionPlanner/JobGraph. Ideally, get/create StreamEdge from JobGraph should be separate methods; the get should be read-only.
Certainly agree. This call site is actually using getOrCreateStreamEdge to retrieve existing StreamEdges, not create new ones.
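A sketch of the split discussed above, assuming JobGraph keeps its edges in an internal map keyed by stream id; the field names and the StreamEdge constructor call are hypothetical:

```java
// Hypothetical split of getOrCreateStreamEdge into an explicit create and a read-only lookup.
StreamEdge createStreamEdge(StreamSpec streamSpec) {
  return edges.computeIfAbsent(streamSpec.getId(), id -> new StreamEdge(streamSpec, config));
}

// Read-only lookup: callers such as validateJoinInputStreamPartitions() would use
// this and fail fast instead of silently adding edges after createJobGraph() has run.
StreamEdge getStreamEdge(String streamId) {
  StreamEdge edge = edges.get(streamId);
  if (edge == null) {
    throw new IllegalStateException("No StreamEdge was created for stream: " + streamId);
  }
  return edge;
}
```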