This PR belongs to a series of stacked PRs:
1. #4144
2. #4145
3. #4146
4. #4301
5. **=> You are here:** #4147
# What
Implement a proper lowering for handling ParallelType::Stream. This PR
has the following restrictions:
- Single device fusion
- No split/merge of Stream axis
We add a new pass to HIR lowering that walks the HIR container's top-level expressions, reads each consumer's stream parallelization, and wraps the expression in a for-loop with the stream management and synchronization needed to express that parallelization. Basic logic for merging for-loops is also implemented.
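The grouping decision the pass makes can be sketched as follows. This is a toy Python model with hypothetical names, not nvFuser code: the real pass works on HIR expressions and IterDomains, and "same stream domain" is decided by iterdomain mapping rather than simple equality. Each expression is represented as `(name, stream_domain)`, where `stream_domain` identifies the consumer's Stream-parallelized axis, or is `None` when the expression is not stream-parallelized.

```python
def group_stream_loops(exprs):
    """Group consecutive top-level expressions into for-loops.

    Consecutive expressions whose consumers share the same Stream
    domain are merged into one loop; a non-stream expression stays at
    the top level and breaks the current loop.
    """
    groups = []  # each group: [stream_domain, [expr names]]
    for name, stream_domain in exprs:
        if (stream_domain is not None
                and groups
                and groups[-1][0] == stream_domain):
            groups[-1][1].append(name)  # merge into the previous loop
        else:
            groups.append([stream_domain, [name]])
    return groups
```

With this model, the `TwoSetOps` test below yields a single loop containing both sets, while `ThreeSetOpsWithDisjointsForLoops` yields two loops separated by a top-level expression.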
Let me explain through some examples that can be found in the PR. We suggest running those examples as follows:
```sh
NVFUSER_DUMP=host_ir test_host_ir --gtest_filter=*
```
## Single expr and for-loop
Consider the simple scenario in `MultiDeviceExecutorLowerStreamTest.SingleSetOp`:
```cpp
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
fusion->addInput(tv0);
fusion->addOutput(tv1);
tv1->axis(0)->parallelize(ParallelType::Stream);
```
The generated Host IR program, as dumped, is:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T1_g_float[iStreamIdx2{i0}, iS3{i2}]) :
T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
FOR StreamIdx in iStreamIdx2{i0}:
GetCurrentStream into Stream 0
SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
Synchronize Stream 0
T2_l_float[iS4{i2}]
= HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
T3_l_float[iS5{i2}]
= HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
T3_l_float[iS5{i2}]
= Set( T2_l_float[iS4{i2}], cache_op=Streaming )
SetCurrentStream to Stream 0
Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```
We can see that the expression, here the `Set`, gets embedded into a for-loop. Let us analyze further:
- Outside the for-loop, we allocate the global output buffer.
- The start of the for-loop body assigns the worker stream and synchronizes it with the user stream.
- Then we "select" (i.e., slice) into the input and output through `HirAliasSelect`.
- The `Set` operation is executed on the "selected" I/O. Note that the output of the `Set` is an alias of the output buffer's slice.
- At the end of the for-loop body, we restore the user's stream (i.e., the stream that was current before entering the program) and synchronize it with the worker stream.
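The per-iteration stream assignment and synchronization can be modeled abstractly. The sketch below is my reading of the dump, not nvFuser code: `("wait", a, b)` means stream `a` waits for all work previously enqueued on stream `b`, `"user"` stands for the stream that was current on entry, and worker streams are picked round-robin via `StreamIdx % numberOfStreams`.

```python
def loop_sync_trace(extent, number_of_streams):
    """Return an abstract event trace of the lowered stream loop."""
    trace = []
    for stream_idx in range(extent):
        worker = stream_idx % number_of_streams  # round-robin assignment
        # "Synchronize Stream 0": the worker waits for the user stream
        trace.append(("wait", worker, "user"))
        # the sliced expression runs on the worker stream
        trace.append(("run", worker, stream_idx))
        # tail of the body: the user stream waits for the worker
        trace.append(("wait", "user", worker))
    return trace
```

For `extent=3` and two streams, slices 0 and 2 run on stream 0 and slice 1 on stream 1, each bracketed by the two synchronizations, so all worker streams are drained back into the user stream by the end of the loop.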
## Merging for loops
To avoid unnecessary synchronization across streams, it is important to
be able to fuse the stream for-loop. This is exercised by the test
`MultiDeviceExecutorLowerStreamTest.TwoSetOps`:
```cpp
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
TensorView* tv2 = set(tv1);
fusion->addInput(tv0);
fusion->addOutput(tv2);
tv1->axis(0)->parallelize(ParallelType::Stream);
tv2->axis(0)->parallelize(ParallelType::Stream);
```
Dump:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T2_g_float[iStreamIdx4{i0}, iS5{i2}]) :
T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
T2_g_float[iStreamIdx4{i0}, iS5{i2}] = ALLOCATE(buffer=T2_g_float[iStreamIdx4{i0}, iS5{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
FOR StreamIdx in iStreamIdx2{i0}:
GetCurrentStream into Stream 0
SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
Synchronize Stream 0
T3_l_float[iS6{i2}]
= HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
T4_l_float[iS7{i2}]
= HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
T4_l_float[iS7{i2}]
= Set( T3_l_float[iS6{i2}], cache_op=Streaming )
T5_l_float[iS8{i2}]
= HirAliasSelect( T2_g_float[iStreamIdx4{i0}, iS5{i2}], axis = iStreamIdx4{i0}, index = StreamIdx )
T5_l_float[iS8{i2}]
= Set( T4_l_float[iS7{i2}], cache_op=Streaming )
SetCurrentStream to Stream 0
Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```
We observe that the two for-loops are indeed merged.
**Possible future optimization:** the intermediate buffer could be allocated with only `numberOfStreams` slices along the stream axis, instead of the full extent.
## Separating for loops
We also need to be able to create separate for-loops when necessary, as exercised in `ThreeSetOpsWithDisjointsForLoops`, which considers the fusion:
```cpp
TensorView* tv0 = makeContigTensor(2);
TensorView* tv1 = set(tv0);
TensorView* tv2 = set(tv1);
TensorView* tv3 = set(tv2);
fusion->addInput(tv0);
fusion->addOutput(tv3);
tv1->axis(0)->parallelize(ParallelType::Stream);
tv3->axis(0)->parallelize(ParallelType::Stream);
```
Here, `tv2` is not stream-parallelized, so it should not be produced inside a for-loop. Dump:
```
%HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T3_g_float[iStreamIdx6{i0}, iS7{i2}]) :
T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
FOR StreamIdx in iStreamIdx2{i0}:
GetCurrentStream into Stream 0
SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
Synchronize Stream 0
T4_l_float[iS8{i2}]
= HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
T5_l_float[iS9{i2}]
= HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
T5_l_float[iS9{i2}]
= Set( T4_l_float[iS8{i2}], cache_op=Streaming )
SetCurrentStream to Stream 0
Synchronize Stream ( StreamIdx % numberOfStreams )
T2_g_float[iS4{i0}, iS5{i2}]
= Set( T1_g_float[iStreamIdx2{i0}, iS3{i2}], cache_op=Streaming )
T3_g_float[iStreamIdx6{i0}, iS7{i2}] = ALLOCATE(buffer=T3_g_float[iStreamIdx6{i0}, iS7{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
FOR StreamIdx in iStreamIdx6{i0}:
GetCurrentStream into Stream 2
SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
Synchronize Stream 2
T6_l_float[iS10{i2}]
= HirAliasSelect( T2_g_float[iS4{i0}, iS5{i2}], axis = iS4{i0}, index = StreamIdx )
T7_l_float[iS11{i2}]
= HirAliasSelect( T3_g_float[iStreamIdx6{i0}, iS7{i2}], axis = iStreamIdx6{i0}, index = StreamIdx )
T7_l_float[iS11{i2}]
= Set( T6_l_float[iS10{i2}], cache_op=Streaming )
SetCurrentStream to Stream 2
Synchronize Stream ( StreamIdx % numberOfStreams )
} // %HostIrContainer
```
---------
Co-authored-by: Jacob Hinkle <1454944+jacobhinkle@users.noreply.github.com>
Co-authored-by: Jingyue Wu <wujingyue@gmail.com>
Co-authored-by: Ryan Spring <rspring@nvidia.com>
Co-authored-by: Liqiang Lu <116412316+liqiangxl@users.noreply.github.com>
Co-authored-by: jjsjann123 <jiej@nvidia.com>
Co-authored-by: Naoya Maruyama <naoyam@users.noreply.github.com>
Co-authored-by: Gao, Xiang <qasdfgtyuiop@gmail.com>
Co-authored-by: Priya Mishra <52657555+Priya2698@users.noreply.github.com>
Co-authored-by: Christian Sarofeen <csarofeen@nvidia.com>
Co-authored-by: Nick Sarkauskas <nsarkauskas@nvidia.com>
Co-authored-by: Wang, Xiao <24860335+xwang233@users.noreply.github.com>
Co-authored-by: root <26priya11@gmail.com>
# What
Add a SelectOp-like HIR node to express indexing into an ATen tensor.
# Why
It is used in the context of stream lowering; see #4147 and especially the discussion in #4147 (comment).