
[Host ir] support for set reduce and binary op #4146

Merged
samnordmann merged 26 commits into main from host_irs/LoadStore_Reduction_binaryOp_support
Apr 27, 2025
Conversation

@samnordmann (Collaborator) commented Mar 26, 2025

This PR belongs to a series of stacked PRs:

  1. [Host irs] alias and preallocated output support #4144
  2. [Host Ir] refactor and cleanup lowering and segmentation #4145
  3. => You are here: [Host ir] support for set reduce and binary op #4146
  4. [Host irs] Stream lowering of single device fusions #4147

Add support for LoadStoreOp, BinaryOp, and ReductionOp, including support for pre-allocated outputs, which ExprEvaluator does not provide.
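The pre-allocated-output contract described here can be sketched in Python pseudocode (a minimal sketch with invented names: the `known` map stands in for the evaluator's bound values and plain lists stand in for tensors; this is not nvFuser API):

```python
def evaluate_with_preallocated_output(known, out_key, result):
    """Sketch: `known` maps IR values to concrete buffers. If the output
    was bound up front (pre-allocated), write the computed result into it
    in place; otherwise bind the freshly computed buffer as the output."""
    if out_key in known:
        buf = known[out_key]
        buf[:] = result          # in-place copy, like out_tensor.copy_(t)
    else:
        known[out_key] = result  # like binding the evaluated tensor
    return known[out_key]

# Pre-allocated path: the caller-provided buffer receives the data.
known = {"out": [0.0, 0.0, 0.0]}
evaluate_with_preallocated_output(known, "out", [1.0, 2.0, 3.0])
```

The point of the in-place branch is that the caller keeps ownership of the buffer, which plain expression evaluation (bind-only) cannot express.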

github-actions bot commented Mar 26, 2025

Review updated until commit 15552c2

Description

  • Added support for LoadStoreOp, BinaryOp, and ReductionOp in HostIrEvaluator.

  • Enhanced isLowerableAsStandaloneHostOp to include LoadStoreOp of Set type without permute.

  • Added tests for LoadStoreOp, BinaryOp, and ReductionOp with pre-allocated and non-pre-allocated outputs.


Changes walkthrough 📝

Relevant files

Enhancement

executor.cpp (csrc/host_ir/executor.cpp): Add handlers for LoadStoreOp, BinaryOp, and ReductionOp

  • Added handle methods for LoadStoreOp, BinaryOp, and ReductionOp.
  • Included ir/iostream.h for potential debugging.
  • +123/-0

lower.cpp (csrc/host_ir/lower.cpp): Update isLowerableAsStandaloneHostOp for LoadStoreOp

  • Updated isLowerableAsStandaloneHostOp to include LoadStoreOp of Set
    type without permute.
  • +27/-1

executor.h (csrc/host_ir/executor.h): Add declarations for LoadStoreOp, BinaryOp, and ReductionOp handlers

  • Added declarations for handle methods for LoadStoreOp, BinaryOp, and
    ReductionOp.
  • +3/-0

Tests

test_host_irs.cpp (tests/cpp/test_host_irs.cpp): Add tests for LoadStoreOp, BinaryOp, and ReductionOp

  • Added tests for LoadStoreOp, BinaryOp, and ReductionOp with
    pre-allocated and non-pre-allocated outputs.
  • +180/-0

Cleanup

test_multidevice_pipeline.cpp (tests/cpp/test_multidevice_pipeline.cpp): Remove outdated PipelineTestStagedReduction

  • Removed outdated PipelineTestStagedReduction and related scheduling
    modes.
  • +0/-131

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review

Performance Consideration

The implementation of handle(LoadStoreOp*) includes a commented-out section that could improve performance: it would check whether the output tensor's allocation matches the computed tensor and, if so, bind it directly instead of copying. This should be evaluated and implemented if it provides a measurable benefit.

      t = in_tensor.permute(*permutation);
    } else {
      t = in_tensor;
    }
    
    if (isKnown(out_tv)) {
      auto out_tensor =
          getKnownConcreteValue(load_store_op->out()).as<at::Tensor>();
      out_tensor.copy_(t);
    } else {
      // For completeness, we may check if out_tv's allocation matches `t` and
      // copy data if yes. For example,
      //
      // clang-format off
      // ```
      // const auto& [sizes, strides] = inferShapeOfOutput(out_tv, expr_evaluator_);
      // if (strides == t.strides()) {
      //   bind(out_tv, t);
      // } else {
      //   auto out_tensor = at::empty_strided(sizes, strides, in_tensor.dtype());
      //   out_tensor.copy_(t);
      //   bind_(out_tv, out_tensor);
      // }
      // ```
      // clang-format on
      //
      // For now, I choose to keep code simple for the limited use cases.
    Error Handling

    The handle(ReductionOp*) method throws an error if the output tensor has a root, indicating it is not supported for rFactored reductions. This should be documented or handled more gracefully, possibly with a fallback mechanism or a more descriptive error message.

        !output_tv->hasRoot(),
        "Evaluation for rFactored reductions is not supported.");
    auto input = getKnownConcreteValue(input_tv).as<at::Tensor>();
    Test Coverage

    The tests for LoadStoreOp, BinaryOp, and ReductionOp cover pre-allocated and non-pre-allocated outputs, but they do not cover edge cases such as tensors with different data types, empty tensors, or tensors with non-contiguous memory layouts. Additional tests should be added to ensure robustness.

    using HirSetTest = NVFuserTest;
    
    TEST_F(HirSetTest, HostIr) {
      const std::vector<int64_t> sizes = {8, 64};
    
      auto hic = std::make_unique<HostIrContainer>();
      FusionGuard fg(hic.get());
    
      auto* in = makeConcreteTensor(sizes);
      auto* out = makeConcreteTensor(sizes);
      auto* set = IrBuilder::create<LoadStoreOp>(LoadStoreOpType::Set, out, in);
      hic->addInput(in);
      hic->addInput(out);
      hic->pushBackTopLevelExprs(set);
    
      HostIrEvaluator hie(std::move(hic));
    
      auto options = at::TensorOptions().device(at::kCUDA, 0);
      auto in_aten = at::randn(sizes, options);
      auto out_aten = at::empty(sizes, options);
    
      hie.runWithInput({{in, in_aten}, {out, out_aten}});
    
      EXPECT_TRUE(out_aten.equal(in_aten))
          << "Obtained output: " << out_aten << "\n"
          << "Expected output: " << in_aten;
    }
    
    class HirBinaryOpTest : public NVFuserFixtureParamTest<BinaryOpType> {
     protected:
      at::Tensor executeBinaryOp(at::Tensor lhs, at::Tensor rhs) {
        switch (GetParam()) {
          case BinaryOpType::Add:
            return lhs + rhs;
          case BinaryOpType::Sub:
            return lhs - rhs;
          case BinaryOpType::Mul:
            return lhs * rhs;
          case BinaryOpType::Div:
            return lhs / rhs;
          default:
            NVF_ERROR("Unsupported binary op type ", GetParam());
            return at::Tensor();
        }
      }
    };
    
    TEST_P(HirBinaryOpTest, PreAllocatedOutputs) {
      const std::vector<int64_t> sizes = {8, 64};
      const auto& binary_op_type = GetParam();
    
      auto hic = std::make_unique<HostIrContainer>();
      FusionGuard fg(hic.get());
    
      auto* lhs = makeConcreteTensor(sizes);
      auto* rhs = makeConcreteTensor(sizes);
      auto* out = makeConcreteTensor(sizes);
      auto* binary_op = IrBuilder::create<BinaryOp>(binary_op_type, out, lhs, rhs);
      hic->addInput(lhs);
      hic->addInput(rhs);
      hic->addInput(out);
      hic->pushBackTopLevelExprs(binary_op);
    
      HostIrEvaluator hie(std::move(hic));
    
      auto options = at::TensorOptions().device(at::kCUDA, 0);
      auto lhs_aten = at::randn(sizes, options);
      auto rhs_aten = at::randn(sizes, options);
      auto out_aten = at::empty(sizes, options);
    
      hie.runWithInput({{lhs, lhs_aten}, {rhs, rhs_aten}, {out, out_aten}});
    
      at::Tensor expected_out = executeBinaryOp(lhs_aten, rhs_aten);
      EXPECT_TRUE(expected_out.equal(out_aten))
          << "Obtained output: " << out_aten << "\n"
          << "Expected output: " << expected_out;
    }
    
    TEST_P(HirBinaryOpTest, NonPreAllocatedOutputs) {
      const std::vector<int64_t> sizes = {8, 64};
      const auto& binary_op_type = GetParam();
    
      auto hic = std::make_unique<HostIrContainer>();
      FusionGuard fg(hic.get());
    
      auto* lhs = makeConcreteTensor(sizes);
      auto* rhs = makeConcreteTensor(sizes);
      auto* out = binaryOp(binary_op_type, lhs, rhs);
      hic->addInput(lhs);
      hic->addInput(rhs);
      hic->addOutput(out);
      hic->pushBackTopLevelExprs(out->definition());
    
      HostIrEvaluator hie(std::move(hic));
    
      auto options = at::TensorOptions().device(at::kCUDA, 0);
      auto lhs_aten = at::randn(sizes, options);
      auto rhs_aten = at::randn(sizes, options);
    
      auto out_aten =
          hie.runWithInput({{lhs, lhs_aten}, {rhs, rhs_aten}})[0].as<at::Tensor>();
    
      at::Tensor expected_out = executeBinaryOp(lhs_aten, rhs_aten);
      EXPECT_TRUE(expected_out.equal(out_aten))
          << "Obtained output: " << out_aten << "\n"
          << "Expected output: " << expected_out;
    }
    
    INSTANTIATE_TEST_SUITE_P(
        ,
        HirBinaryOpTest,
        testing::Values(
            BinaryOpType::Add,
            BinaryOpType::Sub,
            BinaryOpType::Mul,
            BinaryOpType::Div),
        [](const testing::TestParamInfo<BinaryOpType>& info) -> std::string {
          std::stringstream ss;
          ss << "BinaryOpType_" << info.param;
          return ss.str();
        });
    
    using HirReductionOpTest = NVFuserTest;
    
    TEST_F(HirReductionOpTest, PreAllocatedOutputs) {
      constexpr int64_t size0 = 8, size1 = 64;
      constexpr int64_t reduction_axis = 1;
    
      auto hic = std::make_unique<HostIrContainer>();
      FusionGuard fg(hic.get());
    
      auto* in = makeConcreteTensor({size0, size1});
      auto* out = newForReduction(in, {reduction_axis}, in->dtype());
      auto* reduction_op = IrBuilder::create<ReductionOp>(
          BinaryOpType::Add, hic->zeroVal(), out, in);
      hic->addInput(in);
      hic->addOutput(out);
      hic->pushBackTopLevelExprs(reduction_op);
    
      HostIrEvaluator hie(std::move(hic));
    
      auto options = at::TensorOptions().device(at::kCUDA, 0);
      auto in_aten = at::randn({size0, size1}, options);
      auto out_aten = at::empty({size0}, options);
    
      hie.runWithInput({{in, in_aten}, {out, out_aten}});
    
      at::Tensor expected_out = in_aten.sum(reduction_axis);
      EXPECT_TRUE(expected_out.equal(out_aten))
          << "Obtained output: " << out_aten << "\n"
          << "Expected output: " << expected_out;
    }
    
    TEST_F(HirReductionOpTest, NonPreAllocatedOutputs) {
      constexpr int64_t size0 = 8, size1 = 64;
      constexpr int64_t reduction_axis = 1;
    
      auto hic = std::make_unique<HostIrContainer>();
      FusionGuard fg(hic.get());
    
      auto* in = makeConcreteTensor({size0, size1});
      auto* out = sum(in, {reduction_axis});
      hic->addInput(in);
      hic->addOutput(out);
      hic->pushBackTopLevelExprs(out->definition());
    
      HostIrEvaluator hie(std::move(hic));
    
      auto options = at::TensorOptions().device(at::kCUDA, 0);
      auto in_aten = at::randn({size0, size1}, options);
      auto out_aten = at::empty({size0}, options);
    
      hie.runWithInput({{in, in_aten}, {out, out_aten}});
    
      at::Tensor expected_out = in_aten.sum(reduction_axis);
      EXPECT_TRUE(expected_out.equal(out_aten))
          << "Obtained output: " << out_aten << "\n"
          << "Expected output: " << expected_out;
    }
    
    } // namespace hir

@samnordmann force-pushed the host_irs/LoadStore_Reduction_binaryOp_support branch from 588e130 to 10daa92 on March 26, 2025 13:05
@samnordmann (Collaborator Author)

    !test

    permutation.has_value(),
    "The logical domain of a Set.Permute is supposed to be a permutation of the root domain: ",
    out_tv->toString());
    in_tensor = in_tensor.permute(*permutation).contiguous();
Collaborator Author:

Note that the .contiguous() is necessary here, and I think this is an unexposed bug in LoadStoreOp::evaluate() -- however, fixing it there incidentally causes another test failure.

The bug was not exposed because we never "host evaluate" a Set.Permute op before this PR.

Collaborator:

    What was the symptom and can you write a standalone repro so I can help here?

    The bug was not exposed because we never "host evaluate" a set.Permute op before this PR

I'm sure many tests in the linked file "host evaluate" Set.Permute (aka transpose).

I suspect the bug is not with LoadStoreOp::evaluate() but with some downstream code assuming the input tensor is contiguous. Therefore, a standalone repro would help a lot.

Collaborator Author:

Never mind, the problem actually comes from the fact that at::permute is an alias operation, but the operation must be realized before a communication, since NCCL/UCC only look at the pointer.

IIUC, before this patch, a permute op would generate a kernel, which does not produce an alias. Now that it goes through ATen, we get an alias.
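The aliasing behavior at issue can be illustrated with NumPy standing in for ATen (a conceptual sketch, not nvFuser code): a transpose, like at::permute, is a view over the same buffer until it is explicitly realized.

```python
import numpy as np

a = np.arange(6, dtype=np.float32).reshape(2, 3)

# Like at::permute, transpose is an alias: only the strides change and
# the underlying buffer is shared, so no data movement happens.
view = a.T
assert np.shares_memory(view, a)
assert not view.flags["C_CONTIGUOUS"]

# A communication backend that only looks at the raw pointer would read
# the unpermuted bytes. Realizing the permutation (the analogue of
# .contiguous()) produces a fresh, contiguous buffer.
realized = np.ascontiguousarray(view)
assert not np.shares_memory(realized, a)
assert realized.flags["C_CONTIGUOUS"]
```

This is why an alias must be materialized before handing the pointer to NCCL/UCC: the view's logical layout never reached memory.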

    If we remove .contiguous() in the host Ir evaluator, then, for example, test Scatter/PipelineTestTwoStages.Communication/10

    %HostIrContainer { (T0_g_float[iS0{3}, iS1{4}, iS2{3}, iS3{5}] (DeviceMesh{0})) -> (T3_g_float[iS11{3}, ideviceIdx.x12{4}, iS13{3}] (DeviceMesh{0 1 2 3})) :
      T1_l_float[iS4{3}, iS5{4}, iS6{3}, rS7{5}] (DeviceMesh{0})
         = reduction( T0_g_float[iS0{3}, iS1{4}, iS2{3}, iS3{5}] (DeviceMesh{0}), op = add, initial value = float(0), allreduce = false )
      T4_g_float[iS15{4}, iS14{3}, iS16{3}] (DeviceMesh{0})
         = Set.Permute( T1_l_float[iS4{3}, iS5{4}, iS6{3}, rS7{5}] (DeviceMesh{0}), cache_op=Streaming )
      T5_g_float[ideviceIdx.x17{4}, iS18{3}, iS19{3}] (DeviceMesh{0 1 2 3}) = ALLOCATE(buffer=T5_g_float[ideviceIdx.x17{4}, iS18{3}, iS19{3}] (DeviceMesh{0 1 2 3}), mem_type=global, size=36, zero_init=false, resets_to_zero=false)
      Communication 23 (type=Scatter, team=(0 1 2 3), root=0, input=T4_g_float[iS15{4}, iS14{3}, iS16{3}] (DeviceMesh{0}), output=T5_g_float[ideviceIdx.x17{4}, iS18{3}, iS19{3}] (DeviceMesh{0 1 2 3}), backend=NCCL)
      Wait Communication 23
      T6_l_float[iS21{3}, ideviceIdx.x20{4}, iS22{3}] (DeviceMesh{0 1 2 3})
         = Set.Permute( T5_g_float[ideviceIdx.x17{4}, iS18{3}, iS19{3}] (DeviceMesh{0 1 2 3}), cache_op=Streaming )
      T3_g_float[iS11{3}, ideviceIdx.x12{4}, iS13{3}] (DeviceMesh{0 1 2 3})
         = T6_l_float[iS21{3}, ideviceIdx.x20{4}, iS22{3}] (DeviceMesh{0 1 2 3})
         + T6_l_float[iS21{3}, ideviceIdx.x20{4}, iS22{3}] (DeviceMesh{0 1 2 3});
    } // %HostIrContainer
    

    will fail because the input tensor at node 0 is not contiguous.

I can either leave the .contiguous() as is to force the op to be realized, or add it in postScatter etc. I'm fine either way (those cases are really corner cases that we do not especially target for now); let me know.

Collaborator Author:

I tried to move .contiguous() into postScatter in the commit "move .contiguous to be in postScatter", but I'm reverting it because:

1. The same fix needs to be done in postReduceScatter as well, see CI failures.
2. Since a recent PR ("DID loop split for allgather for non-outermost sharded axis", #4170), the error is now intercepted earlier, at validateTensors().

Therefore I revert the commit and keep .contiguous() in HostIrEvaluator::handle(LoadStoreOp*). We might want to refine this behavior in the future (i.e. control whether to realize or alias the permute), but it seems like the safe option for now.

(Alternatively, a fix could be to add .contiguous() on the input at `at::Tensor input_tensor =`.)

Collaborator:

    IIUC, before this patch, a permute op would generate a kernel, which does not produce an alias.

    That sounds like the right behavior. AFAIU, reorder-sharded-axis and make-resharding-contiguous kicked in. When seeing the allocation domain not in favor, nvFuser considers the permute op as a non-meta op and therefore generates a kernel.

    Now that it's going through Aten, we get an alias.

    That sounds like a bug somewhere. Assuming reorder-sharded-axis and make-resharding-contiguous still kicked in as before, why didn't nvFuser generate a kernel for the Set.Permute?

Collaborator:

    Could you write a test triggering this bug?

    Sure, I'll try that tomorrow. I suspect some of the tests in test_alias.cpp would fail if host IR lowering was on.

samnordmann (Collaborator Author) commented Apr 24, 2025:

Another solution that would be fine with me is to only lower as a standalone host op the LoadStoreOps that do not contain any permute.

This is the solution I took for now. It should address the concerns you had.

wujingyue (Collaborator) commented Apr 24, 2025:

    Sure, I'll try that tomorrow. I suspect some of the tests in test_alias.cpp would fail if host IR lowering was on.

    I managed to reproduce the error based on this branch.

    Patch

    diff --git a/tests/cpp/test_alias.cpp b/tests/cpp/test_alias.cpp
    index 04f034f1..559dbafd 100644
    --- a/tests/cpp/test_alias.cpp
    +++ b/tests/cpp/test_alias.cpp
    @@ -150,6 +150,9 @@ TEST_F(AliasTest, View_NoAliasForIncompliantLayout) {
     }
    
     TEST_F(AliasTest, ViewPermute) {
    +  EnableOptionsGuard opt_guard;
    +  EnableOptionsGuard::getCurOptions().set(EnableOption::HostIrLowering);
    +
       auto fusion = std::make_unique<Fusion>();
       FusionGuard fg(fusion.get());

    and then run

    $ NVFUSER_DUMP=host_ir bin/test_nvfuser --gtest_filter=AliasTest.ViewPermute
    
    Note: Google Test filter = AliasTest.ViewPermute
    [==========] Running 1 test from 1 test suite.
    [----------] Global test environment set-up.
    [----------] 1 test from AliasTest
    [ RUN      ] AliasTest.ViewPermute
    
    %HostIrContainer { (T0_g_float[iS0{2}, iS1{3}, iS2{4}]) -> (T2_g_float[iS10{12}, iS9{2}]) :
      T1_l_float[iS3{2}, iS8{12}rf] = view( T0_g_float[iS0{2}, iS1{3}, iS2{4}] )
      T2_g_float[iS10{12}, iS9{2}]
         = Set.Permute( T1_l_float[iS3{2}, iS8{12}rf], cache_op=Streaming )
    } // %HostIrContainer
    
    Aliases:{
    }
    unknown file: Failure
    C++ exception with description " INTERNAL ASSERT FAILED at "/opt/pytorch/nvfuser/csrc/host_ir/executor.cpp":645, please report a bug with repro script to NVFuser at https://github.com/NVIDIA/Fuser/issues. the set opT2_g_float[iS10{12}, iS9{2}]
       = Set.Permute( T1_l_float[iS3{2}, iS8{12}rf], cache_op=Streaming )
    must not be a permute
    

    If the same patch is based on main, the test passes.

Collaborator:

I pushed a change which hopefully resolves this. PTAL.

Collaborator Author:

    LGTM, thank you very much!

    testing::Values(0, 1),
    testing::Values(true)));

    // Different scheduling modes used in
Collaborator Author:

This test is not relevant anymore since we don't use generated kernels for now, so we'll add it back in time if we think it is useful. In the meantime it is just technical debt.

    @samnordmann samnordmann requested a review from wujingyue April 4, 2025 08:16
@samnordmann force-pushed the host_irs/refactor_lowering_and_segmentation branch from b3ce2b4 to 4964680 on April 11, 2025 12:15
@samnordmann force-pushed the host_irs/LoadStore_Reduction_binaryOp_support branch from 10daa92 to 85b7b75 on April 11, 2025 13:30
@samnordmann (Collaborator Author)

    !test

@samnordmann force-pushed the host_irs/LoadStore_Reduction_binaryOp_support branch from 37d6561 to d265584 on April 16, 2025 10:37
@samnordmann (Collaborator Author)

    !test

    samnordmann added a commit that referenced this pull request Apr 16, 2025
    This PR belongs to a series of stacked PRs:
    1. **=> You are here: #4144**
    2. #4145
    3. #4146
    4. #4147
    
    # What
    
- Support for aliases in HostIrContainer. When a TensorView tv1 is marked as
an alias of tv0, then, at runtime, tv0's concrete data/buffer
is used for the op. It is a way to reuse buffers that have been
allocated elsewhere within the TensorView SSA paradigm. Chained
aliasing (tv2 --> tv1 --> tv0) is supported.
- Fix pre-allocated outputs in HostIrEvaluator.
    
    # Why
    
    It is necessary for stream parallelization, where typically we allocate
    the full output buffer but each stream writes to a slice of this buffer.
    
    # How 
    
The aliasing is stored in the HostIrContainer through a map.

At the HostIrEvaluator level, instead of operating directly on the
ExprEvaluator to write/read concrete data, we first apply the alias
indirection.
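The alias indirection can be sketched as a map lookup that follows the chain to the root buffer (a minimal sketch with hypothetical names, not the actual HostIrContainer interface):

```python
def resolve_alias(aliases, tv):
    """Follow a chain of aliases (e.g. tv2 -> tv1 -> tv0) to the root
    TensorView whose concrete buffer is actually used at runtime."""
    seen = set()
    while tv in aliases:
        if tv in seen:
            raise ValueError(f"alias cycle involving {tv!r}")
        seen.add(tv)
        tv = aliases[tv]
    return tv

# Chained aliasing: tv2 --> tv1 --> tv0 resolves to tv0's buffer.
aliases = {"tv2": "tv1", "tv1": "tv0"}
assert resolve_alias(aliases, "tv2") == "tv0"
assert resolve_alias(aliases, "tv0") == "tv0"  # non-alias: itself
```

Reads and writes of concrete data then go through the resolved root, which is what lets several TensorViews share one allocation within the SSA paradigm.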
    samnordmann added a commit that referenced this pull request Apr 16, 2025
    This PR belongs to a series of stacked PRs:
    1. #4144
    2. **=> You are here:** #4145
    3. #4146
    4. #4147
    
    # What
    
1. We replace the bool option `SegmentCandidateFinderOptions::
only_segment_resharding_exprs` by a pointer to a predicate function.
This allows the user of the segmenter to (optionally) provide a custom
function deciding whether two given groups should be merged.
This achieves better separation of responsibility: with this option, the
segmenter is only responsible for applying the segmentation algorithm
and does not embed the application-specific rule for merging groups.
The specific rule in our context is decided by the Hir
lowering. Imo this refactoring should ideally go further and make the
segmenter a more abstract class that would be used in both Host Ir and
FusionExecutorCache lowering, only changing the newly introduced
function pointer.
2. In HIR lowering, we clearly separate (in a distinct for-loop, which
later will become a preseg pass) the pass that transforms
resharding exprs into a Communication.
    
# Why

That's a preliminary refactoring useful for more advanced Host Ir
lowering, notably ParallelType::Stream lowering.
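The separation of responsibility described above, a segmenter that only applies the algorithm while the caller supplies the merge rule, can be sketched as a toy greedy merge pass (invented names; the real segmenter is far more involved):

```python
from typing import Callable, List, Set

def segment(groups: List[Set[str]],
            should_merge: Callable[[Set[str], Set[str]], bool]) -> List[Set[str]]:
    """Greedy merge pass: repeatedly merge the first pair of groups the
    caller-supplied predicate accepts, until no pair qualifies."""
    merged = True
    while merged:
        merged = False
        for i in range(len(groups)):
            for j in range(i + 1, len(groups)):
                if should_merge(groups[i], groups[j]):
                    groups[i] = groups[i] | groups[j]
                    del groups[j]
                    merged = True
                    break
            if merged:
                break
    return groups

# The application-specific rule lives with the caller, e.g. "merge
# groups only when neither contains a resharding expr".
no_resharding = lambda a, b: "reshard" not in a and "reshard" not in b
result = segment([{"add"}, {"mul"}, {"reshard"}], no_resharding)
```

Swapping the predicate changes the segmentation policy without touching the algorithm, which is the point of replacing the bool option with a function pointer.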
    Base automatically changed from host_irs/refactor_lowering_and_segmentation to main April 16, 2025 14:57
@samnordmann (Collaborator Author)

    !build

@samnordmann (Collaborator Author)

    !test

    @samnordmann samnordmann requested a review from wujingyue April 23, 2025 14:36
@samnordmann (Collaborator Author)

    !test

wujingyue (Collaborator) left a comment:

    LGTM otherwise

@samnordmann (Collaborator Author)

    !test

    @samnordmann samnordmann merged commit cf5c6d2 into main Apr 27, 2025
    53 checks passed
    samnordmann added a commit that referenced this pull request Apr 28, 2025
Porting PR #4147 to here, which is based on main.
    
    This PR only serves as a reference since it got broken down into the
    following PRs to ease reviewing and merging:
    1. #4144
    2. #4145
    3. #4146
    4. #4147
    samnordmann added a commit that referenced this pull request Apr 28, 2025
    This PR belongs to a series of stacked PRs:
    1. #4144
    2. #4145
    3. #4146
    4. #4301
    5. **=> You are here:** #4147
    
    # What
    
    Implement a proper lowering for handling ParallelType::Stream. This PR
    has the following restrictions:
    - Single device fusion
    - No split/merge of Stream axis
    
We add to Hir lowering a new pass that reads the hir container's top-level
expressions, reads the consumers' stream parallelization, and
creates For-Loops with stream management and synchronization expressing the
stream parallelization. Basic logic for merging For-Loops is written.
    
Let me explain through some examples that can be found in the PR. We
suggest running those examples as follows:
    ```
    NVFUSER_DUMP=host_ir test_host_ir --gtest_filter=*
    ```
    
    ## Single expr and for-loop
    Look at `MultiDeviceExecutorLowerStreamTest.SingleSetOp` simple
    scenario:
    ```
      TensorView* tv0 = makeContigTensor(2);
      TensorView* tv1 = set(tv0);
      fusion->addInput(tv0);
      fusion->addOutput(tv1);
      tv1->axis(0)->parallelize(ParallelType::Stream);
    ```
    the dumped generated Host Ir program is:
    ```
    %HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T1_g_float[iStreamIdx2{i0}, iS3{i2}]) :
      T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
      FOR StreamIdx in iStreamIdx2{i0}:
        GetCurrentStream into Stream 0
        SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
        Synchronize Stream 0
        T2_l_float[iS4{i2}]
           = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
        T3_l_float[iS5{i2}]
           = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
        T3_l_float[iS5{i2}]
           = Set( T2_l_float[iS4{i2}], cache_op=Streaming )
        SetCurrentStream to Stream 0
        Synchronize Stream ( StreamIdx % numberOfStreams )
    } // %HostIrContainer
    ```
    We can see that the expr, here the "Set", gets embedded into a For Loop.
    Let us analyze further:
    - outside the for loop, we allocate the global output buffer.
    - The start of the for loop body does the new stream assignment and sync
    of that stream to the user stream
    - Then, we "Select" (aka slice) through `HirAliasSelect` into the input
    and output
    - The "Set" operation is executed on the "selected" I/O. Note that the
    output is an alias to the output's slice.
    - At the end of the for loop, we reset to the user's stream (I mean, the
    currently selected stream before entering the program) and sync the
    user's stream with the running stream.
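The loop structure analyzed above can be sketched in Python pseudocode (illustrative names only; streams are modeled as indices and the body just records the order of host operations):

```python
def lower_stream_loop(extent, num_streams, body):
    """Sketch of the generated stream for-loop: each iteration runs on
    stream (i % num_streams), with syncs against the user stream on
    entry and exit, mirroring the dumped Host IR program."""
    trace = []
    for i in range(extent):
        s = i % num_streams                  # round-robin stream assignment
        trace.append(("set_stream", s))      # SetCurrentStream
        trace.append(("sync_user_stream",))  # Synchronize the user stream
        trace.append(("run_slice", body, i)) # op on the i-th slice of I/O
        trace.append(("set_user_stream",))   # restore the user stream
        trace.append(("sync_stream", s))     # Synchronize the worker stream
    return trace

trace = lower_stream_loop(extent=4, num_streams=2, body="Set")
```

With extent 4 and 2 streams, iterations 0 and 2 share stream 0 while 1 and 3 share stream 1, which is the `StreamIdx % numberOfStreams` pattern visible in the dump.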
    
    ## Merging for loops
    
    To avoid unnecessary synchronization across streams, it is important to
    be able to fuse the stream for-loop. This is exercised by the test
    `MultiDeviceExecutorLowerStreamTest.TwoSetOps`:
    ```
      TensorView* tv0 = makeContigTensor(2);
      TensorView* tv1 = set(tv0);
      TensorView* tv2 = set(tv1);
      fusion->addInput(tv0);
      fusion->addOutput(tv2);
      tv1->axis(0)->parallelize(ParallelType::Stream);
      tv2->axis(0)->parallelize(ParallelType::Stream);
    ```
    dump:
    ```
    %HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T2_g_float[iStreamIdx4{i0}, iS5{i2}]) :
      T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
      T2_g_float[iStreamIdx4{i0}, iS5{i2}] = ALLOCATE(buffer=T2_g_float[iStreamIdx4{i0}, iS5{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
      FOR StreamIdx in iStreamIdx2{i0}:
        GetCurrentStream into Stream 0
        SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
        Synchronize Stream 0
        T3_l_float[iS6{i2}]
           = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
        T4_l_float[iS7{i2}]
           = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
        T4_l_float[iS7{i2}]
           = Set( T3_l_float[iS6{i2}], cache_op=Streaming )
        T5_l_float[iS8{i2}]
           = HirAliasSelect( T2_g_float[iStreamIdx4{i0}, iS5{i2}], axis = iStreamIdx4{i0}, index = StreamIdx )
        T5_l_float[iS8{i2}]
           = Set( T4_l_float[iS7{i2}], cache_op=Streaming )
        SetCurrentStream to Stream 0
        Synchronize Stream ( StreamIdx % numberOfStreams )
    } // %HostIrContainer
    ```
We observe that the For-Loops are indeed merged.
**Possible future optimization:** the allocation of the intermediate
buffer could be only of length `numberOfStreams`.
    
    ## separating for loops
    
    We also need to be able to separate and create new for loops if
    necessary, as exercised in `ThreeSetOpsWithDisjointsForLoops`, which
    considers the Fusion:
    ```
      TensorView* tv0 = makeContigTensor(2);
      TensorView* tv1 = set(tv0);
      TensorView* tv2 = set(tv1);
      TensorView* tv3 = set(tv2);
      fusion->addInput(tv0);
      fusion->addOutput(tv3);
      tv1->axis(0)->parallelize(ParallelType::Stream);
      tv3->axis(0)->parallelize(ParallelType::Stream);
    ```
Here, tv2 is not stream-parallelized, so it should not be produced in a
for-loop. Dump:
    ```
    %HostIrContainer { (T0_g_float[iS0{i0}, iS1{i2}]) -> (T3_g_float[iStreamIdx6{i0}, iS7{i2}]) :
      T1_g_float[iStreamIdx2{i0}, iS3{i2}] = ALLOCATE(buffer=T1_g_float[iStreamIdx2{i0}, iS3{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
      FOR StreamIdx in iStreamIdx2{i0}:
        GetCurrentStream into Stream 0
        SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
        Synchronize Stream 0
        T4_l_float[iS8{i2}]
           = HirAliasSelect( T0_g_float[iS0{i0}, iS1{i2}], axis = iS0{i0}, index = StreamIdx )
        T5_l_float[iS9{i2}]
           = HirAliasSelect( T1_g_float[iStreamIdx2{i0}, iS3{i2}], axis = iStreamIdx2{i0}, index = StreamIdx )
        T5_l_float[iS9{i2}]
           = Set( T4_l_float[iS8{i2}], cache_op=Streaming )
        SetCurrentStream to Stream 0
        Synchronize Stream ( StreamIdx % numberOfStreams )
      T2_g_float[iS4{i0}, iS5{i2}]
         = Set( T1_g_float[iStreamIdx2{i0}, iS3{i2}], cache_op=Streaming )
      T3_g_float[iStreamIdx6{i0}, iS7{i2}] = ALLOCATE(buffer=T3_g_float[iStreamIdx6{i0}, iS7{i2}], mem_type=global, size=( i0 * i2 ), zero_init=false, resets_to_zero=false)
      FOR StreamIdx in iStreamIdx6{i0}:
        GetCurrentStream into Stream 2
        SetCurrentStream to Stream ( StreamIdx % numberOfStreams )
        Synchronize Stream 2
        T6_l_float[iS10{i2}]
           = HirAliasSelect( T2_g_float[iS4{i0}, iS5{i2}], axis = iS4{i0}, index = StreamIdx )
        T7_l_float[iS11{i2}]
           = HirAliasSelect( T3_g_float[iStreamIdx6{i0}, iS7{i2}], axis = iStreamIdx6{i0}, index = StreamIdx )
        T7_l_float[iS11{i2}]
           = Set( T6_l_float[iS10{i2}], cache_op=Streaming )
        SetCurrentStream to Stream 2
        Synchronize Stream ( StreamIdx % numberOfStreams )
    } // %HostIrContainer
    ```
    
    ---------
    
    Co-authored-by: Jacob Hinkle <1454944+jacobhinkle@users.noreply.github.com>
    Co-authored-by: Jingyue Wu <wujingyue@gmail.com>
    Co-authored-by: Ryan Spring <rspring@nvidia.com>
    Co-authored-by: Liqiang Lu <116412316+liqiangxl@users.noreply.github.com>
    Co-authored-by: jjsjann123 <jiej@nvidia.com>
    Co-authored-by: Naoya Maruyama <naoyam@users.noreply.github.com>
    Co-authored-by: Gao, Xiang <qasdfgtyuiop@gmail.com>
    Co-authored-by: Priya Mishra <52657555+Priya2698@users.noreply.github.com>
    Co-authored-by: Christian Sarofeen <csarofeen@nvidia.com>
    Co-authored-by: Nick Sarkauskas <nsarkauskas@nvidia.com>
    Co-authored-by: Wang, Xiao <24860335+xwang233@users.noreply.github.com>
    Co-authored-by: root <26priya11@gmail.com>
