From 9a0dc9e8d6e0cdc8699384a8bae203591d01ec4b Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 04:29:58 -0700 Subject: [PATCH 01/19] host ir alias and prealloc output support --- csrc/host_ir/container.cpp | 5 + csrc/host_ir/container.h | 12 ++ csrc/host_ir/executor.cpp | 216 ++++++++++++++++++------------------ csrc/host_ir/executor.h | 33 +++++- tests/cpp/test_host_irs.cpp | 112 ++++++++++++++++++- 5 files changed, 268 insertions(+), 110 deletions(-) diff --git a/csrc/host_ir/container.cpp b/csrc/host_ir/container.cpp index ca4c98f2a56..a133c0521ba 100644 --- a/csrc/host_ir/container.cpp +++ b/csrc/host_ir/container.cpp @@ -35,6 +35,11 @@ Stream* HostIrContainer::getDefaultStream() { std::ostream& HostIrContainer::print(std::ostream& os) const { IrMathPrinter op_exprs(os); op_exprs.handle(this); + os << "Aliases:{"; + for (const auto& alias : alias_) { + os << "\n " << alias.first << " -> " << alias.second; + } + os << "\n}\n"; return os; } diff --git a/csrc/host_ir/container.h b/csrc/host_ir/container.h index ab029f63fa2..7dcd66b4436 100644 --- a/csrc/host_ir/container.h +++ b/csrc/host_ir/container.h @@ -55,10 +55,22 @@ class HostIrContainer final : public Fusion { Stream* getDefaultStream(); + void markAlias(TensorView* original, const TensorView* new_alias) { + if (alias_.count(original)) { + original = alias_[original]->as(); + } + alias_[new_alias] = original; + } + + const auto& alias() const { + return alias_; + } + private: std::vector top_level_exprs_; std::vector> kernel_executors_; Stream* default_stream_ = nullptr; + std::unordered_map alias_; }; } // namespace hir diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 4b6c2fe6796..4b631bd0061 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -171,32 +171,6 @@ KernelArgumentHolder HostIrExecutor::run( namespace hir { -namespace { - -at::Tensor getKnownTensorOrUndefined( - Val* val, - const ExpressionEvaluator& expr_evaluator) { - return 
expr_evaluator.isKnown(val) - ? expr_evaluator.evaluate(val).as() - : at::Tensor(); -} - -KernelArgumentHolder getKnownTensorOrUndefined( - const std::vector& vals, - const ExpressionEvaluator& expr_evaluator) { - std::vector tensors(vals.size()); - std::transform( - vals.begin(), - vals.end(), - tensors.begin(), - [&expr_evaluator](Val* val) -> at::Tensor { - return getKnownTensorOrUndefined(val, expr_evaluator); - }); - return KernelArgumentHolder(tensors); -} - -} // namespace - HostIrEvaluator::HostIrEvaluator( std::unique_ptr container, Communicator* communicator, @@ -216,10 +190,23 @@ HostIrEvaluator::HostIrEvaluator( {container_->getDefaultStream(), c10::cuda::getDefaultCUDAStream( static_cast(device_index))}); - expr_evaluator_.bind("numberOfStreams", params_.number_of_streams); + NVF_ERROR( + std::all_of( + container_->inputs().begin(), + container_->inputs().end(), + [this](Val* input) { return !container_->alias().count(input); }), + "Inputs cannot be aliased"); } -KernelArgumentHolder HostIrEvaluator::dispatchAndCollectOutputs() { +KernelArgumentHolder HostIrEvaluator::runWithInput( + const std::unordered_map& val_to_PValue) { + expr_evaluator_ = ExpressionEvaluator(); + expr_evaluator_.bind("numberOfStreams", params_.number_of_streams); + // process input values, converting IValue to PolymorphicValue + for (const auto& [val, pvalue] : val_to_PValue) { + bind(val, pvalue); + } + // Interpret each instruction in an "eager" way by iterate over the Host Ir // Container's top level expression list for (auto expr : container_->topLevelExprs()) { @@ -227,17 +214,15 @@ KernelArgumentHolder HostIrEvaluator::dispatchAndCollectOutputs() { } // Collect global outputs - return getKnownTensorOrUndefined(container_->outputs(), expr_evaluator_); -} - -KernelArgumentHolder HostIrEvaluator::runWithInput( - const std::unordered_map& val_to_PValue) { - // process input values, converting IValue to PolymorphicValue - for (const auto& [val, pvalue] : val_to_PValue) { - 
expr_evaluator_.bind(val, pvalue); - } - - return dispatchAndCollectOutputs(); + std::vector outputs(container_->outputs().size()); + std::transform( + container_->outputs().begin(), + container_->outputs().end(), + outputs.begin(), + [this](Val* val) -> at::Tensor { + return this->getKnownTensorOrUndefined(val); + }); + return KernelArgumentHolder(outputs); } std::string HostIrEvaluator::canRun() const { @@ -320,13 +305,7 @@ void HostIrEvaluator::handle(Synchronize* synchronize) { void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { KernelArgumentHolder args; for (auto& input : launch_kernel->inputs()) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "No buffer associated with Val ", - input, - " for handling ", - launch_kernel->toString()); - args.push(expr_evaluator_.evaluate(input)); + args.push(getKnownConcreteData(input)); } args.setDeviceIndex(); @@ -341,25 +320,35 @@ void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { // Store the outputs in the context for (auto output_idx : c10::irange(outputs.size())) { - expr_evaluator_.bind( - launch_kernel->outputs().at(output_idx), outputs[output_idx]); + bind(launch_kernel->outputs().at(output_idx), outputs[output_idx]); } } void HostIrEvaluator::handle(PostOnStream* post_ir) { KernelArgumentHolder input_args; for (auto& input : post_ir->inputs()) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "No buffer associated with Val ", - input, - " for handling ", - post_ir->toString()); - input_args.push(expr_evaluator_.evaluate(input)); + input_args.push(getKnownConcreteData(input)); } input_args.setDeviceIndex(); // placeholder for storing the outputs KernelArgumentHolder outputs; + bool use_preallocated_outputs = std::all_of( + post_ir->outputs().begin(), + post_ir->outputs().end(), + [this](Val* output) { return this->isKnown(output); }); + NVF_ERROR( + use_preallocated_outputs || + std::all_of( + post_ir->outputs().begin(), + post_ir->outputs().end(), + [this](Val* output) { return 
!this->isKnown(output); }), + "outputs must be all or none preallocated in expr ", + post_ir); + if (use_preallocated_outputs) { + for (auto output : post_ir->outputs()) { + outputs.push(getKnownConcreteData(output)); + } + } NVF_ERROR( post_ir->hostOpToPost()->isA(), @@ -376,16 +365,23 @@ void HostIrEvaluator::handle(PostOnStream* post_ir) { /*fusion_id=*/0, !params_.skip_auto_scheduling); } - outputs = fec_.at(hu).runFusionWithInputs(input_args); + if (use_preallocated_outputs) { + TORCH_WARN( + "FusionExecutorCache does not support with preallocated outputs, so we are copying the outputs in expr ", + post_ir); + auto tmp_outputs = fec_.at(hu).runFusionWithInputs(input_args); + for (auto output_idx : c10::irange(tmp_outputs.size())) { + outputs[output_idx].as().copy_( + tmp_outputs[output_idx].as()); + } + } else { + outputs = fec_.at(hu).runFusionWithInputs(input_args); + } } else { // This path should generally be avoided as it will likely send the fusion // held in HostUnit directly to KernelExecutor which means it will try to // compile and run a device kernel with a single thread. 
- if (auto it = executors_.find(hu); it != executors_.end()) { - ExecutorAbstract* ea = it->second.get(); - outputs = ExecutorDispatch::run(ea, input_args); - - } else { + if (auto it = executors_.find(hu); it == executors_.end()) { DynamicTransform::concretizeFusion(hu->fusion_to_execute(), input_args); auto it2 = executors_.insert( {hu, @@ -402,14 +398,20 @@ void HostIrEvaluator::handle(PostOnStream* post_ir) { } else { ExecutorDispatch::compile(ea, hu->fusion_to_execute()); } + } + ExecutorAbstract* ea = executors_[hu].get(); + if (use_preallocated_outputs) { + ExecutorDispatch::run(ea, input_args, outputs); + } else { outputs = ExecutorDispatch::run(ea, input_args); } } - // Store the outputs in the context - for (auto output_idx : c10::irange(outputs.size())) { - expr_evaluator_.bind( - post_ir->outputs().at(output_idx), outputs[output_idx]); + if (!use_preallocated_outputs) { + // Store the outputs in the context + for (auto output_idx : c10::irange(outputs.size())) { + bind(post_ir->outputs().at(output_idx), outputs[output_idx]); + } } } @@ -418,10 +420,9 @@ void HostIrEvaluator::handle(Communication* communication) { communicator_ != nullptr && communicator_->is_available(), "A valid communicator must be provided"); - at::Tensor input_tensor = - getKnownTensorOrUndefined(communication->input(0), expr_evaluator_); + at::Tensor input_tensor = getKnownTensorOrUndefined(communication->input(0)); at::Tensor output_tensor = - getKnownTensorOrUndefined(communication->output(0), expr_evaluator_); + getKnownTensorOrUndefined(communication->output(0)); CommunicatorBackend backend_type = communication->backend(); c10d::Backend* backend = @@ -439,8 +440,7 @@ void HostIrEvaluator::handle(P2PCommunication* communication) { communicator_ != nullptr && communicator_->is_available(), "A valid communicator must be provided"); - at::Tensor buffer = - getKnownTensorOrUndefined(communication->buffer(), expr_evaluator_); + at::Tensor buffer = 
getKnownTensorOrUndefined(communication->buffer()); works_[communication] = postSingleCommunication( communication, @@ -495,11 +495,11 @@ void HostIrEvaluator::handle(ForLoop* for_loop) { for (auto i = start; i < stop; i += step) { // invalidate i and its consumers before binding - expr_evaluator_.invalidate(for_loop->index()); + invalidate(for_loop->index()); for (auto consumer : allConsumerValsOf(for_loop->index())) { - expr_evaluator_.invalidate(consumer); + invalidate(consumer); } - expr_evaluator_.bind(for_loop->index(), i); + bind(for_loop->index(), i); for (Expr* expr : for_loop->body().exprs()) { dispatch(expr); } @@ -536,15 +536,11 @@ void HostIrEvaluator::handle(MatmulOp* matmul) { TensorView* a = matmul->inA(); TensorView* b = matmul->inB(); TensorView* out = matmul->out(); - NVF_ERROR( - expr_evaluator_.isKnown(a) && expr_evaluator_.isKnown(b), - "Inputs of the matmul ", - matmul->toString(), - "must be precomputed before being retrieved"); - if (expr_evaluator_.isKnown(out)) { - auto t_a = expr_evaluator_.evaluate(a).as(); - auto t_b = expr_evaluator_.evaluate(b).as(); - auto t_out = expr_evaluator_.evaluate(out).as(); + + if (isKnown(out)) { + auto t_a = getKnownConcreteData(a).as(); + auto t_b = getKnownConcreteData(b).as(); + auto t_out = getKnownConcreteData(out).as(); at::matmul_out(t_out, t_a, t_b); } else { unhandled(matmul); @@ -556,24 +552,18 @@ void HostIrEvaluator::handle(LinearOp* linear) { TensorView* weight = linear->inB()->as(); TensorView* bias = linear->bias()->as(); TensorView* out = linear->out()->as(); - NVF_ERROR( - expr_evaluator_.isKnown(in) && expr_evaluator_.isKnown(weight) && - (!linear->has_bias() || expr_evaluator_.isKnown(bias)), - "Inputs of the Linear Op ", - linear->toString(), - "must be precomputed before being retrieved"); - if (!expr_evaluator_.isKnown(out)) { + if (!isKnown(out)) { unhandled(linear); return; } - auto in_at = expr_evaluator_.evaluate(in).as(); - auto weight_at = expr_evaluator_.evaluate(weight).as(); 
- auto out_at = expr_evaluator_.evaluate(out).as(); + auto in_at = getKnownConcreteData(in).as(); + auto weight_at = getKnownConcreteData(weight).as(); + auto out_at = getKnownConcreteData(out).as(); if (linear->has_bias()) { - auto bias_at = expr_evaluator_.evaluate(bias).as(); + auto bias_at = getKnownConcreteData(bias).as(); at::linear_out(out_at, in_at, weight_at.squeeze(), bias_at.squeeze()); } else { at::linear_out(out_at, in_at, weight_at.squeeze()); @@ -600,25 +590,37 @@ void HostIrEvaluator::handle(kir::Allocate* allocate) { c10::nullopt, device, c10::nullopt); - - expr_evaluator_.bind(tv, tensor); + bind(tv, tensor); } void HostIrEvaluator::unhandled(Statement* stmt) { NVF_ERROR(stmt->isA(), stmt, " must be an Expr"); auto* expr = stmt->as(); - for (auto input : ir_utils::filterByType(expr->inputs())) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "input ", - input->toString(), - " of the expression ", - expr->toString(), - "must be precomputed before being retrieved"); - } - for (auto output : expr->outputs()) { - expr_evaluator_.bind( - output, expr_evaluator_.evaluate(output), /*evaluate_validate=*/true); + std::vector inputs; + for (auto input : expr->inputs()) { + if (input->isA()) { + // Tensor inputs must be already computed at this point + inputs.push_back(getKnownConcreteData(input)); + } else { + inputs.push_back(expr_evaluator_.evaluate(input)); + } + } + + // Check that there is no pre-allocated output + NVF_ERROR( + std::all_of( + expr->outputs().begin(), + expr->outputs().end(), + [this](Val* output) { + return !this->expr_evaluator_.isKnown(output); + }), + "Do not support pre-allocated outputs for the op ", + expr); + // using ExpressionEvaluator::evaluate to evaluate the output is not valid + // here if the output or one of its producer is an alias + auto concrete_outputs = expr->evaluate(expr_evaluator_, inputs); + for (int64_t i : c10::irange(expr->outputs().size())) { + bind(expr->output(i), concrete_outputs.at(i)); } } diff --git 
a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h index f1b8ed4ef88..73f52a7bb90 100644 --- a/csrc/host_ir/executor.h +++ b/csrc/host_ir/executor.h @@ -133,7 +133,38 @@ class HostIrEvaluator final : public OptOutDispatch { c10::cuda::CUDAStream getCUDAStream(Stream* stream); - KernelArgumentHolder dispatchAndCollectOutputs(); + Val* getAlias(Val* val) const { + const auto& aliases = container_->alias(); + auto it = aliases.find(val); + return it != aliases.end() ? it->second : val; + } + + bool isKnown(Val* value) const { + return expr_evaluator_.isKnown(getAlias(value)); + } + + PolymorphicValue getKnownConcreteData(Val* val) const { + NVF_ERROR( + isKnown(val), + "value ", + val->toString(), + "must be precomputed before being retrieved"); + return expr_evaluator_.evaluate(getAlias(val)); + } + + at::Tensor getKnownTensorOrUndefined(Val* val) const { + return isKnown(val) + ? expr_evaluator_.evaluate(getAlias(val)).as() + : at::Tensor(); + } + + void bind(Val* value, PolymorphicValue concrete_value) { + expr_evaluator_.bind(getAlias(value), concrete_value); + } + + void invalidate(Val* value) { + expr_evaluator_.invalidate(getAlias(value)); + } std::unique_ptr container_; Communicator* communicator_; diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index da85466c8a0..ba296f6f357 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -456,6 +456,62 @@ TEST_P(HostIrTest, ForLoops) { EXPECT_TRUE(expected_result.equal(buffer_at)); } +TEST_P(HostIrTest, PreAllocatedOutputs) { + const std::vector input_sizes = {4, 8, 32}; + const std::vector output_sizes = { + input_sizes.at(1), input_sizes.at(2)}; + + auto get_fusion = [input_sizes]() -> std::unique_ptr { + auto fusion = std::make_unique(); + FusionGuard fg(fusion.get()); + + auto tv0 = makeConcreteTensor(input_sizes); + auto tv1 = add(tv0, tv0); + auto tv2 = sum(tv1, {0}); + fusion->addInput(tv0); + fusion->addOutput(tv2); + return fusion; + }; + + auto hic = 
std::make_unique(); + FusionGuard fg(hic.get()); + + auto host_unit = IrBuilder::create(get_fusion()); + + IrCloner ir_cloner(hic.get()); + std::vector post_on_stream_inputs = { + ir_cloner.clone(host_unit->fusion_to_execute()->inputs().at(0))}; + std::vector post_on_stream_outputs = { + ir_cloner.clone(host_unit->fusion_to_execute()->outputs().at(0))}; + + auto post_on_stream = IrBuilder::create( + host_unit, post_on_stream_inputs, post_on_stream_outputs); + + hic->pushBackTopLevelExprs(post_on_stream); + + hic->addInput(post_on_stream->inputs().at(0)); + hic->addInput(post_on_stream->outputs().at(0)); + + HostIrEvaluatorParams params; + auto [use_fusion_executor_cache] = GetParam(); + params.use_fusion_executor_cache = use_fusion_executor_cache; + HostIrEvaluator hie(std::move(hic), nullptr, params); + + // define concrete inputs and compute ref output for validation + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto input = at::randn(input_sizes, options); + auto output = at::empty(output_sizes, options); + auto ref_output = at::sum(input * 2, {0}); + + hie.runWithInput( + {{post_on_stream->inputs().at(0), input}, + {post_on_stream->outputs().at(0), output}}); + + // validate the obtained results + GTEST_EXPECT_TRUE(torch::allclose(ref_output, output)) + << "Output: " << output << " Expected: " << ref_output; +} + INSTANTIATE_TEST_SUITE_P( , HostIrTest, @@ -1095,7 +1151,12 @@ TEST_F(IfThenElseTest, HostIr) { hic->addOutput(output_buffer); hic->pushBackTopLevelExprs(if_then_else); - HostIrEvaluator hie(std::move(hic)); + // Need to use FusionExecutorCache, otherwise hitting error + // https://github.com/NVIDIA/Fuser/blob/4d032f74d2347fd68f5be607ef94956500eb917b/csrc/runtime/executor.cpp#L750 + HostIrEvaluator hie( + std::move(hic), + /*Communicator=*/nullptr, + {.use_fusion_executor_cache = true}); for (auto boolean : {true, false}) { auto options = @@ -1155,7 +1216,7 @@ TEST_F(AllocationTest, inHostForLoop) { TensorView* tv0 = 
makeConcreteTensor(sizes); tv0->setMemoryType(MemoryType::Global); auto* allocate = IrBuilder::create(tv0, MemoryType::Global); - TensorView* tv1 = abs(tv0); + TensorView* tv1 = set(tv0); for_loop->body().push_back(allocate); for_loop->body().push_back(tv1->definition()); @@ -1170,6 +1231,53 @@ TEST_F(AllocationTest, inHostForLoop) { EXPECT_EQ(sizes, outputs[0].as().sizes()); } +using HirAlias = NVFuserTest; + +TEST_F(HirAlias, SetAndGet) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + TensorView* tv2 = makeConcreteTensor(sizes); + hic->markAlias(tv1, tv2); + TensorView* tv3 = set(tv2); + TensorView* tv4 = makeConcreteTensor(sizes); + hic->markAlias(tv3, tv4); + hic->addInput(tv0); + hic->addOutput(tv4); + hic->pushBackTopLevelExprs(tv1->definition()); + hic->pushBackTopLevelExprs(tv3->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + at::Tensor tv0_aten = at::randn(sizes, options); + + at::Tensor out_aten = hie.runWithInput({{tv0, tv0_aten}})[0].as(); + + at::Tensor expected_out = tv0_aten; + EXPECT_TRUE(out_aten.equal(expected_out)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + +TEST_F(HirAlias, ThrowOnInputAlias) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + hic->markAlias(tv1, tv0); + hic->addInput(tv0); + + EXPECT_ANY_THROW(HostIrEvaluator hie(std::move(hic))); +} + } // namespace hir } // namespace nvfuser From 9820d5aba5b81a02ee96dbcc5ba651837d6add8c Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 04:48:56 -0700 Subject: [PATCH 02/19] harden and simplify allocation in for loop test --- tests/cpp/test_host_irs.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 
deletions(-) diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index ba296f6f357..654e60c5d31 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -1216,13 +1216,11 @@ TEST_F(AllocationTest, inHostForLoop) { TensorView* tv0 = makeConcreteTensor(sizes); tv0->setMemoryType(MemoryType::Global); auto* allocate = IrBuilder::create(tv0, MemoryType::Global); - TensorView* tv1 = set(tv0); for_loop->body().push_back(allocate); - for_loop->body().push_back(tv1->definition()); hic->pushBackTopLevelExprs(for_loop); - hic->addOutput(tv1); + hic->addOutput(tv0); HostIrEvaluator hie(std::move(hic)); From 2ad510d47138e675e45ae9ff0af4c2726fea7c0a Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 05:16:37 -0700 Subject: [PATCH 03/19] refactor and clean host ir lowering and segmentation --- csrc/fusion_segmenter.cpp | 15 ++---- csrc/fusion_segmenter.h | 3 +- csrc/host_ir/container.h | 4 ++ csrc/host_ir/lower.cpp | 87 ++++++++++++++++++++++++----------- csrc/host_ir/lower.h | 7 +++ tests/cpp/test_resharding.cpp | 2 +- 6 files changed, 79 insertions(+), 39 deletions(-) diff --git a/csrc/fusion_segmenter.cpp b/csrc/fusion_segmenter.cpp index d5368c41066..90e5ba20cf8 100644 --- a/csrc/fusion_segmenter.cpp +++ b/csrc/fusion_segmenter.cpp @@ -3919,15 +3919,8 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( NVF_ERROR( areDirectlyConnected(group1, group2), "only support testing immediate producer-consumer groups"); - if (options_.only_segment_resharding_exprs) { - for (auto group : {group1, group2}) { - for (auto expr : group->exprs()) { - if (isResharding(expr)) { - return false; - } - } - } - return true; + if (options_.custom_should_merge_groups != nullptr) { + return (*options_.custom_should_merge_groups)(group1, group2); } return tryMerge(segmented_fusion_.get(), runtimeInfo(), group1, group2) != SchedulerType::None; @@ -3938,7 +3931,7 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( SchedulerType 
SegmentCandidateFinder::deriveSchedulerType( SegmentedGroup* group) { FUSER_PERF_SCOPE("SegmentCandidateFinder::deriveSchedulerType"); - if (options_.only_segment_resharding_exprs) { + if (options_.custom_should_merge_groups != nullptr) { // We don't need to generate a SchedulerType for multidevice segments at // this moment return SchedulerType::None; @@ -3958,7 +3951,7 @@ SegmentCandidateFinder::SegmentCandidateFinder( : options_(options), runtime_inputs_(inputs) { FUSER_PERF_SCOPE("SegmentCandidateFinder::SegmentCandidateFinder"); NVF_ERROR( - !options_.only_segment_resharding_exprs || + options_.custom_should_merge_groups == nullptr || (!options_.run_translate_welford && !options_.run_combine_reductions && options_.run_herrmann_merge && options_.run_final_merge), diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index 1e32d42bb36..cc721d59301 100644 --- a/csrc/fusion_segmenter.h +++ b/csrc/fusion_segmenter.h @@ -501,7 +501,8 @@ struct SegmentCandidateFinderOptions { bool run_combine_reductions = true; bool run_herrmann_merge = true; bool run_final_merge = true; - bool only_segment_resharding_exprs = false; + bool (*custom_should_merge_groups)(SegmentedGroup*, SegmentedGroup*) = + nullptr; }; //! SegmentCandidateFinder diff --git a/csrc/host_ir/container.h b/csrc/host_ir/container.h index 7dcd66b4436..eb322275422 100644 --- a/csrc/host_ir/container.h +++ b/csrc/host_ir/container.h @@ -41,6 +41,10 @@ class HostIrContainer final : public Fusion { //! 
Print to an output stream std::ostream& print(std::ostream& os) const; + void resetTopLevelExprs(std::vector exprs) { + top_level_exprs_ = std::move(exprs); + } + const std::vector& topLevelExprs() const; void pushBackTopLevelExprs(Expr* expr); diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 32febda37a0..4838961ed85 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -6,8 +6,8 @@ */ // clang-format on #include -#include #include +#include #include #include #include @@ -592,6 +592,29 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( get_current_stream, allocate_tva_allgathered, allocate_tv_out, for_loop}; } +bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { + return expr->isOneOf< + MatmulOp, + SliceOp, + SelectOp, + LinearOp, + Communication, + P2PCommunication>(); +} + +bool HostIrLower::ShouldMergeSegmentedGroups( + SegmentedGroup* group1, + SegmentedGroup* group2) { + for (auto group : {group1, group2}) { + for (auto expr : group->exprs()) { + if (isLoweredAsStandaloneHostOp(expr)) { + return false; + } + } + } + return true; +} + std::unique_ptr HostIrLower::lower( std::unique_ptr fusion, DeviceIdxType my_device_index) { @@ -615,7 +638,7 @@ std::unique_ptr HostIrLower::lower( .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .only_segment_resharding_exprs = true}; + .custom_should_merge_groups = &ShouldMergeSegmentedGroups}; std::unique_ptr staged_fusion = SegmentCandidateFinder::segment( std::move(fusion), KernelArgumentHolder(), options, true); @@ -643,32 +666,18 @@ std::unique_ptr HostIrLower::lower( if (involvedDevices(group->exprs().at(0)).count(my_device_index) == 0) { continue; } - const bool is_resharding = std::any_of( - group->exprs().begin(), group->exprs().end(), [](auto expr) { - return isResharding(expr); - }); - if (is_resharding) { + // we decide whether to insert the Expr as a standalone op in the + // HostIRContainer, which will result in 
using ATen Op to evaluate it -- + // or, alternatively, to wrap them into a PostOnStream(HostUnit(.)) which + // will result in a kernel code generation. + if (std::all_of( + group->exprs().begin(), + group->exprs().end(), + isLoweredAsStandaloneHostOp)) { NVF_ERROR( group->exprs().size() == 1, - "Communication segments must contain only one Expr"); - for (auto* expr : HostIrLower::lower( - ir_cloner.clone(group->exprs().at(0)), my_device_index)) { - // Allocate the recv buffers of communications - if (expr->isA()) { - auto* communication = expr->as(); - TensorView* tv = communication->out(); - if (tv->getDeviceMesh().has(my_device_index)) { - auto* allocate = - IrBuilder::create(tv, MemoryType::Global); - hic->pushBackTopLevelExprs(allocate); - } - } - hic->pushBackTopLevelExprs(expr); - if (expr->isA()) { - auto wait = IrBuilder::create(expr->as()); - hic->pushBackTopLevelExprs(wait); - } - } + "Expr executed as a standalone op cannot be fused"); + hic->pushBackTopLevelExprs(ir_cloner.clone(group->exprs().at(0))); } else { auto host_unit = IrBuilder::create( staged_fusion->makeFusion(group).second); @@ -684,6 +693,32 @@ std::unique_ptr HostIrLower::lower( hic->addOutput(ir_cloner.clone(output)); } + std::vector new_top_level_exprs; + for (auto top_level_expr : hic->topLevelExprs()) { + if (!isResharding(top_level_expr)) { + new_top_level_exprs.push_back(top_level_expr); + continue; + } + for (auto* expr : HostIrLower::lower(top_level_expr, my_device_index)) { + // Allocate the recv buffers of communications + if (expr->isA()) { + auto* communication = expr->as(); + TensorView* tv = communication->out(); + if (tv->getDeviceMesh().has(my_device_index)) { + auto* allocate = + IrBuilder::create(tv, MemoryType::Global); + new_top_level_exprs.push_back(allocate); + } + } + new_top_level_exprs.push_back(expr); + if (expr->isA()) { + auto wait = IrBuilder::create(expr->as()); + new_top_level_exprs.push_back(wait); + } + } + } + 
hic->resetTopLevelExprs(new_top_level_exprs); + return hic; } diff --git a/csrc/host_ir/lower.h b/csrc/host_ir/lower.h index 314c880c264..5ce2386a187 100644 --- a/csrc/host_ir/lower.h +++ b/csrc/host_ir/lower.h @@ -7,6 +7,7 @@ // clang-format on #pragma once +#include #include #include #include @@ -35,6 +36,12 @@ class HostIrLower { std::unique_ptr fusion, DeviceIdxType my_device_index); + static bool isLoweredAsStandaloneHostOp(Expr* expr); + + static bool ShouldMergeSegmentedGroups( + SegmentedGroup* group1, + SegmentedGroup* group2); + private: std::vector lowerToCollectiveBasedPipelinedGemmComm(Expr* expr); const HostIrLowerParams params_; diff --git a/tests/cpp/test_resharding.cpp b/tests/cpp/test_resharding.cpp index 3606582cdb1..69e6b307f42 100644 --- a/tests/cpp/test_resharding.cpp +++ b/tests/cpp/test_resharding.cpp @@ -53,7 +53,7 @@ class ReshardingTest : public NVFuserFixtureParamTest { .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .only_segment_resharding_exprs = true}; + .custom_should_merge_groups = &HostIrLower::ShouldMergeSegmentedGroups}; auto segmented_fusion = SegmentCandidateFinder::segment( std::move(fusion_), KernelArgumentHolder(), options, true); From 46c6717d5bea2a4cdf11bc418ac4e3dc201ac3a5 Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 05:55:05 -0700 Subject: [PATCH 04/19] lint --- csrc/host_ir/lower.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 4838961ed85..9741aca1bf0 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -594,12 +594,12 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { return expr->isOneOf< - MatmulOp, - SliceOp, - SelectOp, - LinearOp, - Communication, - P2PCommunication>(); + MatmulOp, + SliceOp, + SelectOp, + LinearOp, + Communication, + P2PCommunication>(); } bool 
HostIrLower::ShouldMergeSegmentedGroups( From 73d5d7b4db958dc18698ae1edd577f63d12911e9 Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 07:26:05 -0700 Subject: [PATCH 05/19] put back isResharding as the condition for lower to a standalone host expr --- csrc/host_ir/lower.cpp | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 9741aca1bf0..b2622cccd90 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -593,13 +593,7 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( } bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { - return expr->isOneOf< - MatmulOp, - SliceOp, - SelectOp, - LinearOp, - Communication, - P2PCommunication>(); + return isResharding(expr); } bool HostIrLower::ShouldMergeSegmentedGroups( From e35ddd0302b26c05464de7efc36de92660d871f3 Mon Sep 17 00:00:00 2001 From: snordmann Date: Fri, 11 Apr 2025 05:01:53 -0700 Subject: [PATCH 06/19] minor comments --- csrc/fusion_segmenter.cpp | 2 +- csrc/fusion_segmenter.h | 4 ++-- csrc/host_ir/lower.cpp | 12 ++++++------ csrc/host_ir/lower.h | 4 ++-- tests/cpp/test_resharding.cpp | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/csrc/fusion_segmenter.cpp b/csrc/fusion_segmenter.cpp index 90e5ba20cf8..b522f6d11ef 100644 --- a/csrc/fusion_segmenter.cpp +++ b/csrc/fusion_segmenter.cpp @@ -3920,7 +3920,7 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( areDirectlyConnected(group1, group2), "only support testing immediate producer-consumer groups"); if (options_.custom_should_merge_groups != nullptr) { - return (*options_.custom_should_merge_groups)(group1, group2); + return (options_.custom_should_merge_groups)(group1, group2); } return tryMerge(segmented_fusion_.get(), runtimeInfo(), group1, group2) != SchedulerType::None; diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index cc721d59301..6cca1f0c727 100644 --- a/csrc/fusion_segmenter.h +++ 
b/csrc/fusion_segmenter.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -501,8 +502,7 @@ struct SegmentCandidateFinderOptions { bool run_combine_reductions = true; bool run_herrmann_merge = true; bool run_final_merge = true; - bool (*custom_should_merge_groups)(SegmentedGroup*, SegmentedGroup*) = - nullptr; + std::function custom_should_merge_groups = nullptr; }; //! SegmentCandidateFinder diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index b2622cccd90..d647f0d442e 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -592,16 +592,16 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( get_current_stream, allocate_tva_allgathered, allocate_tv_out, for_loop}; } -bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { +bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) { return isResharding(expr); } -bool HostIrLower::ShouldMergeSegmentedGroups( +bool HostIrLower::shouldMergeSegmentedGroups( SegmentedGroup* group1, SegmentedGroup* group2) { for (auto group : {group1, group2}) { - for (auto expr : group->exprs()) { - if (isLoweredAsStandaloneHostOp(expr)) { + for (Expr* expr : group->exprs()) { + if (isLowerableAsStandaloneHostOp(expr)) { return false; } } @@ -632,7 +632,7 @@ std::unique_ptr HostIrLower::lower( .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .custom_should_merge_groups = &ShouldMergeSegmentedGroups}; + .custom_should_merge_groups = &shouldMergeSegmentedGroups}; std::unique_ptr staged_fusion = SegmentCandidateFinder::segment( std::move(fusion), KernelArgumentHolder(), options, true); @@ -667,7 +667,7 @@ std::unique_ptr HostIrLower::lower( if (std::all_of( group->exprs().begin(), group->exprs().end(), - isLoweredAsStandaloneHostOp)) { + isLowerableAsStandaloneHostOp)) { NVF_ERROR( group->exprs().size() == 1, "Expr executed as a standalone op cannot be fused"); diff --git a/csrc/host_ir/lower.h b/csrc/host_ir/lower.h index 
5ce2386a187..5b1ecadece8 100644 --- a/csrc/host_ir/lower.h +++ b/csrc/host_ir/lower.h @@ -36,9 +36,9 @@ class HostIrLower { std::unique_ptr fusion, DeviceIdxType my_device_index); - static bool isLoweredAsStandaloneHostOp(Expr* expr); + static bool isLowerableAsStandaloneHostOp(Expr* expr); - static bool ShouldMergeSegmentedGroups( + static bool shouldMergeSegmentedGroups( SegmentedGroup* group1, SegmentedGroup* group2); diff --git a/tests/cpp/test_resharding.cpp b/tests/cpp/test_resharding.cpp index 69e6b307f42..5e4bd2b749a 100644 --- a/tests/cpp/test_resharding.cpp +++ b/tests/cpp/test_resharding.cpp @@ -53,7 +53,7 @@ class ReshardingTest : public NVFuserFixtureParamTest { .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .custom_should_merge_groups = &HostIrLower::ShouldMergeSegmentedGroups}; + .custom_should_merge_groups = &HostIrLower::shouldMergeSegmentedGroups}; auto segmented_fusion = SegmentCandidateFinder::segment( std::move(fusion_), KernelArgumentHolder(), options, true); From 4964680a32b58bb0818a784eff6e758625e49ace Mon Sep 17 00:00:00 2001 From: snordmann Date: Fri, 11 Apr 2025 05:07:59 -0700 Subject: [PATCH 07/19] lint --- csrc/fusion_segmenter.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index 6cca1f0c727..a975716676d 100644 --- a/csrc/fusion_segmenter.h +++ b/csrc/fusion_segmenter.h @@ -502,7 +502,8 @@ struct SegmentCandidateFinderOptions { bool run_combine_reductions = true; bool run_herrmann_merge = true; bool run_final_merge = true; - std::function custom_should_merge_groups = nullptr; + std::function + custom_should_merge_groups = nullptr; }; //! 
SegmentCandidateFinder From ed8dc7c825a466f6cf9b3e9ebbe8fb8017eda182 Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 26 Mar 2025 05:52:08 -0700 Subject: [PATCH 08/19] add host ir support for set reduce and binary op --- csrc/host_ir/executor.cpp | 99 +++++++++++++ csrc/host_ir/executor.h | 3 + csrc/host_ir/lower.cpp | 13 +- tests/cpp/test_host_irs.cpp | 180 ++++++++++++++++++++++++ tests/cpp/test_multidevice_pipeline.cpp | 131 ----------------- 5 files changed, 293 insertions(+), 133 deletions(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 30bca9920db..4c0e7f441eb 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -570,6 +570,31 @@ void HostIrEvaluator::handle(LinearOp* linear) { } } +void HostIrEvaluator::handle(LoadStoreOp* load_store_op) { + NVF_ERROR( + load_store_op->out()->isA(), "out must be a TensorView"); + auto* out_tv = load_store_op->out()->as(); + auto in_tensor = getKnownConcreteData(load_store_op->in()).as(); + + // If output has root domain, compute and apply permutation + if (out_tv->hasRoot()) { + auto permutation = ir_utils::computePermutation( + out_tv->getRootDomain(), out_tv->getLogicalDomain()); + NVF_ERROR( + permutation.has_value(), + "The logical domain of a Set.Permute is supposed to be a permutation of the root domain: ", + out_tv->toString()); + in_tensor = in_tensor.permute(*permutation).contiguous(); + } + if (!isKnown(load_store_op->out())) { + bind(load_store_op->out(), in_tensor); + } else { + auto out_tensor = + getKnownConcreteData(load_store_op->out()).as(); + out_tensor.copy_(in_tensor); + } +} + void HostIrEvaluator::handle(kir::Allocate* allocate) { NVF_ERROR( allocate->buffer()->isA(), @@ -593,6 +618,80 @@ void HostIrEvaluator::handle(kir::Allocate* allocate) { bind(tv, tensor); } +void HostIrEvaluator::handle(BinaryOp* binary_op) { + if (!isKnown(binary_op->outputs().at(0))) { + return unhandled(binary_op); + } + + auto lhs = 
getKnownConcreteData(binary_op->inputs().at(0)).as(); + auto rhs = getKnownConcreteData(binary_op->inputs().at(1)).as(); + auto output = + getKnownConcreteData(binary_op->outputs().at(0)).as(); + + switch (binary_op->getBinaryOpType()) { + case BinaryOpType::Add: + at::add_out(output, lhs, rhs); + break; + case BinaryOpType::Sub: + at::sub_out(output, lhs, rhs); + break; + case BinaryOpType::Mul: + at::mul_out(output, lhs, rhs); + break; + case BinaryOpType::Div: + at::div_out(output, lhs, rhs); + break; + default: + NVF_CHECK( + false, + "Unexpected operator type: ", + binary_op->getBinaryOpType(), + " in ", + binary_op); + } +} + +void HostIrEvaluator::handle(ReductionOp* reduction_op) { + auto input_tv = reduction_op->in()->as(); + auto output_tv = reduction_op->out()->as(); + if (!isKnown(output_tv)) { + return unhandled(reduction_op); + } + + NVF_ERROR( + !output_tv->hasRoot(), + "Evaluation for rFactored reductions is not supported."); + auto input = getKnownConcreteData(input_tv).as(); + auto output = getKnownConcreteData(output_tv).as(); + + std::vector reduction_axes; + for (const auto i : + c10::irange(int64_t(output_tv->getLogicalDomain().size()))) { + auto ax = output_tv->getLogicalDomain().at(i); + if (ax->isReduction()) { + reduction_axes.push_back(i); + } + } + switch (reduction_op->getReductionOpType()) { + case BinaryOpType::Add: + at::sum_out(output, input, reduction_axes); + return; + case BinaryOpType::Max: + at::amax_out(output, input, reduction_axes); + return; + case BinaryOpType::Min: + at::amin_out(output, input, reduction_axes); + return; + default: + NVF_CHECK( + false, + "Unexpected operator type: ", + reduction_op->getReductionOpType(), + " in ", + reduction_op); + } +} + void HostIrEvaluator::unhandled(Statement* stmt) { NVF_ERROR(stmt->isA(), stmt, " must be an Expr"); auto* expr = stmt->as(); diff --git a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h index 8f4d425fd09..faf301f0819 100644 --- a/csrc/host_ir/executor.h +++ 
b/csrc/host_ir/executor.h @@ -133,6 +133,9 @@ class HostIrEvaluator final : public OptOutDispatch { void handle(MatmulOp* matmul) override; void handle(LinearOp* linear) override; void handle(kir::Allocate* allocate) override; + void handle(LoadStoreOp* load_store_op) override; + void handle(BinaryOp* binary_op) override; + void handle(ReductionOp* reduction_op) override; void unhandled(Statement* stmt) override; c10::cuda::CUDAStream getCUDAStream(Stream* stream); diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index d647f0d442e..d8302a8a119 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -592,8 +592,17 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( get_current_stream, allocate_tva_allgathered, allocate_tv_out, for_loop}; } -bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) { - return isResharding(expr); +bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { + return expr->isOneOf< + MatmulOp, + SliceOp, + SelectOp, + LinearOp, + LoadStoreOp, + BinaryOp, + ReductionOp, + Communication, + P2PCommunication>(); } bool HostIrLower::shouldMergeSegmentedGroups( diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index 654e60c5d31..751d5251aab 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -1276,6 +1276,186 @@ TEST_F(HirAlias, ThrowOnInputAlias) { EXPECT_ANY_THROW(HostIrEvaluator hie(std::move(hic))); } +using HirSetTest = NVFuserTest; + +TEST_F(HirSetTest, HostIr) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto* in = makeConcreteTensor(sizes); + auto* out = makeConcreteTensor(sizes); + auto* set = IrBuilder::create(LoadStoreOpType::Set, out, in); + hic->addInput(in); + hic->addInput(out); + hic->pushBackTopLevelExprs(set); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto in_aten = at::randn(sizes, options); + auto out_aten = 
at::empty(sizes, options); + + hie.runWithInput({{in, in_aten}, {out, out_aten}}); + + EXPECT_TRUE(out_aten.equal(in_aten)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << in_aten; +} + +class HirBinaryOpTest : public NVFuserFixtureParamTest { + protected: + at::Tensor executeBinaryOp(at::Tensor lhs, at::Tensor rhs) { + switch (GetParam()) { + case BinaryOpType::Add: + return lhs + rhs; + case BinaryOpType::Sub: + return lhs - rhs; + case BinaryOpType::Mul: + return lhs * rhs; + case BinaryOpType::Div: + return lhs / rhs; + default: + NVF_ERROR("Unsupported binary op type ", GetParam()); + return at::Tensor(); + } + } +}; + +TEST_P(HirBinaryOpTest, PreAllocatedOutputs) { + const std::vector sizes = {8, 64}; + const auto& binary_op_type = GetParam(); + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto* lhs = makeConcreteTensor(sizes); + auto* rhs = makeConcreteTensor(sizes); + auto* out = makeConcreteTensor(sizes); + auto* binary_op = IrBuilder::create(binary_op_type, out, lhs, rhs); + hic->addInput(lhs); + hic->addInput(rhs); + hic->addInput(out); + hic->pushBackTopLevelExprs(binary_op); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto lhs_aten = at::randn(sizes, options); + auto rhs_aten = at::randn(sizes, options); + auto out_aten = at::empty(sizes, options); + + hie.runWithInput({{lhs, lhs_aten}, {rhs, rhs_aten}, {out, out_aten}}); + + at::Tensor expected_out = executeBinaryOp(lhs_aten, rhs_aten); + EXPECT_TRUE(expected_out.equal(out_aten)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + +TEST_P(HirBinaryOpTest, NonPreAllocatedOutputs) { + const std::vector sizes = {8, 64}; + const auto& binary_op_type = GetParam(); + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto* lhs = makeConcreteTensor(sizes); + auto* rhs = makeConcreteTensor(sizes); + auto* out = binaryOp(binary_op_type, lhs, rhs); + 
hic->addInput(lhs); + hic->addInput(rhs); + hic->addOutput(out); + hic->pushBackTopLevelExprs(out->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto lhs_aten = at::randn(sizes, options); + auto rhs_aten = at::randn(sizes, options); + + auto out_aten = + hie.runWithInput({{lhs, lhs_aten}, {rhs, rhs_aten}})[0].as(); + + at::Tensor expected_out = executeBinaryOp(lhs_aten, rhs_aten); + EXPECT_TRUE(expected_out.equal(out_aten)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + +INSTANTIATE_TEST_SUITE_P( + , + HirBinaryOpTest, + testing::Values( + BinaryOpType::Add, + BinaryOpType::Sub, + BinaryOpType::Mul, + BinaryOpType::Div), + [](const testing::TestParamInfo& info) -> std::string { + std::stringstream ss; + ss << "BinaryOpType_" << info.param; + return ss.str(); + }); + +using HirReductionOpTest = NVFuserTest; + +TEST_F(HirReductionOpTest, PreAllocatedOutputs) { + constexpr int64_t size0 = 8, size1 = 64; + constexpr int64_t reduction_axis = 1; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto* in = makeConcreteTensor({size0, size1}); + auto* out = newForReduction(in, {reduction_axis}, in->dtype()); + auto* reduction_op = IrBuilder::create( + BinaryOpType::Add, hic->zeroVal(), out, in); + hic->addInput(in); + hic->addOutput(out); + hic->pushBackTopLevelExprs(reduction_op); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto in_aten = at::randn({size0, size1}, options); + auto out_aten = at::empty({size0}, options); + + hie.runWithInput({{in, in_aten}, {out, out_aten}}); + + at::Tensor expected_out = in_aten.sum(reduction_axis); + EXPECT_TRUE(expected_out.equal(out_aten)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + +TEST_F(HirReductionOpTest, NonPreAllocatedOutputs) { + constexpr int64_t size0 = 8, size1 = 64; + constexpr int64_t 
reduction_axis = 1; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto* in = makeConcreteTensor({size0, size1}); + auto* out = sum(in, {reduction_axis}); + hic->addInput(in); + hic->addOutput(out); + hic->pushBackTopLevelExprs(out->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto in_aten = at::randn({size0, size1}, options); + auto out_aten = at::empty({size0}, options); + + hie.runWithInput({{in, in_aten}, {out, out_aten}}); + + at::Tensor expected_out = in_aten.sum(reduction_axis); + EXPECT_TRUE(expected_out.equal(out_aten)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + } // namespace hir } // namespace nvfuser diff --git a/tests/cpp/test_multidevice_pipeline.cpp b/tests/cpp/test_multidevice_pipeline.cpp index 5985571c57a..12dfed5dd43 100644 --- a/tests/cpp/test_multidevice_pipeline.cpp +++ b/tests/cpp/test_multidevice_pipeline.cpp @@ -457,135 +457,4 @@ INSTANTIATE_TEST_SUITE_P( testing::Values(0, 1), testing::Values(true))); -// Different scheduling modes used in -// PipelineTestStagedReduction.StagedReduction -enum class SchedulingMode { - // Manual interdevice scheduling, no intra-device scheduling - InterDeviceOnly, - // Manual inter-/intra-device scheduling - Manual, - // Manual inter-device scheduling, composed with fully automated intra-device - // scheduling (through FusionExecutorCache) - Automatic, -}; - -std::ostream& operator<<(std::ostream& out, const SchedulingMode& mode) { - switch (mode) { - case SchedulingMode::InterDeviceOnly: - out << "InterDeviceOnly"; - break; - case SchedulingMode::Manual: - out << "Manual"; - break; - case SchedulingMode::Automatic: - out << "Automatic"; - break; - } - return out; -} - -class PipelineTestStagedReduction - : public PipelineTest, - public ::testing::WithParamInterface {}; - -// 1D staged reduction -// Inputs: X[num_devices,B,C] -TEST_P(PipelineTestStagedReduction, 
StagedReduction) { - auto scheduling_mode = GetParam(); - - const int num_devices = communicator_->size(); - constexpr int B = 8; - constexpr int C = 64; - - FusionGuard fg(fusion.get()); - // The first dimension is made symbolic so `tv_out->definition()` won't - // become a squeeze when num_devices == 1. This wouldn't be a problem for - // automatic mode. However, for the manual mode, the scheduling code below - // assumes `tv_out->definition()` can be lowered to communication. A squeeze - // can't. - TensorView* tv0 = TensorViewBuilder() - .dtype(DataType::Float) - .contiguity(true) - .shape({-1, B, C}) - .build(); - auto mesh = DeviceMesh::createForNumDevices(num_devices); - tv0->setDeviceMesh(mesh); - TensorView* tv1 = sum(tv0, {2}); - TensorView* tv_out = sum(tv1, {0}); - fusion->addInput(tv0); - fusion->addOutput(tv_out); - - for (auto* tv : {tv0, tv1}) { - tv->axis(0)->parallelize(ParallelType::DIDx); - } - - // Intra-device reduction scheduling for the first reduction: - switch (scheduling_mode) { - case SchedulingMode::InterDeviceOnly: - break; - case SchedulingMode::Manual: { - // inspired from NVFuserTest.FusionReduction1_CUDA - // tv0[I0{A}, I1{B}, I2{C}] - tv1->split(2, 32); - // tv1[I0{A}, I1{B}, R2o{C/32}, R2i{32}] = tv0[I0{A}, I1{B}, I2{C}] - tv1->split(2, 4); - // clang-format off - // tv1[I0{A}, I1{B}, R2oo{C/32/4)}, R2oi{4}, R2i{32}] = tv0[I0{A}, I1{B}, I2{C}] - // clang-format on - - TensorView* tv2 = tv1->rFactor({2}); - // clang-format off - // tv2[I0{A}, I1{B}, R2oo{C/32/4)}, I2oi{4}, I2i{32}] = tv0[I0{A}, I1{B}, I2{C}] - // tv1[I0{A}, I1{B}, R2oi{4}, R2i{32}] = tv2[I0{A}, I1{B}, R2oo{C/32/4)}, I2oi{4}, I2i{32}] - // clang-format on - - TensorView* tv3 = tv1->rFactor({2}); - // clang-format off - // tv2[I0{A}, I1{B}, R2oo{C/32/4)}, I2oi{4}, I2i{32}] = tv0[I0{A}, I1{B}, I2{C}] - // tv3[I0{A}, I1{B}, R2oi{4}, I2i{32}] = tv2[I0{A}, I1{B}, R2oo{C/32/4)}, I2oi{4}, I2i{32}] - // tv1[I0{A}, I1{B}, R2i{32}] = tv3[I0{A}, I1{B}, R2oi{4}, I2i{32}] - // 
clang-format on - - // tv1 is a segment boundary so must be in global. This wouldn't be - // needed if the fusion were scheduled automatically. - tv1->setMemoryType(MemoryType::Global); - - // Use `tv2` as the reference tensor because it contains the most - // parallel IterDomains. - tv2->axis(1)->parallelize(ParallelType::BIDx); - tv2->axis(3)->parallelize(ParallelType::Unroll); - tv2->axis(-1)->parallelize(ParallelType::TIDx); - scheduler_utils::parallelizeAllLike( - tv2, - /*pos=*/-1, - // Don't propagate the parallelization to `tv_out` because that's in - // a different, resharding segment. - /*selected_tv=*/{tv0, tv1, tv2, tv3}); - inlineMost(); - break; - } - case SchedulingMode::Automatic: - host_ir_executor_params.use_fusion_executor_cache = true; - break; - } - - at::Tensor unsharded_input_tensor = - at::randn({num_devices, B, C}, tensor_options); - at::Tensor ref_unsharded_output_tensor = - unsharded_input_tensor.sum(at::IntArrayRef({0, 2})); - unsharded_args = {unsharded_input_tensor}; - ref_unsharded_outputs = {ref_unsharded_output_tensor}; - - executeAndValidate(/* validate_with_prescribed_values */ true); -} - -INSTANTIATE_TEST_SUITE_P( - , - PipelineTestStagedReduction, - testing::Values( - SchedulingMode::InterDeviceOnly, - SchedulingMode::Manual, - SchedulingMode::Automatic), - testing::PrintToStringParamName()); - } // namespace nvfuser From 85b7b751b82c31f5b019773d5adfdec21210ab8b Mon Sep 17 00:00:00 2001 From: snordmann Date: Fri, 11 Apr 2025 06:26:51 -0700 Subject: [PATCH 09/19] move .contiguous to be in postScatter --- csrc/host_ir/executor.cpp | 2 +- csrc/multidevice/communication.cpp | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 4c0e7f441eb..ab3e08bfb6f 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -584,7 +584,7 @@ void HostIrEvaluator::handle(LoadStoreOp* load_store_op) { permutation.has_value(), "The logical domain of a 
Set.Permute is supposed to be a permutation of the root domain: ", out_tv->toString()); - in_tensor = in_tensor.permute(*permutation).contiguous(); + in_tensor = in_tensor.permute(*permutation); } if (!isKnown(load_store_op->out())) { bind(load_store_op->out(), in_tensor); diff --git a/csrc/multidevice/communication.cpp b/csrc/multidevice/communication.cpp index 13fc3ce36ea..463cffd2879 100644 --- a/csrc/multidevice/communication.cpp +++ b/csrc/multidevice/communication.cpp @@ -352,6 +352,11 @@ c10::intrusive_ptr postScatter( c10d::Backend* backend, at::Tensor input_tensor, at::Tensor output_tensor) { + + if (my_device_index == communication->root()) { + input_tensor = input_tensor.contiguous(); + } + if (my_device_index == communication->root() && !communication->out()->getDeviceMesh().has(communication->root())) { output_tensor = at::empty_like(input_tensor.slice(0, 0, 1)); From 01e94a7bf3dd6ab271338d23853cd37117ece3af Mon Sep 17 00:00:00 2001 From: snordmann Date: Fri, 11 Apr 2025 06:57:46 -0700 Subject: [PATCH 10/19] lint and build issue --- csrc/host_ir/lower.cpp | 2 +- csrc/multidevice/communication.cpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index d8302a8a119..93badafceb4 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -592,7 +592,7 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( get_current_stream, allocate_tva_allgathered, allocate_tv_out, for_loop}; } -bool HostIrLower::isLoweredAsStandaloneHostOp(Expr* expr) { +bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) { return expr->isOneOf< MatmulOp, SliceOp, diff --git a/csrc/multidevice/communication.cpp b/csrc/multidevice/communication.cpp index 463cffd2879..b8bcf857fa2 100644 --- a/csrc/multidevice/communication.cpp +++ b/csrc/multidevice/communication.cpp @@ -352,7 +352,6 @@ c10::intrusive_ptr postScatter( c10d::Backend* backend, at::Tensor input_tensor, at::Tensor output_tensor) { 
- if (my_device_index == communication->root()) { input_tensor = input_tensor.contiguous(); } From e1db5183158a3662827a6cb70caae7e63c0e2191 Mon Sep 17 00:00:00 2001 From: snordmann Date: Mon, 14 Apr 2025 06:49:48 -0700 Subject: [PATCH 11/19] reviews --- csrc/host_ir/container.h | 2 +- csrc/host_ir/executor.h | 2 +- tests/cpp/test_host_irs.cpp | 41 ++++++++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/csrc/host_ir/container.h b/csrc/host_ir/container.h index 7dcd66b4436..07bd896790b 100644 --- a/csrc/host_ir/container.h +++ b/csrc/host_ir/container.h @@ -56,7 +56,7 @@ class HostIrContainer final : public Fusion { Stream* getDefaultStream(); void markAlias(TensorView* original, const TensorView* new_alias) { - if (alias_.count(original)) { + while (alias_.count(original)) { original = alias_[original]->as(); } alias_[new_alias] = original; diff --git a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h index 8f4d425fd09..c1486285d19 100644 --- a/csrc/host_ir/executor.h +++ b/csrc/host_ir/executor.h @@ -140,7 +140,7 @@ class HostIrEvaluator final : public OptOutDispatch { Val* getAlias(Val* val) const { const auto& aliases = container_->alias(); auto it = aliases.find(val); - return it != aliases.end() ? it->second : val; + return it != aliases.end() ? 
getAlias(it->second) : val; } bool isKnown(Val* value) const { diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index 654e60c5d31..6a41e47c744 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -139,7 +139,7 @@ TEST_P(HostIrTest, SingleFusion) { auto outputs = hie.runWithInput({{post_on_stream->inputs().at(0), t0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); } /* @@ -236,7 +236,7 @@ TEST_P(HostIrTest, TwoFusions) { auto outputs = hie.runWithInput({{post_on_stream_0->inputs().at(0), t0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); } /* @@ -365,7 +365,7 @@ TEST_P(HostIrTest, ThreeFusions) { auto outputs = hie.runWithInput({{post_on_stream_0->inputs().at(0), t0_0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(t2_2, outputs[0].as())); + EXPECT_TRUE(torch::allclose(t2_2, outputs[0].as())); } // This unit test the for-loop IR by implementing a program that could be @@ -508,7 +508,7 @@ TEST_P(HostIrTest, PreAllocatedOutputs) { {post_on_stream->outputs().at(0), output}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(ref_output, output)) + EXPECT_TRUE(torch::allclose(ref_output, output)) << "Output: " << output << " Expected: " << ref_output; } @@ -724,7 +724,7 @@ TEST_P(StreamHostIrTest, SingleFusionMultipleStreams) { // validate the obtained results for (int i = 0; i < n_iterations; i++) { - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[i].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[i].as())); } EXPECT_NE( c10::cuda::getDefaultCUDAStream(0), c10::cuda::getCurrentCUDAStream(0)); @@ -1262,6 +1262,37 @@ TEST_F(HirAlias, SetAndGet) { << "Expected output: " << expected_out; } +TEST_F(HirAlias, SetAndGetReversedOrder) { + 
const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + TensorView* tv2 = makeConcreteTensor(sizes); + TensorView* tv3 = set(tv2); + TensorView* tv4 = makeConcreteTensor(sizes); + hic->markAlias(tv3, tv4); + hic->markAlias(tv1, tv2); + hic->addInput(tv0); + hic->addOutput(tv4); + hic->pushBackTopLevelExprs(tv1->definition()); + hic->pushBackTopLevelExprs(tv3->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + at::Tensor tv0_aten = at::randn(sizes, options); + + at::Tensor out_aten = hie.runWithInput({{tv0, tv0_aten}})[0].as(); + + at::Tensor expected_out = tv0_aten; + EXPECT_TRUE(out_aten.equal(expected_out)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + TEST_F(HirAlias, ThrowOnInputAlias) { const std::vector sizes = {8, 64}; From 59622ff48520269c02ed7a6893fd32c39a2a5848 Mon Sep 17 00:00:00 2001 From: snordmann Date: Tue, 15 Apr 2025 09:33:13 -0700 Subject: [PATCH 12/19] add comment --- csrc/fusion_segmenter.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index a975716676d..7141085dcdf 100644 --- a/csrc/fusion_segmenter.h +++ b/csrc/fusion_segmenter.h @@ -502,6 +502,10 @@ struct SegmentCandidateFinderOptions { bool run_combine_reductions = true; bool run_herrmann_merge = true; bool run_final_merge = true; + // if provided, this custom function will be used to determine if two groups + // should be merged. If not provided, the tryMerge function will be used. This + // option is used in the context of MultiGpus where we proceed to a first + // segmentation to scoop out communications from compute. 
std::function custom_should_merge_groups = nullptr; }; From 25c618c787ef37cf671cae3d652c9cc778443d52 Mon Sep 17 00:00:00 2001 From: snordmann Date: Tue, 15 Apr 2025 11:04:14 -0700 Subject: [PATCH 13/19] add comment --- csrc/fusion_segmenter.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/csrc/fusion_segmenter.cpp b/csrc/fusion_segmenter.cpp index b522f6d11ef..c98786a086d 100644 --- a/csrc/fusion_segmenter.cpp +++ b/csrc/fusion_segmenter.cpp @@ -3919,6 +3919,10 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( NVF_ERROR( areDirectlyConnected(group1, group2), "only support testing immediate producer-consumer groups"); + // The segmenter should ideally be redesigned to be more flexible and + // decoupled from the schedulers, but for now, we just return + // `SchedulerType::None` as it is not relevant when the segmenter is + // used with a custom should-merge function. if (options_.custom_should_merge_groups != nullptr) { return (options_.custom_should_merge_groups)(group1, group2); } From eb46aef5aa46340e883d858fb4e707f27e6d28d3 Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 16 Apr 2025 02:22:24 -0700 Subject: [PATCH 14/19] minor comment --- csrc/host_ir/container.cpp | 2 +- csrc/host_ir/executor.cpp | 22 +++++++++++----------- csrc/host_ir/executor.h | 2 +- tests/cpp/test_host_irs.cpp | 1 + 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/csrc/host_ir/container.cpp b/csrc/host_ir/container.cpp index a133c0521ba..83e668770fc 100644 --- a/csrc/host_ir/container.cpp +++ b/csrc/host_ir/container.cpp @@ -26,7 +26,7 @@ HostIrContainer::HostIrContainer(int64_t num_kernel_executors) HostIrContainer::~HostIrContainer() = default; Stream* HostIrContainer::getDefaultStream() { - if (!default_stream_) { + if (default_stream_ == nullptr) { default_stream_ = IrBuilder::createInContainer(this); } return default_stream_; diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 551f35487c9..89710eaae4b 100644 --- 
a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -327,7 +327,7 @@ void HostIrEvaluator::handle(Synchronize* synchronize) { void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { KernelArgumentHolder args; for (auto& input : launch_kernel->inputs()) { - args.push(getKnownConcreteData(input)); + args.push(getKnownConcreteValue(input)); } args.setDeviceIndex(); @@ -349,7 +349,7 @@ void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { void HostIrEvaluator::handle(PostOnStream* post_ir) { KernelArgumentHolder input_args; for (auto& input : post_ir->inputs()) { - input_args.push(getKnownConcreteData(input)); + input_args.push(getKnownConcreteValue(input)); } input_args.setDeviceIndex(); // placeholder for storing the outputs @@ -368,7 +368,7 @@ void HostIrEvaluator::handle(PostOnStream* post_ir) { post_ir); if (use_preallocated_outputs) { for (auto output : post_ir->outputs()) { - outputs.push(getKnownConcreteData(output)); + outputs.push(getKnownConcreteValue(output)); } } @@ -599,9 +599,9 @@ void HostIrEvaluator::handle(MatmulOp* matmul) { TensorView* out = matmul->out(); if (isKnown(out)) { - auto t_a = getKnownConcreteData(a).as(); - auto t_b = getKnownConcreteData(b).as(); - auto t_out = getKnownConcreteData(out).as(); + auto t_a = getKnownConcreteValue(a).as(); + auto t_b = getKnownConcreteValue(b).as(); + auto t_out = getKnownConcreteValue(out).as(); at::matmul_out(t_out, t_a, t_b); } else { unhandled(matmul); @@ -619,12 +619,12 @@ void HostIrEvaluator::handle(LinearOp* linear) { return; } - auto in_at = getKnownConcreteData(in).as(); - auto weight_at = getKnownConcreteData(weight).as(); - auto out_at = getKnownConcreteData(out).as(); + auto in_at = getKnownConcreteValue(in).as(); + auto weight_at = getKnownConcreteValue(weight).as(); + auto out_at = getKnownConcreteValue(out).as(); if (linear->has_bias()) { - auto bias_at = getKnownConcreteData(bias).as(); + auto bias_at = getKnownConcreteValue(bias).as(); at::linear_out(out_at, in_at, 
weight_at.squeeze(), bias_at.squeeze()); } else { at::linear_out(out_at, in_at, weight_at.squeeze()); @@ -661,7 +661,7 @@ void HostIrEvaluator::unhandled(Statement* stmt) { for (auto input : expr->inputs()) { if (input->isA()) { // Tensor inputs must be already computed at this point - inputs.push_back(getKnownConcreteData(input)); + inputs.push_back(getKnownConcreteValue(input)); } else { inputs.push_back(expr_evaluator_.evaluate(input)); } diff --git a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h index b603359d5b3..d71b74e0dda 100644 --- a/csrc/host_ir/executor.h +++ b/csrc/host_ir/executor.h @@ -149,7 +149,7 @@ class HostIrEvaluator final : public OptOutDispatch { return expr_evaluator_.isKnown(getAlias(value)); } - PolymorphicValue getKnownConcreteData(Val* val) const { + PolymorphicValue getKnownConcreteValue(Val* val) const { NVF_ERROR( isKnown(val), "value ", diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index 6a41e47c744..f8660f3bb83 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -6,6 +6,7 @@ */ // clang-format on #include +#include #include #include From 5f161f5357d9994cd8e9122e06262b02a343103b Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 16 Apr 2025 02:44:03 -0700 Subject: [PATCH 15/19] lint --- tests/cpp/test_host_irs.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index f8660f3bb83..6a41e47c744 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -6,7 +6,6 @@ */ // clang-format on #include -#include #include #include From d2655842281d22f3fdc811f0074ea959f12f378f Mon Sep 17 00:00:00 2001 From: snordmann Date: Wed, 16 Apr 2025 03:29:01 -0700 Subject: [PATCH 16/19] Revert "move .contiguous to be in postScatter" This reverts commit 85b7b751b82c31f5b019773d5adfdec21210ab8b. 
--- csrc/host_ir/executor.cpp | 2 +- csrc/multidevice/communication.cpp | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index e5dd2a86ba1..b973c7f3277 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -645,7 +645,7 @@ void HostIrEvaluator::handle(LoadStoreOp* load_store_op) { permutation.has_value(), "The logical domain of a Set.Permute is supposed to be a permutation of the root domain: ", out_tv->toString()); - in_tensor = in_tensor.permute(*permutation); + in_tensor = in_tensor.permute(*permutation).contiguous(); } if (!isKnown(load_store_op->out())) { bind(load_store_op->out(), in_tensor); diff --git a/csrc/multidevice/communication.cpp b/csrc/multidevice/communication.cpp index c88aa5944e7..041e13eb80c 100644 --- a/csrc/multidevice/communication.cpp +++ b/csrc/multidevice/communication.cpp @@ -378,10 +378,6 @@ c10::intrusive_ptr postScatter( c10d::Backend* backend, at::Tensor input_tensor, at::Tensor output_tensor) { - if (my_device_index == communication->root()) { - input_tensor = input_tensor.contiguous(); - } - if (my_device_index == communication->root() && !communication->out()->getDeviceMesh().has(communication->root())) { output_tensor = at::empty_like(input_tensor.slice(0, 0, 1)); From b55d4e73787f3ca3b841ea2baf769021a85e1199 Mon Sep 17 00:00:00 2001 From: snordmann Date: Fri, 18 Apr 2025 02:35:58 -0700 Subject: [PATCH 17/19] minor comment --- csrc/host_ir/executor.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index b973c7f3277..12cf344e549 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -703,8 +703,7 @@ void HostIrEvaluator::handle(BinaryOp* binary_op) { at::div_out(output, lhs, rhs); break; default: - NVF_CHECK( - false, + NVF_THROW( "Unexpected operator type: ", binary_op->getBinaryOpType(), " in ", @@ -744,8 +743,7 @@ void 
HostIrEvaluator::handle(ReductionOp* reduction_op) { at::amin_out(output, input, reduction_axes); return; default: - NVF_CHECK( - false, + NVF_THROW( "Unexpected operator type: ", reduction_op->getReductionOpType(), " in ", From 6b479deca59efed1cc9afae42447756bb260c019 Mon Sep 17 00:00:00 2001 From: snordmann Date: Thu, 24 Apr 2025 06:19:59 -0700 Subject: [PATCH 18/19] lower as HIR only set without permute --- csrc/host_ir/executor.cpp | 18 ++++++++---------- csrc/host_ir/lower.cpp | 37 +++++++++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 12cf344e549..3a3c0921d2a 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -632,21 +632,19 @@ void HostIrEvaluator::handle(LinearOp* linear) { } void HostIrEvaluator::handle(LoadStoreOp* load_store_op) { + NVF_ERROR( + load_store_op->opType() == LoadStoreOpType::Set, + "LoadStoreOp must be a Set"); NVF_ERROR( load_store_op->out()->isA(), "out must be a TensorView"); auto* out_tv = load_store_op->out()->as(); auto in_tensor = getKnownConcreteValue(load_store_op->in()).as(); - // If output has root domain, compute and apply permutation - if (out_tv->hasRoot()) { - auto permutation = ir_utils::computePermutation( - out_tv->getRootDomain(), out_tv->getLogicalDomain()); - NVF_ERROR( - permutation.has_value(), - "The logical domain of a Set.Permute is supposed to be a permutation of the root domain: ", - out_tv->toString()); - in_tensor = in_tensor.permute(*permutation).contiguous(); - } + // If output has root domain, it means that the set op is a permute, which we + // don't support currently + NVF_ERROR( + !out_tv->hasRoot(), "the set op", load_store_op, "must not be a permute"); + if (!isKnown(load_store_op->out())) { bind(load_store_op->out(), in_tensor); } else { diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 308e1399872..fd14096b190 100644 --- a/csrc/host_ir/lower.cpp +++ 
b/csrc/host_ir/lower.cpp @@ -615,16 +615,33 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( } bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) { - return expr->isOneOf< - MatmulOp, - SliceOp, - SelectOp, - LinearOp, - LoadStoreOp, - BinaryOp, - ReductionOp, - Communication, - P2PCommunication>(); + if (expr->isOneOf< + MatmulOp, + SliceOp, + SelectOp, + LinearOp, + BinaryOp, + ReductionOp, + Communication, + P2PCommunication>()) { + return true; + } + + // Lower as standalone op "set" ops, i.e., LoadStoreOp of "Set" type with no + // permute + if (expr->isA()) { + auto* load_store = expr->as(); + if (load_store->opType() == LoadStoreOpType::Set && + load_store->out()->isA()) { + auto* tv = load_store->out()->as(); + // If the output tensor has no root, it means it has no permute + if (!tv->hasRoot()) { + return true; + } + } + } + + return false; } bool HostIrLower::shouldMergeSegmentedGroups( From 15552c2a022ec3d65cb74af406eaa7b93422408f Mon Sep 17 00:00:00 2001 From: Jingyue Wu Date: Thu, 24 Apr 2025 22:45:53 -0700 Subject: [PATCH 19/19] Fix handle(LoadStoreOp*) --- csrc/host_ir/executor.cpp | 44 ++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 3a3c0921d2a..4ceef8927ed 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -640,17 +641,44 @@ void HostIrEvaluator::handle(LoadStoreOp* load_store_op) { auto* out_tv = load_store_op->out()->as(); auto in_tensor = getKnownConcreteValue(load_store_op->in()).as(); - // If output has root domain, it means that the set op is a permute, which we - // don't support currently - NVF_ERROR( - !out_tv->hasRoot(), "the set op", load_store_op, "must not be a permute"); - - if (!isKnown(load_store_op->out())) { - bind(load_store_op->out(), in_tensor); + at::Tensor t; + if 
(out_tv->hasRoot()) { + std::optional> permutation = + ir_utils::computePermutation( + out_tv->getRootDomain(), out_tv->getLogicalDomain()); + NVF_ERROR( + permutation.has_value(), + "The logical domain of a Set.Permute is supposed to be a permutation" + " of the root domain: ", + out_tv); + t = in_tensor.permute(*permutation); } else { + t = in_tensor; + } + + if (isKnown(out_tv)) { auto out_tensor = getKnownConcreteValue(load_store_op->out()).as(); - out_tensor.copy_(in_tensor); + out_tensor.copy_(t); + } else { + // For completeness, we may check if out_tv's allocation matches `t` and + // copy data if yes. For example, + // + // clang-format off + // ``` + // const auto& [sizes, strides] = inferShapeOfOutput(out_tv, expr_evaluator_); + // if (strides == t.strides()) { + // bind(out_tv, t); + // } else { + // auto out_tensor = at::empty_strided(sizes, strides, in_tensor.dtype()); + // out_tensor.copy_(t); + // bind_(out_tv, out_tensor); + // } + // ``` + // clang-format on + // + // For now, I choose to keep code simple for the limited use cases. + bind(out_tv, t); } }