[Host Ir] refactor and cleanup lowering and segmentation #4145
samnordmann merged 16 commits into main
Conversation
Review updated until commit 684118f
csrc/host_ir/lower.h (outdated):

```cpp
static bool ShouldMergeSegmentedGroups(
    SegmentedGroup* group1,
    SegmentedGroup* group2);
```
This function would not need to be exposed if it weren't used in tests/cpp/test_resharding.cpp. We could alternatively not expose it and just duplicate its implementation (which is short and simple) in the test.
```diff
     }
   }
   return true;
+  if (options_.custom_should_merge_groups != nullptr) {
```
There's also segment_set that serves a similar purpose. But it's hard to tell why it's not sufficient without reviewing the following PRs.
You are right that both mechanisms serve the same purpose. By the way, this was also true before this PR with the only_segment_resharding_exprs option. For the time being, besides the fact that passing a function is much closer to the existing code than moving to segmenter sets, I also find this way more lightweight and usable in this context; more precisely, it saves me:
- a pass for adding the sets
- an option in the segmenter to only segment according to the segmenter set
- a pass for removing the segmenter sets
```cpp
      }
    }
  }
  hic->resetTopLevelExprs(new_top_level_exprs);
```
Can you clarify why we need resetting? In general, I tend to avoid APIs that reset. They make it hard to reason about life cycles of things. I might even create a new HostIrContainer in order to avoid having to reset.
We need resetting here because we want to replace an expression with its lowered version, i.e., replace
Expr1 Expr2 Expr3
with
Expr1 Expr2' Expr3
There are other examples in forthcoming PRs where we instead want to move an expression inside a for-loop, i.e., replace
Expr1
Expr2
Expr3
with
Expr1
For Loop { Expr2 }
Expr3
Resetting the top-level expressions seemed like a simple, acceptable way to do it, but I'm open to suggestions.
reason about life cycles of things
What do you mean exactly? Would it be OK if we freed the deleted expressions?
Instead of resetting, can we add the correct top-level expressions in one go? The high-level logic seems to be:

```
for each group in topo order:
  if group leads to a kernel:
    create a PostOnStream and put it in hir's top-level
  else:
    for each expression in topo order:
      if the expression needs lowering:
        lower it to host IR and append the lowered exprs into hir
      else:
        append the expression directly to hir
```
In this PR we are separating the "segmentation step" from "lowering a resharding expr into comms". So, with this patch there is:
- a first pass in topological order where we create the top_level_expression_, mapping exactly the segments of the SegmentedFusion
- a second lowering pass that goes through the expressions and maybe lowers them to (allocation + communication + wait)

In a coming PR this second pass will be moved to a different file and become a proper preseg_pass. Besides being IMO cleaner and more readable this way, it also allows us to control the preseg_passes and their order of execution. This is needed for stream lowering.
For this reason, in this PR, I just separated into two loops what used to be done in "one go" as you are suggesting here. Therefore, I cannot do as you suggest.
In a coming PR this second pass will be moved to a different file and become a proper preseg_pass.
Don't bother doing this. Host IR lowering is after segmentation, so I'd rather it be kept in csrc/host_ir, not csrc/preseg_passes.
I understood the need to change host IR in a container. Many (if not all) host IR optimizations will be implemented as "passes".
I don't think "reset" is the right API. Most optimizations will be "surgical", e.g., fusing two adjacent for loops and adding stream assignments for overlapping. Such optimizations would "reset" the container with a list of mostly identical host IR besides those being changed. This makes it hard to figure out what's actually changed in the code. I think we'll end up building something like kir::IrVisitor in the future, which is of course not required in this PR.
In a coming PR this second pass will be moved to a different file and become a proper preseg_pass.
Don't bother doing this. Host IR lowering is after segmentation, so I'd rather it be kept in csrc/host_ir, not csrc/preseg_passes.
Why is it a problem? It is very useful to have it in a separate pass, in order to:
- be able to enable/disable the pass with the Optimization Pass Guard
- get useful debug prints with the NVFUSER_DUMP=preseg_pass option
- have the code more factored and structured, and treat the passes uniformly. For example, it took me many trials and errors to figure out the right order of the passes.

