diff --git a/csrc/fusion_segmenter.cpp b/csrc/fusion_segmenter.cpp index cf2e7b13d75..cdb241b3517 100644 --- a/csrc/fusion_segmenter.cpp +++ b/csrc/fusion_segmenter.cpp @@ -3881,15 +3881,12 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( NVF_ERROR( areDirectlyConnected(group1, group2), "only support testing immediate producer-consumer groups"); - if (options_.only_segment_resharding_exprs) { - for (auto group : {group1, group2}) { - for (auto expr : group->exprs()) { - if (isResharding(expr)) { - return false; - } - } - } - return true; + // The segmenter should ideally be redesigned to be more flexible and + // decoupled from the schedulers, but for now, we just return + // `SchedulerType::None` as it is not relevant when the segmenter is + // used with a custom should-merge function. + if (options_.custom_should_merge_groups != nullptr) { + return (options_.custom_should_merge_groups)(group1, group2); } return tryMerge(segmented_fusion_.get(), runtimeInfo(), group1, group2) != SchedulerType::None; @@ -3900,7 +3897,7 @@ bool SegmentCandidateFinder::codeGenSupportedMerge( SchedulerType SegmentCandidateFinder::deriveSchedulerType( SegmentedGroup* group) { FUSER_PERF_SCOPE("SegmentCandidateFinder::deriveSchedulerType"); - if (options_.only_segment_resharding_exprs) { + if (options_.custom_should_merge_groups != nullptr) { // We don't need to generate a SchedulerType for multidevice segments at // this moment return SchedulerType::None; @@ -3920,7 +3917,7 @@ SegmentCandidateFinder::SegmentCandidateFinder( : options_(options), runtime_inputs_(inputs) { FUSER_PERF_SCOPE("SegmentCandidateFinder::SegmentCandidateFinder"); NVF_ERROR( - !options_.only_segment_resharding_exprs || + options_.custom_should_merge_groups == nullptr || (!options_.run_translate_welford && !options_.run_combine_reductions && options_.run_herrmann_merge && options_.run_final_merge), diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index f675ce00e1f..6d6306ded2d 100644 ---
a/csrc/fusion_segmenter.h +++ b/csrc/fusion_segmenter.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -482,7 +483,12 @@ struct SegmentCandidateFinderOptions { bool run_combine_reductions = true; bool run_herrmann_merge = true; bool run_final_merge = true; - bool only_segment_resharding_exprs = false; + // If provided, this custom function will be used to determine if two groups + // should be merged. If not provided, the tryMerge function will be used. This + // option is used in the multi-GPU context where we proceed to a first + // segmentation to scoop out communications from compute. + std::function + custom_should_merge_groups = nullptr; }; //! SegmentCandidateFinder diff --git a/csrc/host_ir/container.cpp b/csrc/host_ir/container.cpp index ca4c98f2a56..83e668770fc 100644 --- a/csrc/host_ir/container.cpp +++ b/csrc/host_ir/container.cpp @@ -26,7 +26,7 @@ HostIrContainer::HostIrContainer(int64_t num_kernel_executors) HostIrContainer::~HostIrContainer() = default; Stream* HostIrContainer::getDefaultStream() { - if (!default_stream_) { + if (default_stream_ == nullptr) { default_stream_ = IrBuilder::createInContainer(this); } return default_stream_; @@ -35,6 +35,11 @@ Stream* HostIrContainer::getDefaultStream() { std::ostream& HostIrContainer::print(std::ostream& os) const { IrMathPrinter op_exprs(os); op_exprs.handle(this); + os << "Aliases:{"; + for (const auto& alias : alias_) { + os << "\n " << alias.first << " -> " << alias.second; + } + os << "\n}\n"; return os; } diff --git a/csrc/host_ir/container.h b/csrc/host_ir/container.h index ab029f63fa2..97c72dcbf7b 100644 --- a/csrc/host_ir/container.h +++ b/csrc/host_ir/container.h @@ -41,6 +41,10 @@ class HostIrContainer final : public Fusion { //!
Print to an output stream std::ostream& print(std::ostream& os) const; + void resetTopLevelExprs(std::vector exprs) { + top_level_exprs_ = std::move(exprs); + } + const std::vector& topLevelExprs() const; void pushBackTopLevelExprs(Expr* expr); @@ -55,10 +59,22 @@ class HostIrContainer final : public Fusion { Stream* getDefaultStream(); + void markAlias(TensorView* original, const TensorView* new_alias) { + while (alias_.count(original)) { + original = alias_[original]->as(); + } + alias_[new_alias] = original; + } + + const auto& alias() const { + return alias_; + } + private: std::vector top_level_exprs_; std::vector> kernel_executors_; Stream* default_stream_ = nullptr; + std::unordered_map alias_; }; } // namespace hir diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp index 0c7ac076ea1..89710eaae4b 100644 --- a/csrc/host_ir/executor.cpp +++ b/csrc/host_ir/executor.cpp @@ -191,32 +191,6 @@ KernelArgumentHolder HostIrExecutor::run( namespace hir { -namespace { - -at::Tensor getKnownTensorOrUndefined( - Val* val, - const ExpressionEvaluator& expr_evaluator) { - return expr_evaluator.isKnown(val) - ? 
expr_evaluator.evaluate(val).as() : at::Tensor(); } KernelArgumentHolder getKnownTensorOrUndefined( const std::vector& vals, const ExpressionEvaluator& expr_evaluator) { std::vector tensors(vals.size()); std::transform( vals.begin(), vals.end(), tensors.begin(), [&expr_evaluator](Val* val) -> at::Tensor { return getKnownTensorOrUndefined(val, expr_evaluator); }); return KernelArgumentHolder(tensors); } } // namespace - HostIrEvaluator::HostIrEvaluator( std::unique_ptr container, Communicator* communicator, @@ -238,10 +212,23 @@ HostIrEvaluator::HostIrEvaluator( {container_->getDefaultStream(), c10::cuda::getDefaultCUDAStream( static_cast(device_index))}); - expr_evaluator_.bind("numberOfStreams", params_.number_of_streams); + NVF_ERROR( + std::all_of( + container_->inputs().begin(), + container_->inputs().end(), + [this](Val* input) { return !container_->alias().count(input); }), + "Inputs cannot be aliased"); } -KernelArgumentHolder HostIrEvaluator::dispatchAndCollectOutputs() { +KernelArgumentHolder HostIrEvaluator::runWithInput( + const std::unordered_map& val_to_PValue) { + expr_evaluator_ = ExpressionEvaluator(); + expr_evaluator_.bind("numberOfStreams", params_.number_of_streams); + // process input values, converting IValue to PolymorphicValue + for (const auto& [val, pvalue] : val_to_PValue) { + bind(val, pvalue); + } + // Interpret each instruction in an "eager" way by iterating over the Host Ir // Container's top level expression list for (auto expr : container_->topLevelExprs()) { @@ -249,17 +236,15 @@ } // Collect global outputs - return getKnownTensorOrUndefined(container_->outputs(), expr_evaluator_); -} - -KernelArgumentHolder HostIrEvaluator::runWithInput( - const std::unordered_map& val_to_PValue) { - // process input values, converting IValue to PolymorphicValue - for (const auto& [val, pvalue] : val_to_PValue) { - expr_evaluator_.bind(val, pvalue); -
} - - return dispatchAndCollectOutputs(); + std::vector outputs(container_->outputs().size()); + std::transform( + container_->outputs().begin(), + container_->outputs().end(), + outputs.begin(), + [this](Val* val) -> at::Tensor { + return this->getKnownTensorOrUndefined(val); + }); + return KernelArgumentHolder(outputs); } std::string HostIrEvaluator::canRun() const { @@ -342,13 +327,7 @@ void HostIrEvaluator::handle(Synchronize* synchronize) { void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { KernelArgumentHolder args; for (auto& input : launch_kernel->inputs()) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "No buffer associated with Val ", - input, - " for handling ", - launch_kernel->toString()); - args.push(expr_evaluator_.evaluate(input)); + args.push(getKnownConcreteValue(input)); } args.setDeviceIndex(); @@ -363,25 +342,35 @@ void HostIrEvaluator::handle(LaunchKernel* launch_kernel) { // Store the outputs in the context for (auto output_idx : arange(outputs.size())) { - expr_evaluator_.bind( - launch_kernel->outputs().at(output_idx), outputs[output_idx]); + bind(launch_kernel->outputs().at(output_idx), outputs[output_idx]); } } void HostIrEvaluator::handle(PostOnStream* post_ir) { KernelArgumentHolder input_args; for (auto& input : post_ir->inputs()) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "No buffer associated with Val ", - input, - " for handling ", - post_ir->toString()); - input_args.push(expr_evaluator_.evaluate(input)); + input_args.push(getKnownConcreteValue(input)); } input_args.setDeviceIndex(); // placeholder for storing the outputs KernelArgumentHolder outputs; + bool use_preallocated_outputs = std::all_of( + post_ir->outputs().begin(), + post_ir->outputs().end(), + [this](Val* output) { return this->isKnown(output); }); + NVF_ERROR( + use_preallocated_outputs || + std::all_of( + post_ir->outputs().begin(), + post_ir->outputs().end(), + [this](Val* output) { return !this->isKnown(output); }), + "outputs must be all or 
none preallocated in expr ", + post_ir); + if (use_preallocated_outputs) { + for (auto output : post_ir->outputs()) { + outputs.push(getKnownConcreteValue(output)); + } + } NVF_ERROR( post_ir->hostOpToPost()->isA(), @@ -398,16 +387,23 @@ void HostIrEvaluator::handle(PostOnStream* post_ir) { /*fusion_id=*/0, !params_.skip_auto_scheduling); } - outputs = fec_.at(hu).runFusionWithInputs(input_args); + if (use_preallocated_outputs) { + TORCH_WARN( + "FusionExecutorCache does not support with preallocated outputs, so we are copying the outputs in expr ", + post_ir); + auto tmp_outputs = fec_.at(hu).runFusionWithInputs(input_args); + for (auto output_idx : c10::irange(tmp_outputs.size())) { + outputs[output_idx].as().copy_( + tmp_outputs[output_idx].as()); + } + } else { + outputs = fec_.at(hu).runFusionWithInputs(input_args); + } } else { // This path should generally be avoided as it will likely send the fusion // held in HostUnit directly to KernelExecutor which means it will try to // compile and run a device kernel with a single thread. 
- if (auto it = executors_.find(hu); it != executors_.end()) { - ExecutorAbstract* ea = it->second.get(); - outputs = ExecutorDispatch::run(ea, input_args); - - } else { + if (auto it = executors_.find(hu); it == executors_.end()) { DynamicTransform::concretizeFusion(hu->fusion_to_execute(), input_args); auto it2 = executors_.insert( {hu, @@ -424,14 +420,20 @@ void HostIrEvaluator::handle(PostOnStream* post_ir) { } else { ExecutorDispatch::compile(ea, hu->fusion_to_execute()); } + } + ExecutorAbstract* ea = executors_[hu].get(); + if (use_preallocated_outputs) { + ExecutorDispatch::run(ea, input_args, outputs); + } else { outputs = ExecutorDispatch::run(ea, input_args); } } - // Store the outputs in the context - for (auto output_idx : arange(outputs.size())) { - expr_evaluator_.bind( - post_ir->outputs().at(output_idx), outputs[output_idx]); + if (!use_preallocated_outputs) { + // Store the outputs in the context + for (auto output_idx : arange(outputs.size())) { + bind(post_ir->outputs().at(output_idx), outputs[output_idx]); + } } } @@ -444,10 +446,9 @@ void HostIrEvaluator::handle(Communication* communication) { communicator_ != nullptr && communicator_->is_available(), "A valid communicator must be provided"); - at::Tensor input_tensor = - getKnownTensorOrUndefined(communication->input(0), expr_evaluator_); + at::Tensor input_tensor = getKnownTensorOrUndefined(communication->input(0)); at::Tensor output_tensor = - getKnownTensorOrUndefined(communication->output(0), expr_evaluator_); + getKnownTensorOrUndefined(communication->output(0)); CommunicatorBackend backend_type = communication->backend(); c10d::Backend* backend = @@ -471,10 +472,9 @@ void HostIrEvaluator::handle(P2PCommunication* communication) { communicator_ != nullptr && communicator_->is_available(), "A valid communicator must be provided"); - CommunicatorBackend backend_type = communication->backend(); - at::Tensor buffer = - getKnownTensorOrUndefined(communication->buffer(), expr_evaluator_); + 
at::Tensor buffer = getKnownTensorOrUndefined(communication->buffer()); + CommunicatorBackend backend_type = communication->backend(); if (backend_type == CommunicatorBackend::kCuda) { const P2pIpcHandle& p2p_ipc_handle = ipc_handle_cache_.get(communication); const auto current_stream = static_cast( @@ -556,11 +556,11 @@ void HostIrEvaluator::handle(ForLoop* for_loop) { for (auto i = start; i < stop; i += step) { // invalidate i and its consumers before binding - expr_evaluator_.invalidate(for_loop->index()); + invalidate(for_loop->index()); for (auto consumer : allConsumerValsOf(for_loop->index())) { - expr_evaluator_.invalidate(consumer); + invalidate(consumer); } - expr_evaluator_.bind(for_loop->index(), i); + bind(for_loop->index(), i); for (Expr* expr : for_loop->body().exprs()) { dispatch(expr); } @@ -597,15 +597,11 @@ void HostIrEvaluator::handle(MatmulOp* matmul) { TensorView* a = matmul->inA(); TensorView* b = matmul->inB(); TensorView* out = matmul->out(); - NVF_ERROR( - expr_evaluator_.isKnown(a) && expr_evaluator_.isKnown(b), - "Inputs of the matmul ", - matmul->toString(), - "must be precomputed before being retrieved"); - if (expr_evaluator_.isKnown(out)) { - auto t_a = expr_evaluator_.evaluate(a).as(); - auto t_b = expr_evaluator_.evaluate(b).as(); - auto t_out = expr_evaluator_.evaluate(out).as(); + + if (isKnown(out)) { + auto t_a = getKnownConcreteValue(a).as(); + auto t_b = getKnownConcreteValue(b).as(); + auto t_out = getKnownConcreteValue(out).as(); at::matmul_out(t_out, t_a, t_b); } else { unhandled(matmul); @@ -617,24 +613,18 @@ void HostIrEvaluator::handle(LinearOp* linear) { TensorView* weight = linear->inB()->as(); TensorView* bias = linear->bias()->as(); TensorView* out = linear->out()->as(); - NVF_ERROR( - expr_evaluator_.isKnown(in) && expr_evaluator_.isKnown(weight) && - (!linear->has_bias() || expr_evaluator_.isKnown(bias)), - "Inputs of the Linear Op ", - linear->toString(), - "must be precomputed before being retrieved"); - if 
(!expr_evaluator_.isKnown(out)) { + if (!isKnown(out)) { unhandled(linear); return; } - auto in_at = expr_evaluator_.evaluate(in).as(); - auto weight_at = expr_evaluator_.evaluate(weight).as(); - auto out_at = expr_evaluator_.evaluate(out).as(); + auto in_at = getKnownConcreteValue(in).as(); + auto weight_at = getKnownConcreteValue(weight).as(); + auto out_at = getKnownConcreteValue(out).as(); if (linear->has_bias()) { - auto bias_at = expr_evaluator_.evaluate(bias).as(); + auto bias_at = getKnownConcreteValue(bias).as(); at::linear_out(out_at, in_at, weight_at.squeeze(), bias_at.squeeze()); } else { at::linear_out(out_at, in_at, weight_at.squeeze()); @@ -661,25 +651,37 @@ void HostIrEvaluator::handle(kir::Allocate* allocate) { c10::nullopt, device, c10::nullopt); - - expr_evaluator_.bind(tv, tensor); + bind(tv, tensor); } void HostIrEvaluator::unhandled(Statement* stmt) { NVF_ERROR(stmt->isA(), stmt, " must be an Expr"); auto* expr = stmt->as(); - for (auto input : ir_utils::filterByType(expr->inputs())) { - NVF_ERROR( - expr_evaluator_.isKnown(input), - "input ", - input->toString(), - " of the expression ", - expr->toString(), - "must be precomputed before being retrieved"); - } - for (auto output : expr->outputs()) { - expr_evaluator_.bind( - output, expr_evaluator_.evaluate(output), /*evaluate_validate=*/true); + std::vector inputs; + for (auto input : expr->inputs()) { + if (input->isA()) { + // Tensor inputs must be already computed at this point + inputs.push_back(getKnownConcreteValue(input)); + } else { + inputs.push_back(expr_evaluator_.evaluate(input)); + } + } + + // Check that there is no pre-allocated output + NVF_ERROR( + std::all_of( + expr->outputs().begin(), + expr->outputs().end(), + [this](Val* output) { + return !this->expr_evaluator_.isKnown(output); + }), + "Do not support pre-allocated outputs for the op ", + expr); + // using ExpressionEvaluator::evaluate to evaluate the output is not valid + // here if the output or one of its producer is 
an alias + auto concrete_outputs = expr->evaluate(expr_evaluator_, inputs); + for (int64_t i : c10::irange(expr->outputs().size())) { + bind(expr->output(i), concrete_outputs.at(i)); } } diff --git a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h index 73c723d7962..d71b74e0dda 100644 --- a/csrc/host_ir/executor.h +++ b/csrc/host_ir/executor.h @@ -139,7 +139,38 @@ class HostIrEvaluator final : public OptOutDispatch { c10::cuda::CUDAStream getCUDAStream(Stream* stream); - KernelArgumentHolder dispatchAndCollectOutputs(); + Val* getAlias(Val* val) const { + const auto& aliases = container_->alias(); + auto it = aliases.find(val); + return it != aliases.end() ? getAlias(it->second) : val; + } + + bool isKnown(Val* value) const { + return expr_evaluator_.isKnown(getAlias(value)); + } + + PolymorphicValue getKnownConcreteValue(Val* val) const { + NVF_ERROR( + isKnown(val), + "value ", + val->toString(), + "must be precomputed before being retrieved"); + return expr_evaluator_.evaluate(getAlias(val)); + } + + at::Tensor getKnownTensorOrUndefined(Val* val) const { + return isKnown(val) + ? 
expr_evaluator_.evaluate(getAlias(val)).as() + : at::Tensor(); + } + + void bind(Val* value, PolymorphicValue concrete_value) { + expr_evaluator_.bind(getAlias(value), concrete_value); + } + + void invalidate(Val* value) { + expr_evaluator_.invalidate(getAlias(value)); + } std::unique_ptr container_; Communicator* communicator_; diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 6b96b23a550..1cdc21e60c0 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -6,8 +6,8 @@ */ // clang-format on #include -#include #include +#include #include #include #include @@ -614,6 +614,23 @@ std::vector HostIrLower::lowerToCollectiveBasedPipelinedGemmComm( for_loop}; } +bool HostIrLower::isLowerableAsStandaloneHostOp(Expr* expr) { + return isResharding(expr); +} + +bool HostIrLower::shouldMergeSegmentedGroups( + SegmentedGroup* group1, + SegmentedGroup* group2) { + for (auto group : {group1, group2}) { + for (Expr* expr : group->exprs()) { + if (isLowerableAsStandaloneHostOp(expr)) { + return false; + } + } + } + return true; +} + std::unique_ptr HostIrLower::lower( std::unique_ptr fusion, DeviceIdxType my_device_index) { @@ -637,7 +654,7 @@ std::unique_ptr HostIrLower::lower( .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .only_segment_resharding_exprs = true}; + .custom_should_merge_groups = &shouldMergeSegmentedGroups}; std::unique_ptr staged_fusion = SegmentCandidateFinder::segment( std::move(fusion), KernelArgumentHolder(), options, true); @@ -665,32 +682,18 @@ std::unique_ptr HostIrLower::lower( if (involvedDevices(group->exprs().at(0)).count(my_device_index) == 0) { continue; } - const bool is_resharding = std::any_of( - group->exprs().begin(), group->exprs().end(), [](auto expr) { - return isResharding(expr); - }); - if (is_resharding) { + // we decide whether to insert the Expr as a standalone op in the + // HostIRContainer, which will result in using ATen Op to evaluate it -- + // or, alternatively, to 
wrap them into a PostOnStream(HostUnit(.)) which + // will result in a kernel code generation. + if (std::all_of( + group->exprs().begin(), + group->exprs().end(), + isLowerableAsStandaloneHostOp)) { NVF_ERROR( group->exprs().size() == 1, - "Communication segments must contain only one Expr"); - for (auto* expr : HostIrLower::lower( - ir_cloner.clone(group->exprs().at(0)), my_device_index)) { - // Allocate the recv buffers of communications - if (expr->isA()) { - auto* communication = expr->as(); - TensorView* tv = communication->out(); - if (tv->getDeviceMesh().has(my_device_index)) { - auto* allocate = - IrBuilder::create(tv, MemoryType::Global); - hic->pushBackTopLevelExprs(allocate); - } - } - hic->pushBackTopLevelExprs(expr); - if (expr->isA()) { - auto wait = IrBuilder::create(expr->as()); - hic->pushBackTopLevelExprs(wait); - } - } + "Expr executed as a standalone op cannot be fused"); + hic->pushBackTopLevelExprs(ir_cloner.clone(group->exprs().at(0))); } else { auto host_unit = IrBuilder::create( staged_fusion->makeFusion(group).second); @@ -706,6 +709,32 @@ std::unique_ptr HostIrLower::lower( hic->addOutput(ir_cloner.clone(output)); } + std::vector new_top_level_exprs; + for (auto top_level_expr : hic->topLevelExprs()) { + if (!isResharding(top_level_expr)) { + new_top_level_exprs.push_back(top_level_expr); + continue; + } + for (auto* expr : HostIrLower::lower(top_level_expr, my_device_index)) { + // Allocate the recv buffers of communications + if (expr->isA()) { + auto* communication = expr->as(); + TensorView* tv = communication->out(); + if (tv->getDeviceMesh().has(my_device_index)) { + auto* allocate = + IrBuilder::create(tv, MemoryType::Global); + new_top_level_exprs.push_back(allocate); + } + } + new_top_level_exprs.push_back(expr); + if (expr->isA()) { + auto wait = IrBuilder::create(expr->as()); + new_top_level_exprs.push_back(wait); + } + } + } + hic->resetTopLevelExprs(new_top_level_exprs); + return hic; } diff --git a/csrc/host_ir/lower.h 
b/csrc/host_ir/lower.h index 314c880c264..5b1ecadece8 100644 --- a/csrc/host_ir/lower.h +++ b/csrc/host_ir/lower.h @@ -7,6 +7,7 @@ // clang-format on #pragma once +#include #include #include #include @@ -35,6 +36,12 @@ class HostIrLower { std::unique_ptr fusion, DeviceIdxType my_device_index); + static bool isLowerableAsStandaloneHostOp(Expr* expr); + + static bool shouldMergeSegmentedGroups( + SegmentedGroup* group1, + SegmentedGroup* group2); + private: std::vector lowerToCollectiveBasedPipelinedGemmComm(Expr* expr); const HostIrLowerParams params_; diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp index da85466c8a0..6a41e47c744 100644 --- a/tests/cpp/test_host_irs.cpp +++ b/tests/cpp/test_host_irs.cpp @@ -139,7 +139,7 @@ TEST_P(HostIrTest, SingleFusion) { auto outputs = hie.runWithInput({{post_on_stream->inputs().at(0), t0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); } /* @@ -236,7 +236,7 @@ TEST_P(HostIrTest, TwoFusions) { auto outputs = hie.runWithInput({{post_on_stream_0->inputs().at(0), t0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[0].as())); } /* @@ -365,7 +365,7 @@ TEST_P(HostIrTest, ThreeFusions) { auto outputs = hie.runWithInput({{post_on_stream_0->inputs().at(0), t0_0}}); // validate the obtained results - GTEST_EXPECT_TRUE(torch::allclose(t2_2, outputs[0].as())); + EXPECT_TRUE(torch::allclose(t2_2, outputs[0].as())); } // This unit test the for-loop IR by implementing a program that could be @@ -456,6 +456,62 @@ TEST_P(HostIrTest, ForLoops) { EXPECT_TRUE(expected_result.equal(buffer_at)); } +TEST_P(HostIrTest, PreAllocatedOutputs) { + const std::vector input_sizes = {4, 8, 32}; + const std::vector output_sizes = { + input_sizes.at(1), input_sizes.at(2)}; + + auto get_fusion = [input_sizes]() -> 
std::unique_ptr { + auto fusion = std::make_unique(); + FusionGuard fg(fusion.get()); + + auto tv0 = makeConcreteTensor(input_sizes); + auto tv1 = add(tv0, tv0); + auto tv2 = sum(tv1, {0}); + fusion->addInput(tv0); + fusion->addOutput(tv2); + return fusion; + }; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + auto host_unit = IrBuilder::create(get_fusion()); + + IrCloner ir_cloner(hic.get()); + std::vector post_on_stream_inputs = { + ir_cloner.clone(host_unit->fusion_to_execute()->inputs().at(0))}; + std::vector post_on_stream_outputs = { + ir_cloner.clone(host_unit->fusion_to_execute()->outputs().at(0))}; + + auto post_on_stream = IrBuilder::create( + host_unit, post_on_stream_inputs, post_on_stream_outputs); + + hic->pushBackTopLevelExprs(post_on_stream); + + hic->addInput(post_on_stream->inputs().at(0)); + hic->addInput(post_on_stream->outputs().at(0)); + + HostIrEvaluatorParams params; + auto [use_fusion_executor_cache] = GetParam(); + params.use_fusion_executor_cache = use_fusion_executor_cache; + HostIrEvaluator hie(std::move(hic), nullptr, params); + + // define concrete inputs and compute ref output for validation + auto options = at::TensorOptions().device(at::kCUDA, 0); + auto input = at::randn(input_sizes, options); + auto output = at::empty(output_sizes, options); + auto ref_output = at::sum(input * 2, {0}); + + hie.runWithInput( + {{post_on_stream->inputs().at(0), input}, + {post_on_stream->outputs().at(0), output}}); + + // validate the obtained results + EXPECT_TRUE(torch::allclose(ref_output, output)) + << "Output: " << output << " Expected: " << ref_output; +} + INSTANTIATE_TEST_SUITE_P( , HostIrTest, @@ -668,7 +724,7 @@ TEST_P(StreamHostIrTest, SingleFusionMultipleStreams) { // validate the obtained results for (int i = 0; i < n_iterations; i++) { - GTEST_EXPECT_TRUE(torch::allclose(ref_output, outputs[i].as())); + EXPECT_TRUE(torch::allclose(ref_output, outputs[i].as())); } EXPECT_NE( c10::cuda::getDefaultCUDAStream(0), 
c10::cuda::getCurrentCUDAStream(0)); @@ -1095,7 +1151,12 @@ TEST_F(IfThenElseTest, HostIr) { hic->addOutput(output_buffer); hic->pushBackTopLevelExprs(if_then_else); - HostIrEvaluator hie(std::move(hic)); + // Need to use FusionExecutorCache, otherwise hitting error + // https://github.com/NVIDIA/Fuser/blob/4d032f74d2347fd68f5be607ef94956500eb917b/csrc/runtime/executor.cpp#L750 + HostIrEvaluator hie( + std::move(hic), + /*Communicator=*/nullptr, + {.use_fusion_executor_cache = true}); for (auto boolean : {true, false}) { auto options = @@ -1155,13 +1216,11 @@ TEST_F(AllocationTest, inHostForLoop) { TensorView* tv0 = makeConcreteTensor(sizes); tv0->setMemoryType(MemoryType::Global); auto* allocate = IrBuilder::create(tv0, MemoryType::Global); - TensorView* tv1 = abs(tv0); for_loop->body().push_back(allocate); - for_loop->body().push_back(tv1->definition()); hic->pushBackTopLevelExprs(for_loop); - hic->addOutput(tv1); + hic->addOutput(tv0); HostIrEvaluator hie(std::move(hic)); @@ -1170,6 +1229,84 @@ TEST_F(AllocationTest, inHostForLoop) { EXPECT_EQ(sizes, outputs[0].as().sizes()); } +using HirAlias = NVFuserTest; + +TEST_F(HirAlias, SetAndGet) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + TensorView* tv2 = makeConcreteTensor(sizes); + hic->markAlias(tv1, tv2); + TensorView* tv3 = set(tv2); + TensorView* tv4 = makeConcreteTensor(sizes); + hic->markAlias(tv3, tv4); + hic->addInput(tv0); + hic->addOutput(tv4); + hic->pushBackTopLevelExprs(tv1->definition()); + hic->pushBackTopLevelExprs(tv3->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + at::Tensor tv0_aten = at::randn(sizes, options); + + at::Tensor out_aten = hie.runWithInput({{tv0, tv0_aten}})[0].as(); + + at::Tensor expected_out = tv0_aten; + EXPECT_TRUE(out_aten.equal(expected_out)) + << "Obtained output: " 
<< out_aten << "\n" + << "Expected output: " << expected_out; +} + +TEST_F(HirAlias, SetAndGetReversedOrder) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + TensorView* tv2 = makeConcreteTensor(sizes); + TensorView* tv3 = set(tv2); + TensorView* tv4 = makeConcreteTensor(sizes); + hic->markAlias(tv3, tv4); + hic->markAlias(tv1, tv2); + hic->addInput(tv0); + hic->addOutput(tv4); + hic->pushBackTopLevelExprs(tv1->definition()); + hic->pushBackTopLevelExprs(tv3->definition()); + + HostIrEvaluator hie(std::move(hic)); + + auto options = at::TensorOptions().device(at::kCUDA, 0); + at::Tensor tv0_aten = at::randn(sizes, options); + + at::Tensor out_aten = hie.runWithInput({{tv0, tv0_aten}})[0].as(); + + at::Tensor expected_out = tv0_aten; + EXPECT_TRUE(out_aten.equal(expected_out)) + << "Obtained output: " << out_aten << "\n" + << "Expected output: " << expected_out; +} + +TEST_F(HirAlias, ThrowOnInputAlias) { + const std::vector sizes = {8, 64}; + + auto hic = std::make_unique(); + FusionGuard fg(hic.get()); + + TensorView* tv0 = makeConcreteTensor(sizes); + TensorView* tv1 = set(tv0); + hic->markAlias(tv1, tv0); + hic->addInput(tv0); + + EXPECT_ANY_THROW(HostIrEvaluator hie(std::move(hic))); +} + } // namespace hir } // namespace nvfuser diff --git a/tests/cpp/test_resharding.cpp b/tests/cpp/test_resharding.cpp index ecf7d9b09db..4e8dc498485 100644 --- a/tests/cpp/test_resharding.cpp +++ b/tests/cpp/test_resharding.cpp @@ -53,7 +53,7 @@ class ReshardingTest : public NVFuserFixtureParamTest { .run_combine_reductions = false, .run_herrmann_merge = true, .run_final_merge = true, - .only_segment_resharding_exprs = true}; + .custom_should_merge_groups = &HostIrLower::shouldMergeSegmentedGroups}; auto segmented_fusion = SegmentCandidateFinder::segment( std::move(fusion_), KernelArgumentHolder(), options, true);