Host IR lowering is after segmentation

It is debatable; it depends on which segmentation, right? In the classical sense, it is still a preseg pass; the only segmentation that happens before is the hostIr segmentation.
If the naming or file organization is an issue, should I create another class of Optimization pass for HIR passes happening after HIR segmentation?
which segmentation
I understood MultiDeviceExecutor runs two segmentations. FusionExecutorCache is different. It runs only one and host IR lowering is after segmentation (cf. https://docs.google.com/document/d/1QrRmN27XsVjZu7QrZWJJyRENO50878LC3MvlQY1cRYA/edit?tab=t.0)
should I create another class of Optimization pass for HIR passes
Yes and in a different folder, e.g., csrc/host_ir. This is similar to device_lower/passes. Optimization passes can be as simple as a function, and I'd probably start with just that. When you need to add more features like guards for these passes, you can make them classes.
Force-pushed from b3ce2b4 to 4964680
```diff
     SegmentedGroup* group) {
   FUSER_PERF_SCOPE("SegmentCandidateFinder::deriveSchedulerType");
-  if (options_.only_segment_resharding_exprs) {
+  if (options_.custom_should_merge_groups != nullptr) {
```
It is nullptr by default, and if it is, we fall back to the traditional single-device segmenter using the schedulers.
Does this answer your question?
Sorry, I misunderstood your question. I guess this one is more for @wujingyue -- here I'm only reproducing the previous behavior, but replacing the option "only_segment_resharding_exprs" with a more agnostic one.
The idea of returning None here has something to do with how FusionExecutorCache decides to lower segments. However, this is not used in MultiDeviceExecutor, so I am not so familiar with this part.
I think this is where the extension of the custom "should merge" function feels more like a hack. The overall design of the segmenter is tightly coupled with scheduling, so it is assumed to have this scheduler type. However, what we are finding is that sometimes we also want to use this without scheduling.
This is a good learning for when we redesign the segmenter. For now, can you please leave a note? Something like:
The segmenter should ideally be redesigned to be more flexible and decoupled from the schedulers, but for now, we just return `SchedulerType::None` as it is not relevant when the segmenter is used with a custom should-merge function.
Yes, agreed. For the record, this hack has been present for quite a long time now. Let me add the comment as you suggest
The overall design of the segmenter is tightly coupled with scheduling, so it is assumed to have this scheduler type
That's correct, @naoyam. FWIW, this flag is only turned on for MultiDeviceExecutor. In FusionExecutorCache, schedulers test isResharding as you suggested.
```cpp
}

bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) {
  return isResharding(expr);
```
Are you expecting this to change in the future to something other than return isResharding(expr)? I'm just wondering, why not just use isResharding like before?
This PR belongs to a series of stacked PRs: 1. **=> You are here: #4144** 2. #4145 3. #4146 4. #4147

# What
- Support for aliases in HostIrContainer. When a tensor tv1 is marked as being the alias of tv0, then, at runtime, tv0's concrete data/buffer will be used for the op. It is a way to reuse buffers that have been allocated elsewhere within the TensorView's SSA paradigm. Chained aliasing (tv2-->tv1-->tv0) is supported.
- Fix preallocated outputs in HostIrEvaluator

# Why
It is necessary for stream parallelization, where typically we allocate the full output buffer but each stream writes to a slice of this buffer.

# How
The aliasing is stored in the HostIrContainer through a map. At the HostIrEvaluator level, instead of operating directly on the ExprEvaluator to write/read concrete data, we first apply the alias indirection.
This PR belongs to a series of stacked PRs: 1. #4144 2. #4145 3. **=> You are here: #4146** 4. #4147

Add support for `LoadStoreOp`, `BinaryOp`, `ReductionOp`, including support for pre-allocated outputs, which is not provided by ExprEvaluator.

---------

Co-authored-by: Jingyue Wu <wujingyue@gmail.com>
This PR belongs to a series of stacked PRs: 1. #4144 2. #4145 3. #4146 4. #4301 5. **=> You are here: #4147**

# What
Implement a proper lowering for handling ParallelType::Stream. This PR has the following restrictions:
- Single device fusion
- No split/merge of Stream axis

We add to Hir lowering a new pass that reads the hir container's top-level expressions, reads the consumer's stream parallelization, and creates for-loops with stream management and synchronization to express the stream parallelization. Basic logic for merging for-loops is written.

Let me explain through some examples that can be found in the PR. We suggest running these examples as follows:
```
NVFUSER_DUMP=host_ir test_host_ir --gtest_filter=*
```

## Single expr and for-loop
Look at the `MultiDeviceExecutorLowerStreamTest.SingleSetOp` simple scenario:
```
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
fusion->addInput(tv0);
fusion->addOutput(tv1);
tv1->axis(0)->parallelize(ParallelType::Stream);
```
The dumped generated Host Ir program is:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T1_g_float[iStreamIdx2{i0}, iS3{i2}]) :
  T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
  FOR StreamIdx in iStreamIdx2{i0}:
    GetCurrentStream into Stream 0
    SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
    Synchronize Stream 0
    T2_l_float[iS4{i2}] = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
    T3_l_float[iS5{i2}] = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
    T3_l_float[iS5{i2}] = Set( T2_l_float[iS4{i2}], cache_op=Streaming )
    SetCurrentStream to Stream 0
    Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```
We can see that the expr, here the "Set", gets embedded into a for-loop. Let us analyze further:
- Outside the for-loop, we allocate the global output buffer.
- The start of the for-loop body does the new stream assignment and syncs that stream with the user stream.
- Then, we "Select" (aka slice) through `HirAliasSelect` into the input and output.
- The "Set" operation is executed on the "selected" I/O. Note that the output is an alias to the output's slice.
- At the end of the for-loop, we reset to the user's stream (i.e., the stream that was current before entering the program) and sync the user's stream with the running stream.

## Merging for-loops
To avoid unnecessary synchronization across streams, it is important to be able to fuse the stream for-loops. This is exercised by the test `MultiDeviceExecutorLowerStreamTest.TwoSetOps`:
```
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
TensorView* tv2 = set(tv1);
fusion->addInput(tv0);
fusion->addOutput(tv2);
tv1->axis(0)->parallelize(ParallelType::Stream);
tv2->axis(0)->parallelize(ParallelType::Stream);
```
Dump:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T2_g_float[iStreamIdx4{i0}, iS5{i2}]) :
  T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
  T2_g_float[iStreamIdx4{i0}, iS5{i2}] = ALLOCATE(buffer=T2_g_float[iStreamIdx4{i0}, iS5{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
  FOR StreamIdx in iStreamIdx2{i0}:
    GetCurrentStream into Stream 0
    SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
    Synchronize Stream 0
    T3_l_float[iS6{i2}] = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
    T4_l_float[iS7{i2}] = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
    T4_l_float[iS7{i2}] = Set( T3_l_float[iS6{i2}], cache_op=Streaming )
    T5_l_float[iS8{i2}] = HirAliasSelect( T2_g_float[iStreamIdx4{i0}, iS5{i2}], axis = iStreamIdx4{i0}, index = StreamIdx )
    T5_l_float[iS8{i2}] = Set( T4_l_float[iS7{i2}], cache_op=Streaming )
    SetCurrentStream to Stream 0
    Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```
We observe that the for-loops are indeed merged.

**Possible future optimization:** the allocation of the intermediate buffer could be only of length `numberOfStreams`.

## Separating for-loops
We also need to be able to separate and create new for-loops if necessary, as exercised in `ThreeSetOpsWithDisjointsForLoops`, which considers the Fusion:
```
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
TensorView* tv2 = set(tv1);
TensorView* tv3 = set(tv2);
fusion->addInput(tv0);
fusion->addOutput(tv3);
tv1->axis(0)->parallelize(ParallelType::Stream);
tv3->axis(0)->parallelize(ParallelType::Stream);
```
Here, tv2 is not stream-parallelized, so it should not be produced inside a for-loop. Dump:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T3_g_float[iStreamIdx6{i0}, iS7{i2}]) :
  T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
  FOR StreamIdx in iStreamIdx2{i0}:
    GetCurrentStream into Stream 0
    SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
    Synchronize Stream 0
    T4_l_float[iS8{i2}] = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
    T5_l_float[iS9{i2}] = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
    T5_l_float[iS9{i2}] = Set( T4_l_float[iS8{i2}], cache_op=Streaming )
    SetCurrentStream to Stream 0
    Synchronize Stream ( StreamIdx % numberOfStreams )
  T2_g_float[iS4{i0}, iS5{i2}] = Set( T1_g_float[iStreamIdx2{i0}, iS3{i2}], cache_op=Streaming )
  T3_g_float[iStreamIdx6{i0}, iS7{i2}] = ALLOCATE(buffer=T3_g_float[iStreamIdx6{i0}, iS7{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
  FOR StreamIdx in iStreamIdx6{i0}:
    GetCurrentStream into Stream 2
    SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
    Synchronize Stream 2
    T6_l_float[iS10{i2}] = HirAliasSelect( T2_g_float[iS4{i0}, iS5{i2}], axis = iS4{i0}, index = StreamIdx )
    T7_l_float[iS11{i2}] = HirAliasSelect( T3_g_float[iStreamIdx6{i0}, iS7{i2}], axis = iStreamIdx6{i0}, index = StreamIdx )
    T7_l_float[iS11{i2}] = Set( T6_l_float[iS10{i2}], cache_op=Streaming )
    SetCurrentStream to Stream 2
    Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```

---------

Co-authored-by: Jacob Hinkle <1454944+jacobhinkle@users.noreply.github.com> Co-authored-by: Jingyue Wu <wujingyue@gmail.com> Co-authored-by: Ryan Spring <rspring@nvidia.com> Co-authored-by: Liqiang Lu <116412316+liqiangxl@users.noreply.github.com> Co-authored-by: jjsjann123 <jiej@nvidia.com> Co-authored-by: Naoya Maruyama <naoyam@users.noreply.github.com> Co-authored-by: Gao, Xiang <qasdfgtyuiop@gmail.com> Co-authored-by: Priya Mishra <52657555+Priya2698@users.noreply.github.com> Co-authored-by: Christian Sarofeen <csarofeen@nvidia.com> Co-authored-by: Nick Sarkauskas <nsarkauskas@nvidia.com> Co-authored-by: Wang, Xiao <24860335+xwang233@users.noreply.github.com> Co-authored-by: root <26priya11@gmail.com>
This PR belongs to a series of stacked PRs:
# What
Replace `SegmentCandidateFinderOptions::only_segment_resharding_exprs` by a pointer to a predicate function. This allows the user of the segmenter to (optionally) provide a custom function used to decide whether two given groups should be merged. This achieves better separation of responsibility: with this option, the segmenter is only responsible for applying the segmentation algorithm, but does not embed the application-specific rule for merging groups. The specific rule in our context is decided by the Hir lowering. IMO this refactoring should ideally go further and make the segmenter a more abstract class that would be used in both Host Ir and FusionExecutorCache lowering, changing only the newly introduced function pointer.

# Why
This is a preliminary refactoring useful for more advanced Host Ir lowering, notably ParallelType::Stream lowering.