diff --git a/cpp/src/branch_and_bound/branch_and_bound.cpp b/cpp/src/branch_and_bound/branch_and_bound.cpp index 33a2d983c9..db8b9910dd 100644 --- a/cpp/src/branch_and_bound/branch_and_bound.cpp +++ b/cpp/src/branch_and_bound/branch_and_bound.cpp @@ -1754,7 +1754,7 @@ void branch_and_bound_t::run_scheduler() active_workers_per_strategy_[strategy]++; launched_any_task = true; -#pragma omp task affinity(worker) +#pragma omp task affinity(worker) default(none) firstprivate(worker) plunge_with(worker); } else { @@ -1775,7 +1775,7 @@ void branch_and_bound_t::run_scheduler() active_workers_per_strategy_[strategy]++; launched_any_task = true; -#pragma omp task affinity(worker) +#pragma omp task affinity(worker) default(none) firstprivate(worker) dive_with(worker); } } @@ -1800,6 +1800,8 @@ void branch_and_bound_t::run_scheduler() template void branch_and_bound_t::single_threaded_solve() { + raft::common::nvtx::range scope("BB::single_threaded_solve"); + branch_and_bound_worker_t worker(0, original_lp_, Arow_, var_types_, settings_); f_t lower_bound = get_lower_bound(); @@ -1875,19 +1877,23 @@ lp_status_t branch_and_bound_t::solve_root_relaxation( // Root node path lp_status_t root_status; - std::future root_status_future; - root_status_future = std::async(std::launch::async, - &solve_linear_program_with_advanced_basis, - std::ref(original_lp_), - exploration_stats_.start_time, - std::ref(lp_settings), - std::ref(root_relax_soln), - std::ref(basis_update), - std::ref(basic_list), - std::ref(nonbasic_list), - std::ref(root_vstatus), - std::ref(edge_norms), - nullptr); + + // Note that we need to explicitly declared `root_status` as a shared variable here since + // it is local to the thread that are executing the enclosing task. +#pragma omp task default(shared) depend(out : root_status) + { + root_status = solve_linear_program_with_advanced_basis(original_lp_, + exploration_stats_.start_time, + lp_settings, + root_relax_soln_, + basis_update, + basic_list, + nonbasic_list, + root_vstatus_, + edge_norms_, + nullptr); + } + // Wait for the root relaxation solution to be sent by the diversity manager or dual simplex // to finish while (!root_crossover_solution_set_.load(std::memory_order_acquire) && @@ -1930,7 +1936,7 @@ lp_status_t branch_and_bound_t::solve_root_relaxation( // Check if crossover was stopped by dual simplex if (crossover_status == crossover_status_t::OPTIMAL) { set_root_concurrent_halt(1); // Stop dual simplex - root_status = root_status_future.get(); // Wait for dual simplex to finish +#pragma omp taskwait depend(in : root_status) // Wait for dual simplex to finish set_root_concurrent_halt(0); // Clear the concurrent halt flag // Override the root relaxation solution with the crossover solution root_relax_soln = root_crossover_soln_; @@ -1981,14 +1987,14 @@ lp_status_t branch_and_bound_t::solve_root_relaxation( solver_name = method_to_string(root_relax_solved_by); } else { - root_status = root_status_future.get(); +#pragma omp taskwait depend(in : root_status) // Wait for the dual simplex to finish user_objective = root_relax_soln_.user_objective; iter = root_relax_soln_.iterations; root_relax_solved_by = DualSimplex; solver_name = "Dual Simplex"; } } else { - root_status = root_status_future.get(); +#pragma omp taskwait depend(in : root_status) // Wait for the dual simplex to finish user_objective = root_relax_soln_.user_objective; iter = root_relax_soln_.iterations; root_relax_solved_by = DualSimplex; @@ -2614,17 +2620,16 @@ mip_status_t branch_and_bound_t::solve(mip_solution_t& solut "| Gap | Time |\n"); } - if (settings_.deterministic) { - run_deterministic_coordinator(Arow_); - } else if (settings_.num_threads > 1) { -#pragma omp parallel num_threads(settings_.num_threads) - { -#pragma omp master +#pragma omp taskgroup + { + if (settings_.deterministic) { + run_deterministic_coordinator(Arow_); + } else if (settings_.num_threads > 1) { run_scheduler(); + } else { + single_threaded_solve(); } - } else { - single_threaded_solve(); - } + } // Implicit barrier for all tasks created within the group (RINS, B&B workers) is_running_ = false; diff --git a/cpp/src/branch_and_bound/pseudo_costs.cpp b/cpp/src/branch_and_bound/pseudo_costs.cpp index c38e98e27d..a9297de571 100644 --- a/cpp/src/branch_and_bound/pseudo_costs.cpp +++ b/cpp/src/branch_and_bound/pseudo_costs.cpp @@ -756,14 +756,15 @@ static void batch_pdlp_strong_branching_task( ws_settings.inside_mip = true; if (effective_batch_pdlp == 1) { ws_settings.concurrent_halt = &concurrent_halt; } - auto start_time = std::chrono::high_resolution_clock::now(); + auto pdlp_start_time = std::chrono::high_resolution_clock::now(); auto ws_solution = solve_lp(&pc.pdlp_warm_cache.batch_pdlp_handle, mps_model, ws_settings); if (verbose) { - auto end_time = std::chrono::high_resolution_clock::now(); + auto pdlp_end_time = std::chrono::high_resolution_clock::now(); auto duration = - std::chrono::duration_cast(end_time - start_time).count(); + std::chrono::duration_cast(pdlp_end_time - pdlp_start_time) + .count(); settings.log.printf( "Original problem solved in %d milliseconds" " and iterations: %d\n", @@ -999,7 +1000,7 @@ void strong_branching(const lp_problem_t& original_lp, basis_update_mpf_t& basis_factors, pseudo_costs_t& pc) { - constexpr bool verbose = false; + raft::common::nvtx::range scope("BB::strong_branching"); pc.resize(original_lp.num_cols); pc.strong_branch_down.assign(fractional.size(), 0); @@ -1052,75 +1053,74 @@ void strong_branching(const lp_problem_t& original_lp, basis_factors, pc); } else { -#pragma omp parallel num_threads(settings.num_threads) - { -#pragma omp single nowait - { - if (effective_batch_pdlp != 0) { -#pragma omp task - batch_pdlp_strong_branching_task(settings, - effective_batch_pdlp, - start_time, - concurrent_halt, - original_lp, - new_slacks, - root_solution.x, - fractional, - root_obj, - pc, - sb_view, - pdlp_obj_down, - pdlp_obj_up); - } + if (effective_batch_pdlp != 0) { +#pragma omp task default(shared) + batch_pdlp_strong_branching_task(settings, + effective_batch_pdlp, + start_time, + concurrent_halt, + original_lp, + new_slacks, + root_solution.x, + fractional, + root_obj, + pc, + sb_view, + pdlp_obj_down, + pdlp_obj_up); + } - if (effective_batch_pdlp != 2) { - i_t n = std::min(4 * settings.num_threads, fractional.size()); + if (effective_batch_pdlp != 2) { + i_t n = std::min(4 * settings.num_threads, fractional.size()); // Here we are creating more tasks than the number of threads // such that they can be scheduled dynamically to the threads. -#pragma omp taskloop num_tasks(n) - for (i_t k = 0; k < n; k++) { - i_t start = std::floor(k * fractional.size() / n); - i_t end = std::floor((k + 1) * fractional.size() / n); - - constexpr bool verbose = false; - if (verbose) { - settings.log.printf("Thread id %d task id %d start %d end %d. size %d\n", - omp_get_thread_num(), - k, - start, - end, - end - start); - } - - strong_branch_helper(start, - end, - start_time, - original_lp, - settings, - var_types, - fractional, - root_solution.x, - root_vstatus, - edge_norms, - root_obj, - upper_bound, - simplex_iteration_limit, - pc, - dual_simplex_obj_down, - dual_simplex_obj_up, - dual_simplex_status_down, - dual_simplex_status_up, - sb_view); - } - // DS done: signal PDLP to stop (time-limit or all work done) and wait - if (effective_batch_pdlp == 1) { concurrent_halt.store(1); } +#pragma omp taskloop num_tasks(n) default(shared) + for (i_t k = 0; k < n; k++) { + i_t start = std::floor(k * fractional.size() / n); + i_t end = std::floor((k + 1) * fractional.size() / n); + + constexpr bool verbose = false; + if (verbose) { + settings.log.printf("Thread id %d task id %d start %d end %d. size %d\n", + omp_get_thread_num(), + k, + start, + end, + end - start); } + + strong_branch_helper(start, + end, + start_time, + original_lp, + settings, + var_types, + fractional, + root_solution.x, + root_vstatus, + edge_norms, + root_obj, + upper_bound, + simplex_iteration_limit, + pc, + dual_simplex_obj_down, + dual_simplex_obj_up, + dual_simplex_status_down, + dual_simplex_status_up, + sb_view); } + // DS done: signal PDLP to stop (time-limit or all work done) and wait + if (effective_batch_pdlp == 1) { concurrent_halt.store(1); } + } + + if (effective_batch_pdlp != 0) { +#pragma omp taskwait // Wait for the batch PDLP task to finish } } settings.log.printf("Strong branching completed in %.2fs\n", toc(strong_branching_start_time)); + constexpr bool verbose = false; if (verbose) { // Collect Dual Simplex statistics i_t dual_simplex_optimal = 0, dual_simplex_infeasible = 0, dual_simplex_iter_limit = 0; @@ -1283,6 +1283,8 @@ i_t pseudo_costs_t::variable_selection(const std::vector& fractio const std::vector& solution, logger_t& log) { + raft::common::nvtx::range scope("BB::pseudocost_branching"); + i_t branch_var = fractional[0]; f_t max_score = -1; i_t num_initialized_down; @@ -1329,6 +1331,8 @@ i_t pseudo_costs_t::reliable_variable_selection( const std::vector& new_slacks, const lp_problem_t& original_lp) { + raft::common::nvtx::range scope("BB::reliability_branching"); + constexpr f_t eps = 1e-6; f_t start_time = bnb_stats.start_time; i_t branch_var = fractional[0]; @@ -1563,7 +1567,7 @@ i_t pseudo_costs_t::reliable_variable_selection( log.printf("Time limit reached\n"); if (use_pdlp) { concurrent_halt.store(1); -#pragma omp taskwait +#pragma omp taskwait // Wait for the batch PDLP task to finish } return branch_var; } @@ -1576,14 +1580,7 @@ i_t pseudo_costs_t::reliable_variable_selection( f_t dual_simplex_start_time = tic(); if (rb_mode != 2) { -#pragma omp taskloop if (num_tasks > 1) priority(task_priority) num_tasks(num_tasks) \ - shared(score_mutex, \ - sb_view, \ - dual_simplex_obj_down, \ - dual_simplex_obj_up, \ - dual_simplex_status_down, \ - dual_simplex_status_up, \ - unreliable_list) +#pragma omp taskloop if (num_tasks > 1) priority(task_priority) num_tasks(num_tasks) default(shared) for (i_t i = 0; i < num_candidates; ++i) { auto [score, j] = unreliable_list[i]; @@ -1709,7 +1706,7 @@ i_t pseudo_costs_t::reliable_variable_selection( //} if (use_pdlp) { -#pragma omp taskwait +#pragma omp taskwait // Wait for the batch PDLP task to finish i_t pdlp_applied = 0; i_t pdlp_optimal = 0; diff --git a/cpp/src/mip_heuristics/diversity/diversity_manager.cu b/cpp/src/mip_heuristics/diversity/diversity_manager.cu index b8dc3d33bf..f53648b0f8 100644 --- a/cpp/src/mip_heuristics/diversity/diversity_manager.cu +++ b/cpp/src/mip_heuristics/diversity/diversity_manager.cu @@ -603,18 +603,15 @@ solution_t diversity_manager_t::run_solver() generate_solution(timer.remaining_time(), false); if (timer.check_time_limit()) { - rins.stop_rins(); population.add_external_solutions_to_population(); return population.best_feasible(); } if (check_b_b_preemption()) { - rins.stop_rins(); population.add_external_solutions_to_population(); return population.best_feasible(); } run_fp_alone(); - rins.stop_rins(); population.add_external_solutions_to_population(); return population.best_feasible(); }; diff --git a/cpp/src/mip_heuristics/diversity/lns/rins.cu b/cpp/src/mip_heuristics/diversity/lns/rins.cu index c4331343de..9396d7158a 100644 --- a/cpp/src/mip_heuristics/diversity/lns/rins.cu +++ b/cpp/src/mip_heuristics/diversity/lns/rins.cu @@ -24,6 +24,7 @@ #include #include +#include namespace cuopt::linear_programming::detail { template @@ -36,19 +37,6 @@ rins_t::rins_t(mip_solver_context_t& context_, time_limit = context.settings.heuristic_params.rins_time_limit; } -template -rins_thread_t::~rins_thread_t() -{ - this->request_termination(); -} - -template -void rins_thread_t::run_worker() -{ - raft::common::nvtx::range fun_scope("Running RINS"); - rins_ptr->run_rins(); -} - template void rins_t::new_best_incumbent_callback(const std::vector& solution) { @@ -59,23 +47,27 @@ template void rins_t::node_callback(const std::vector& solution, f_t objective) { if (!enabled) return; - node_count++; if (node_count - node_count_at_last_improvement < settings.nodes_after_later_improvement) return; - if (node_count - node_count_at_last_rins > settings.node_freq) { // opportunistic early test w/ atomic to avoid having to take the lock - if (!rins_thread->cpu_thread_done) return; - std::lock_guard lock(rins_mutex); + if (!launch_new_task.exchange(false)) return; + bool population_ready = false; - if (rins_thread->cpu_thread_done) { + { std::lock_guard pop_lock(dm.population.write_mutex); population_ready = dm.population.current_size() > 0 && dm.population.is_feasible(); } + if (population_ready) { lp_optimal_solution = solution; - rins_thread->start_cpu_solver(); + + CUOPT_LOG_DEBUG("Launching RINS task"); +#pragma omp task default(none) + run_rins(); + } else { + launch_new_task = true; } } } @@ -83,27 +75,19 @@ void rins_t::node_callback(const std::vector& solution, f_t objec template void rins_t::enable() { - rins_thread = std::make_unique>(); - rins_thread->rins_ptr = this; - seed = cuopt::seed_generator::get_seed(); + seed = cuopt::seed_generator::get_seed(); problem_ptr->handle_ptr->sync_stream(); problem_copy = std::make_unique>(*problem_ptr, &rins_handle); enabled = true; } -template -void rins_t::stop_rins() -{ - enabled = false; - if (rins_thread) rins_thread->request_termination(); - rins_thread.reset(); -} - template void rins_t::run_rins() { - if (total_calls == 0) RAFT_CUDA_TRY(cudaSetDevice(context.handle_ptr->get_device())); + raft::common::nvtx::range fun_scope("Running RINS"); + scope_guard guard([this]() { this->launch_new_task = true; }); + RAFT_CUDA_TRY(cudaSetDevice(context.handle_ptr->get_device())); cuopt_assert(lp_optimal_solution.size() == problem_copy->n_variables, "Assignment size mismatch"); cuopt_assert(problem_copy->handle_ptr == &rins_handle, "Handle mismatch"); // Do not make assertions based on problem_ptr. The original problem may have been modified within @@ -229,18 +213,20 @@ void rins_t::run_rins() solution_t fj_solution(fixed_problem); fj_solution.copy_new_assignment(cuopt::host_copy(fixed_assignment, rins_handle.get_stream())); std::vector default_weights(fixed_problem.n_constraints, 1.); - cpu_fj_thread_t cpu_fj_thread; - cpu_fj_thread.fj_cpu = fj.create_cpu_climber(fj_solution, - default_weights, - default_weights, - 0., - context.preempt_heuristic_solver_, - fj_settings_t{}, - true); - cpu_fj_thread.fj_ptr = &fj; - cpu_fj_thread.fj_cpu->log_prefix = "[RINS] "; - cpu_fj_thread.time_limit = time_limit; - cpu_fj_thread.start_cpu_solver(); + + std::unique_ptr> fj_cpu = + fj.create_cpu_climber(fj_solution, + default_weights, + default_weights, + 0., + context.preempt_heuristic_solver_, + fj_settings_t{}, + true); + fj_cpu->log_prefix = "[RINS] "; + + CUOPT_LOG_DEBUG("Launching CPUFJ (RINS) task"); +#pragma omp task shared(fj_cpu) firstprivate(time_limit) default(none) + cpufj_solve(fj_cpu.get(), time_limit); f_t lower_bound = context.branch_and_bound_ptr ? context.branch_and_bound_ptr->get_lower_bound() : -std::numeric_limits::infinity(); @@ -311,13 +297,13 @@ void rins_t::run_rins() static_cast(context.settings.heuristic_params.rins_max_time_limit)); } - cpu_fj_thread.stop_cpu_solver(); - bool fj_solution_found = cpu_fj_thread.wait_for_cpu_solver(); - CUOPT_LOG_DEBUG("RINS FJ ran for %d iterations", cpu_fj_thread.fj_cpu->iterations); - if (fj_solution_found) { - CUOPT_LOG_DEBUG("RINS FJ solution found. Objective %.16e", - cpu_fj_thread.fj_cpu->h_best_objective); - rins_solution_queue.push_back(cpu_fj_thread.fj_cpu->h_best_assignment); +#pragma omp taskwait // Wait for the CPU FJ (RINS) to finish + CUOPT_LOG_DEBUG("CPUFJ (RINS) task was stopped"); + + CUOPT_LOG_DEBUG("RINS FJ ran for %d iterations", fj_cpu->iterations); + if (fj_cpu->feasible_found) { + CUOPT_LOG_DEBUG("RINS FJ solution found. Objective %.16e", fj_cpu->h_best_objective); + rins_solution_queue.push_back(fj_cpu->h_best_assignment); } // Thread will be automatically terminated and joined by destructor @@ -357,12 +343,10 @@ void rins_t::run_rins() } #if MIP_INSTANTIATE_FLOAT -template class rins_thread_t; template class rins_t; #endif #if MIP_INSTANTIATE_DOUBLE -template class rins_thread_t; template class rins_t; #endif diff --git a/cpp/src/mip_heuristics/diversity/lns/rins.cuh b/cpp/src/mip_heuristics/diversity/lns/rins.cuh index 0a9133f848..b1b62bd1ae 100644 --- a/cpp/src/mip_heuristics/diversity/lns/rins.cuh +++ b/cpp/src/mip_heuristics/diversity/lns/rins.cuh @@ -17,19 +17,11 @@ #pragma once -#include #include #include -#include -#include +#include -#include -#include -#include -#include -#include -#include #include namespace cuopt::linear_programming::detail { @@ -52,18 +44,6 @@ struct rins_settings_t { template class rins_t; -template -struct rins_thread_t : public cpu_worker_thread_base_t> { - ~rins_thread_t(); - - void run_worker(); - void on_terminate() {} - void on_start() {} - bool get_result() { return true; } - - rins_t* rins_ptr{nullptr}; -}; - template class rins_t { public: @@ -74,7 +54,6 @@ class rins_t { void node_callback(const std::vector& solution, f_t objective); void new_best_incumbent_callback(const std::vector& solution); void enable(); - void stop_rins(); void run_rins(); @@ -96,15 +75,13 @@ class rins_t { f_t time_limit{10.}; i_t seed; - std::atomic enabled{false}; - std::atomic lower_bound{0.}; - - std::atomic node_count{0}; - std::atomic node_count_at_last_rins{0}; - std::atomic node_count_at_last_improvement{0}; - std::mutex rins_mutex; + omp_atomic_t enabled{false}; + omp_atomic_t lower_bound{0.}; - std::unique_ptr> rins_thread; + omp_atomic_t node_count{0}; + omp_atomic_t node_count_at_last_rins{0}; + omp_atomic_t node_count_at_last_improvement{0}; + omp_atomic_t launch_new_task{true}; }; } // namespace cuopt::linear_programming::detail diff --git a/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cu b/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cu index 8109653e6f..de905f7592 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cu +++ b/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cu @@ -7,9 +7,7 @@ #include "early_cpufj.cuh" -#include #include -#include namespace cuopt::linear_programming::detail { @@ -32,40 +30,39 @@ early_cpufj_t::~early_cpufj_t() template void early_cpufj_t::start() { - if (cpu_fj_thread_) { return; } + if (fj_cpu_) { return; } this->preemption_flag_.store(false); this->start_time_ = std::chrono::steady_clock::now(); - cpu_fj_thread_ = std::make_unique>(); - cpu_fj_thread_->fj_cpu = - init_fj_cpu_standalone(*this->problem_ptr_, *this->solution_ptr_, preemption_flag_); - cpu_fj_thread_->time_limit = std::numeric_limits::infinity(); + fj_cpu_ = init_fj_cpu_standalone(*this->problem_ptr_, *this->solution_ptr_, preemption_flag_); - cpu_fj_thread_->fj_cpu->log_prefix = "[Early CPUFJ] "; + fj_cpu_->log_prefix = "[Early CPUFJ] "; - cpu_fj_thread_->fj_cpu->improvement_callback = - [this](f_t solver_obj, const std::vector& assignment, double) { - this->try_update_best(solver_obj, assignment); - }; + fj_cpu_->improvement_callback = [this](f_t solver_obj, + const std::vector& assignment, + double) { this->try_update_best(solver_obj, assignment); }; - cpu_fj_thread_->start_cpu_solver(); + CUOPT_LOG_DEBUG("Launching early CPUFJ task"); +#pragma omp task shared(fj_cpu_) depend(out : *fj_cpu_) default(none) + cpufj_solve(fj_cpu_.get()); } template void early_cpufj_t::stop() { - if (!cpu_fj_thread_) { return; } + if (!fj_cpu_) { return; } preemption_flag_.store(true); - cpu_fj_thread_->stop_cpu_solver(); - cpu_fj_thread_->wait_for_cpu_solver(); + + fj_cpu_->halted = true; +#pragma omp taskwait depend(in : *fj_cpu_) // Wait for the early CPUFJ task to finish CUOPT_LOG_DEBUG("[Early CPUFJ] Stopped after %d iterations, solution_found=%d", - cpu_fj_thread_->fj_cpu ? cpu_fj_thread_->fj_cpu->iterations : 0, + fj_cpu_ ? fj_cpu_->iterations : 0, this->solution_found_); - cpu_fj_thread_.reset(); + fj_cpu_.reset(); } #if MIP_INSTANTIATE_FLOAT diff --git a/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cuh b/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cuh index 911e846551..fd85e4b9f3 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cuh +++ b/cpp/src/mip_heuristics/feasibility_jump/early_cpufj.cuh @@ -8,15 +8,13 @@ #pragma once #include +#include #include #include namespace cuopt::linear_programming::detail { -template -struct cpu_fj_thread_t; - template class early_cpufj_t : public early_heuristic_t> { public: @@ -32,7 +30,7 @@ class early_cpufj_t : public early_heuristic_t void stop(); private: - std::unique_ptr> cpu_fj_thread_; + std::unique_ptr> fj_cpu_; std::atomic preemption_flag_{false}; }; diff --git a/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cu b/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cu index 3f77427d87..4615fa8dc3 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cu +++ b/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cu @@ -38,7 +38,7 @@ early_gpufj_t::~early_gpufj_t() template void early_gpufj_t::start() { - if (worker_thread_) { return; } + if (fj_ptr_) { return; } this->start_time_ = std::chrono::steady_clock::now(); @@ -57,29 +57,26 @@ void early_gpufj_t::start() this->try_update_best(solver_obj, h_assignment); }; - worker_thread_ = std::make_unique(&early_gpufj_t::run_worker, this); -} + CUOPT_LOG_DEBUG("Launching early GPUFJ task"); -template -void early_gpufj_t::run_worker() -{ - RAFT_CUDA_TRY(cudaSetDevice(this->device_id_)); - fj_ptr_->solve(*this->solution_ptr_); +#pragma omp task default(none) shared(fj_ptr_) depend(out : *fj_ptr_) + { + RAFT_CUDA_TRY(cudaSetDevice(this->device_id_)); + fj_ptr_->solve(*this->solution_ptr_); + } } template void early_gpufj_t::stop() { - if (!worker_thread_) { return; } + if (!fj_ptr_) { return; } context_ptr_->preempt_heuristic_solver_.store(true); - - if (worker_thread_->joinable()) { worker_thread_->join(); } +#pragma omp taskwait depend(in : *fj_ptr_) // Wait for the early GPU FJ task to finish CUOPT_LOG_DEBUG("[Early GPU FJ] Stopped, solution_found=%d", this->solution_found_); fj_ptr_.reset(); - worker_thread_.reset(); } #if MIP_INSTANTIATE_FLOAT diff --git a/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cuh b/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cuh index 4a7769143e..e5ceaaeb61 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cuh +++ b/cpp/src/mip_heuristics/feasibility_jump/early_gpufj.cuh @@ -10,7 +10,6 @@ #include #include -#include namespace cuopt::linear_programming::detail { @@ -35,11 +34,8 @@ class early_gpufj_t : public early_heuristic_t void stop(); private: - void run_worker(); - std::unique_ptr> context_ptr_; std::unique_ptr> fj_ptr_; - std::unique_ptr worker_thread_; }; } // namespace cuopt::linear_programming::detail diff --git a/cpp/src/mip_heuristics/feasibility_jump/feasibility_jump.cuh b/cpp/src/mip_heuristics/feasibility_jump/feasibility_jump.cuh index 50b451a86e..33d1ac527f 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/feasibility_jump.cuh +++ b/cpp/src/mip_heuristics/feasibility_jump/feasibility_jump.cuh @@ -216,8 +216,6 @@ class fj_t { std::atomic& preemption_flag, fj_settings_t settings = fj_settings_t{}, bool randomize_params = false); - bool cpu_solve(fj_cpu_climber_t& fj_cpu, - f_t time_limit = +std::numeric_limits::infinity()); i_t alloc_max_climbers(i_t desired_climbers); void resize_vectors(const raft::handle_t* handle_ptr); void device_init(const rmm::cuda_stream_view& stream); diff --git a/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cu b/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cu index 4eaa5b6a21..ae0057ba5f 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cu +++ b/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cu @@ -1417,45 +1417,45 @@ std::unique_ptr> fj_t::create_cpu_climber( } template -static bool cpufj_solve_loop(fj_cpu_climber_t& fj_cpu, f_t in_time_limit) +void cpufj_solve(fj_cpu_climber_t* fj_cpu, f_t in_time_limit) { - i_t local_mins = 0; - auto loop_start = std::chrono::high_resolution_clock::now(); - auto time_limit = std::chrono::milliseconds((int)(in_time_limit * 1000)); + i_t local_mins = 0; + auto loop_start = std::chrono::high_resolution_clock::now(); + auto time_limit = std::chrono::milliseconds(static_cast(std::floor(in_time_limit * 1000.0))); auto loop_time_start = std::chrono::high_resolution_clock::now(); // Initialize feature tracking - fj_cpu.last_feature_log_time = loop_start; - fj_cpu.prev_best_objective = fj_cpu.h_best_objective; - fj_cpu.iterations_since_best = 0; + fj_cpu->last_feature_log_time = loop_start; + fj_cpu->prev_best_objective = fj_cpu->h_best_objective; + fj_cpu->iterations_since_best = 0; - while (!fj_cpu.halted && !fj_cpu.preemption_flag.load()) { + while (!fj_cpu->halted && !fj_cpu->preemption_flag.load()) { // Check if 5 seconds have passed auto now = std::chrono::high_resolution_clock::now(); if (in_time_limit < std::numeric_limits::infinity() && now - loop_time_start > time_limit) { CUOPT_LOG_TRACE("%sTime limit of %.4f seconds reached, breaking loop at iteration %d", - fj_cpu.log_prefix.c_str(), + fj_cpu->log_prefix.c_str(), time_limit.count() / 1000.f, - fj_cpu.iterations); + fj_cpu->iterations); break; } - if (fj_cpu.iterations >= fj_cpu.settings.iteration_limit) { + if (fj_cpu->iterations >= fj_cpu->settings.iteration_limit) { CUOPT_LOG_TRACE("%sIteration limit of %d reached, breaking loop at iteration %d", - fj_cpu.log_prefix.c_str(), - fj_cpu.settings.iteration_limit, - fj_cpu.iterations); + fj_cpu->log_prefix.c_str(), + fj_cpu->settings.iteration_limit, + fj_cpu->iterations); break; } // periodically recompute the LHS and violation scores // to correct any accumulated numerical errors - cuopt_assert(fj_cpu.settings.parameters.lhs_refresh_period > 0, + cuopt_assert(fj_cpu->settings.parameters.lhs_refresh_period > 0, "lhs_refresh_period should be positive"); - if (fj_cpu.iterations % fj_cpu.settings.parameters.lhs_refresh_period == 0 || - fj_cpu.trigger_early_lhs_recomputation) { - recompute_lhs(fj_cpu); - fj_cpu.trigger_early_lhs_recomputation = false; + if (fj_cpu->iterations % fj_cpu->settings.parameters.lhs_refresh_period == 0 || + fj_cpu->trigger_early_lhs_recomputation) { + recompute_lhs(*fj_cpu); + fj_cpu->trigger_early_lhs_recomputation = false; } fj_move_t move = fj_move_t{-1, 0}; @@ -1465,153 +1465,113 @@ static bool cpufj_solve_loop(fj_cpu_climber_t& fj_cpu, f_t in_time_lim bool is_mtm_sat = false; // Perform lift moves - if (fj_cpu.violated_constraints.empty()) { - thrust::tie(move, score) = find_lift_move(fj_cpu); + if (fj_cpu->violated_constraints.empty()) { + thrust::tie(move, score) = find_lift_move(*fj_cpu); if (score > fj_staged_score_t::zero()) is_lift = true; } // Regular MTM if (!(score > fj_staged_score_t::zero())) { - thrust::tie(move, score) = find_mtm_move_viol(fj_cpu, fj_cpu.mtm_viol_samples); + thrust::tie(move, score) = find_mtm_move_viol(*fj_cpu, fj_cpu->mtm_viol_samples); if (score > fj_staged_score_t::zero()) is_mtm_viol = true; } // try with MTM in satisfied constraints - if (fj_cpu.feasible_found && !(score > fj_staged_score_t::zero())) { - thrust::tie(move, score) = find_mtm_move_sat(fj_cpu, fj_cpu.mtm_sat_samples); + if (fj_cpu->feasible_found && !(score > fj_staged_score_t::zero())) { + thrust::tie(move, score) = find_mtm_move_sat(*fj_cpu, fj_cpu->mtm_sat_samples); if (score > fj_staged_score_t::zero()) is_mtm_sat = true; } // if we're in the feasible region but haven't found improvements in the last n iterations, // perturb bool should_perturb = false; - if (fj_cpu.violated_constraints.empty() && - fj_cpu.iterations - fj_cpu.last_feasible_entrance_iter > fj_cpu.perturb_interval) { - should_perturb = true; - fj_cpu.last_feasible_entrance_iter = fj_cpu.iterations; + if (fj_cpu->violated_constraints.empty() && + fj_cpu->iterations - fj_cpu->last_feasible_entrance_iter > fj_cpu->perturb_interval) { + should_perturb = true; + fj_cpu->last_feasible_entrance_iter = fj_cpu->iterations; } if (score > fj_staged_score_t::zero() && !should_perturb) { - apply_move(fj_cpu, move.var_idx, move.value, false); + apply_move(*fj_cpu, move.var_idx, move.value, false); // Track move types - if (is_lift) fj_cpu.n_lift_moves_window++; - if (is_mtm_viol) fj_cpu.n_mtm_viol_moves_window++; - if (is_mtm_sat) fj_cpu.n_mtm_sat_moves_window++; + if (is_lift) fj_cpu->n_lift_moves_window++; + if (is_mtm_viol) fj_cpu->n_mtm_viol_moves_window++; + if (is_mtm_sat) fj_cpu->n_mtm_sat_moves_window++; } else { // Local Min - update_weights(fj_cpu); + update_weights(*fj_cpu); if (should_perturb) { - perturb(fj_cpu); - for (size_t i = 0; i < fj_cpu.cached_mtm_moves.size(); i++) - fj_cpu.cached_mtm_moves[i].first = 0; + perturb(*fj_cpu); + for (size_t i = 0; i < fj_cpu->cached_mtm_moves.size(); i++) + fj_cpu->cached_mtm_moves[i].first = 0; } thrust::tie(move, score) = - find_mtm_move_viol(fj_cpu, 1, true); // pick a single random violated constraint + find_mtm_move_viol(*fj_cpu, 1, true); // pick a single random violated constraint i_t var_idx = move.var_idx >= 0 ? move.var_idx : 0; f_t delta = move.var_idx >= 0 ? move.value : 0; - apply_move(fj_cpu, var_idx, delta, true); + apply_move(*fj_cpu, var_idx, delta, true); ++local_mins; - ++fj_cpu.n_local_minima_window; + ++fj_cpu->n_local_minima_window; } // number of violated constraints is usually small (<100). recomputing from all LHSs is cheap // and more numerically precise than just adding to the accumulator in apply_move - fj_cpu.total_violations = 0; - for (auto cstr_idx : fj_cpu.violated_constraints) { - fj_cpu.total_violations += fj_cpu.view.excess_score(cstr_idx, fj_cpu.h_lhs[cstr_idx]); + fj_cpu->total_violations = 0; + for (auto cstr_idx : fj_cpu->violated_constraints) { + fj_cpu->total_violations += fj_cpu->view.excess_score(cstr_idx, fj_cpu->h_lhs[cstr_idx]); } - if (fj_cpu.iterations % fj_cpu.log_interval == 0) { + if (fj_cpu->iterations % fj_cpu->log_interval == 0) { CUOPT_LOG_TRACE( "%sCPUFJ iteration: %d/%d, local mins: %d, best_objective: %g, viol: %zu, obj weight %g, " "maxw %g", - fj_cpu.log_prefix.c_str(), - fj_cpu.iterations, - fj_cpu.settings.iteration_limit != std::numeric_limits::max() - ? fj_cpu.settings.iteration_limit + fj_cpu->log_prefix.c_str(), + fj_cpu->iterations, + fj_cpu->settings.iteration_limit != std::numeric_limits::max() + ? fj_cpu->settings.iteration_limit : -1, local_mins, - fj_cpu.pb_ptr->get_user_obj_from_solver_obj(fj_cpu.h_best_objective), - fj_cpu.violated_constraints.size(), - fj_cpu.h_objective_weight, - fj_cpu.max_weight); + fj_cpu->pb_ptr->get_user_obj_from_solver_obj(fj_cpu->h_best_objective), + fj_cpu->violated_constraints.size(), + fj_cpu->h_objective_weight, + fj_cpu->max_weight); } // send current solution to callback every 3000 steps for diversity - if (fj_cpu.iterations % fj_cpu.diversity_callback_interval == 0) { - if (fj_cpu.diversity_callback) { - fj_cpu.diversity_callback(fj_cpu.h_incumbent_objective, fj_cpu.h_assignment); + if (fj_cpu->iterations % fj_cpu->diversity_callback_interval == 0) { + if (fj_cpu->diversity_callback) { + fj_cpu->diversity_callback(fj_cpu->h_incumbent_objective, fj_cpu->h_assignment); } } // Print timing statistics every N iterations #if CPUFJ_TIMING_TRACE - if (fj_cpu.iterations % fj_cpu.timing_stats_interval == 0 && fj_cpu.iterations > 0) { - print_timing_stats(fj_cpu); + if (fj_cpu->iterations % fj_cpu->timing_stats_interval == 0 && fj_cpu->iterations > 0) { + print_timing_stats(*fj_cpu); } #endif - if (fj_cpu.iterations % 100 == 0 && fj_cpu.iterations > 0) { + if (fj_cpu->iterations % 100 == 0 && fj_cpu->iterations > 0) { // Collect memory statistics - auto [loads, stores] = fj_cpu.memory_aggregator.collect(); - double biased_work = (loads + stores) * fj_cpu.work_unit_bias / 1e10; - fj_cpu.work_units_elapsed += biased_work; + auto [loads, stores] = fj_cpu->memory_aggregator.collect(); + double biased_work = (loads + stores) * fj_cpu->work_unit_bias / 1e10; + fj_cpu->work_units_elapsed += biased_work; - if (fj_cpu.producer_sync != nullptr) { fj_cpu.producer_sync->notify_progress(); } + if (fj_cpu->producer_sync != nullptr) { fj_cpu->producer_sync->notify_progress(); } } - cuopt_func_call(sanity_checks(fj_cpu)); - fj_cpu.iterations++; - fj_cpu.iterations_since_best++; + cuopt_func_call(sanity_checks(*fj_cpu)); + fj_cpu->iterations++; + fj_cpu->iterations_since_best++; } auto loop_end = std::chrono::high_resolution_clock::now(); double total_time = std::chrono::duration_cast>(loop_end - loop_start).count(); - double avg_time_per_iter = total_time / fj_cpu.iterations; + double avg_time_per_iter = fj_cpu->iterations > 0 ? total_time / fj_cpu->iterations : 0; CUOPT_LOG_TRACE("%sCPUFJ Average time per iteration: %.8fms", - fj_cpu.log_prefix.c_str(), + fj_cpu->log_prefix.c_str(), avg_time_per_iter * 1000.0); #if CPUFJ_TIMING_TRACE // Print final timing statistics CUOPT_LOG_TRACE("=== Final Timing Statistics ==="); - print_timing_stats(fj_cpu); + print_timing_stats(*fj_cpu); #endif - - return fj_cpu.feasible_found; -} - -template -bool fj_t::cpu_solve(fj_cpu_climber_t& fj_cpu, f_t in_time_limit) -{ - raft::common::nvtx::range scope("fj_cpu"); - return cpufj_solve_loop(fj_cpu, in_time_limit); -} - -template -cpu_fj_thread_t::~cpu_fj_thread_t() -{ - this->request_termination(); -} - -template -void cpu_fj_thread_t::run_worker() -{ - cpu_fj_solution_found = cpufj_solve_loop(*fj_cpu, time_limit); -} - -template -void cpu_fj_thread_t::on_terminate() -{ - if (fj_cpu) fj_cpu->halted = true; -} - -template -void cpu_fj_thread_t::on_start() -{ - cuopt_assert(fj_cpu != nullptr, "fj_cpu must not be null"); - fj_cpu->halted = false; -} - -template -void cpu_fj_thread_t::stop_cpu_solver() -{ - fj_cpu->halted = true; } template @@ -1635,7 +1595,7 @@ std::unique_ptr> init_fj_cpu_standalone( #if MIP_INSTANTIATE_FLOAT template class fj_t; -template class cpu_fj_thread_t; +template void cpufj_solve(fj_cpu_climber_t* fj_cpu, float in_time_limit); template std::unique_ptr> init_fj_cpu_standalone( problem_t& problem, solution_t& solution, @@ -1645,7 +1605,7 @@ template std::unique_ptr> init_fj_cpu_standalone( #if MIP_INSTANTIATE_DOUBLE template class fj_t; -template class cpu_fj_thread_t; +template void cpufj_solve(fj_cpu_climber_t* fj_cpu, double in_time_limit); template std::unique_ptr> init_fj_cpu_standalone( problem_t& problem, solution_t& solution, diff --git a/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cuh b/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cuh index 3263609a2b..76bf158f9e 100644 --- a/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cuh +++ b/cpp/src/mip_heuristics/feasibility_jump/fj_cpu.cuh @@ -8,16 +8,12 @@ #pragma once #include -#include #include #include -#include -#include #include #include #include -#include #include #include @@ -126,7 +122,7 @@ struct fj_cpu_climber_t { // vector is actually likely beneficial here since we're memory bound std::vector flip_move_computed; - ; + // CSR nnz offset -> (delta, score) std::vector> cached_mtm_moves; @@ -194,21 +190,8 @@ struct fj_cpu_climber_t { }; template -struct cpu_fj_thread_t : public cpu_worker_thread_base_t> { - ~cpu_fj_thread_t(); - - void run_worker(); - void on_terminate(); - void on_start(); - bool get_result() { return cpu_fj_solution_found; } - - void stop_cpu_solver(); - - std::atomic cpu_fj_solution_found{false}; - f_t time_limit{+std::numeric_limits::infinity()}; - std::unique_ptr> fj_cpu; - fj_t* fj_ptr{nullptr}; -}; +void cpufj_solve(fj_cpu_climber_t* fj_cpu, + f_t in_time_limit = std::numeric_limits::infinity()); // Standalone CPUFJ init for running without full fj_t infrastructure (avoids GPU allocations). // Used for early CPUFJ during presolve. diff --git a/cpp/src/mip_heuristics/local_search/local_search.cu b/cpp/src/mip_heuristics/local_search/local_search.cu index da29511d70..58918148ba 100644 --- a/cpp/src/mip_heuristics/local_search/local_search.cu +++ b/cpp/src/mip_heuristics/local_search/local_search.cu @@ -20,10 +20,6 @@ #include -#include - -#include - namespace cuopt::linear_programming::detail { template @@ -47,20 +43,11 @@ local_search_t::local_search_t(mip_solver_context_t& context problem_with_objective_cut(*context.problem_ptr, context.problem_ptr->handle_ptr) { const int n_cpufj = context.settings.heuristic_params.num_cpufj_threads; - for (int i = 0; i < n_cpufj; ++i) { - ls_cpu_fj.push_back(std::make_unique>()); - ls_cpu_fj.back()->fj_ptr = &fj; - } - scratch_cpu_fj.push_back(std::make_unique>()); - scratch_cpu_fj.back()->fj_ptr = &fj; - scratch_cpu_fj_on_lp_opt.fj_ptr = &fj; - + ls_cpu_fj.resize(n_cpufj); + scratch_cpu_fj.resize(1); fj.settings.n_of_minimums_for_exit = context.settings.heuristic_params.n_of_minimums_for_exit; } -static double local_search_best_obj = std::numeric_limits::max(); -static population_t* pop_ptr = nullptr; - template void local_search_t::start_cpufj_scratch_threads(population_t& population) { @@ -75,37 +62,40 @@ void local_search_t::start_cpufj_scratch_threads(population_t 0) solution.assign_random_within_bounds(0.4); - cpu_fj.fj_cpu = cpu_fj.fj_ptr->create_cpu_climber(solution, - default_weights, - default_weights, - 0., - context.preempt_heuristic_solver_, - fj_settings_t{}, - /*randomize=*/counter > 0); - - cpu_fj.fj_cpu->log_prefix = "******* scratch " + std::to_string(counter) + ": "; - cpu_fj.fj_cpu->improvement_callback = - [&population, problem_ptr = context.problem_ptr]( + cpu_fj = fj.create_cpu_climber(solution, + default_weights, + default_weights, + 0., + context.preempt_heuristic_solver_, + fj_settings_t{}, + /*randomize=*/counter > 0); + + cpu_fj->log_prefix = "******* scratch " + std::to_string(counter) + ": "; + cpu_fj->improvement_callback = + [this, &population, problem_ptr = context.problem_ptr]( f_t obj, const std::vector& h_vec, double /*work_units*/) { population.add_external_solution(h_vec, obj, solution_origin_t::CPUFJ); (void)problem_ptr; - if (obj < local_search_best_obj) { + if (obj < this->local_search_best_obj) { CUOPT_LOG_TRACE("******* New local search best obj %g, best overall %g", problem_ptr->get_user_obj_from_solver_obj(obj), problem_ptr->get_user_obj_from_solver_obj( population.is_feasible() ? population.best_feasible().get_objective() : std::numeric_limits::max())); - local_search_best_obj = obj; + this->local_search_best_obj = obj; } }; counter++; }; - for (auto& cpu_fj_ptr : scratch_cpu_fj) { - cpu_fj_ptr->start_cpu_solver(); + CUOPT_LOG_DEBUG("Launching %d scratch CPUFJ tasks", scratch_cpu_fj.size()); + + for (size_t i = 0; i < scratch_cpu_fj.size(); ++i) { + auto ptr = scratch_cpu_fj[i].get(); +#pragma omp task firstprivate(ptr) depend(out : *ptr) default(none) + cpufj_solve(ptr); } } @@ -121,34 +111,44 @@ void local_search_t::start_cpufj_lptopt_scratch_threads( solution_lp.copy_new_assignment( host_copy(lp_optimal_solution, context.problem_ptr->handle_ptr->get_stream())); solution_lp.round_random_nearest(500); - scratch_cpu_fj_on_lp_opt.fj_cpu = fj.create_cpu_climber( + scratch_cpu_fj_on_lp_opt = fj.create_cpu_climber( solution_lp, default_weights, default_weights, 0., context.preempt_heuristic_solver_); - scratch_cpu_fj_on_lp_opt.fj_cpu->log_prefix = "******* scratch on LP optimal: "; - scratch_cpu_fj_on_lp_opt.fj_cpu->improvement_callback = + scratch_cpu_fj_on_lp_opt->log_prefix = "******* scratch on LP optimal: "; + scratch_cpu_fj_on_lp_opt->improvement_callback = [this, &population](f_t obj, const std::vector& h_vec, double /*work_units*/) { population.add_external_solution(h_vec, obj, solution_origin_t::CPUFJ); - if (obj < local_search_best_obj) { + if (obj < this->local_search_best_obj) { CUOPT_LOG_DEBUG("******* New local search best obj %g, best overall %g", context.problem_ptr->get_user_obj_from_solver_obj(obj), context.problem_ptr->get_user_obj_from_solver_obj( population.is_feasible() ? population.best_feasible().get_objective() : std::numeric_limits::max())); - local_search_best_obj = obj; + this->local_search_best_obj = obj; } }; - // default weights - cudaDeviceSynchronize(); - scratch_cpu_fj_on_lp_opt.start_cpu_solver(); + CUOPT_LOG_DEBUG("Launching scratch CPUFJ (on LP optimal) task"); + +#pragma omp task shared(scratch_cpu_fj_on_lp_opt) default(none) \ + depend(out : *scratch_cpu_fj_on_lp_opt) + cpufj_solve(scratch_cpu_fj_on_lp_opt.get()); } template void local_search_t::stop_cpufj_scratch_threads() { - for (auto& cpu_fj_ptr : scratch_cpu_fj) { - cpu_fj_ptr->request_termination(); + for (size_t i = 0; i < scratch_cpu_fj.size(); ++i) { + scratch_cpu_fj[i]->halted = true; +#pragma omp taskwait depend(in : *scratch_cpu_fj[i]) // Wait for each scratch CPU FJ task to finish + } + + if (scratch_cpu_fj_on_lp_opt) { + scratch_cpu_fj_on_lp_opt->halted = true; +#pragma omp taskwait depend( \ + in : *scratch_cpu_fj_on_lp_opt) // Wait for the scratch CPU FJ (LP optimal) task to finish + + CUOPT_LOG_DEBUG("All scratch CPUFJ tasks were stopped"); } - scratch_cpu_fj_on_lp_opt.request_termination(); } template @@ -164,29 +164,30 @@ void local_search_t::start_cpufj_deterministic( 0.0); solution.clamp_within_bounds(); - deterministic_cpu_fj.fj_ptr = &fj; - deterministic_cpu_fj.fj_cpu = fj.create_cpu_climber(solution, - default_weights, - default_weights, - 0., - context.preempt_heuristic_solver_, - fj_settings_t{}, - /*randomize=*/true); + deterministic_cpu_fj = fj.create_cpu_climber(solution, + default_weights, + default_weights, + 0., + context.preempt_heuristic_solver_, + fj_settings_t{}, + /*randomize=*/true); - deterministic_cpu_fj.fj_cpu->log_prefix = "******* deterministic CPUFJ: "; + deterministic_cpu_fj->log_prefix = "******* deterministic CPUFJ: "; // Register with producer_sync for B&B synchronization - producer_sync_t& producer_sync = bb.get_producer_sync(); - deterministic_cpu_fj.fj_cpu->producer_sync = &producer_sync; - producer_sync.register_producer(&deterministic_cpu_fj.fj_cpu->work_units_elapsed); + producer_sync_t& producer_sync = bb.get_producer_sync(); + deterministic_cpu_fj->producer_sync = &producer_sync; + producer_sync.register_producer(&deterministic_cpu_fj->work_units_elapsed); // Set up callback to send solutions to B&B with work unit timestamps - deterministic_cpu_fj.fj_cpu->improvement_callback = + deterministic_cpu_fj->improvement_callback = [&bb](f_t obj, const std::vector& h_vec, double work_units) { bb.queue_external_solution_deterministic(h_vec, work_units); }; - deterministic_cpu_fj.start_cpu_solver(); + CUOPT_LOG_DEBUG("Launching deterministic CPUFJ task"); +#pragma omp task shared(deterministic_cpu_fj) default(none) depend(inout : *deterministic_cpu_fj) + cpufj_solve(deterministic_cpu_fj.get()); // Signal that registration is complete - B&B can now wait on producers producer_sync.registration_complete(); @@ -195,12 +196,16 @@ void local_search_t::start_cpufj_deterministic( template void local_search_t::stop_cpufj_deterministic() { - if (deterministic_cpu_fj.fj_cpu) { - if (deterministic_cpu_fj.fj_cpu->producer_sync) { - deterministic_cpu_fj.fj_cpu->producer_sync->deregister_producer( - &deterministic_cpu_fj.fj_cpu->work_units_elapsed); + if (deterministic_cpu_fj) { + if (deterministic_cpu_fj->producer_sync) { + deterministic_cpu_fj->producer_sync->deregister_producer( + &deterministic_cpu_fj->work_units_elapsed); } - deterministic_cpu_fj.request_termination(); + + deterministic_cpu_fj->halted = true; +#pragma omp taskwait depend( \ + in : *deterministic_cpu_fj) // Wait for deterministic CPU FJ task to finish + CUOPT_LOG_DEBUG("Deterministic CPUFJ task was stopped"); } } @@ -233,48 +238,50 @@ bool local_search_t::do_fj_solve(solution_t& solution, } auto h_weights = cuopt::host_copy(in_fj.cstr_weights, solution.handle_ptr->get_stream()); auto h_objective_weight = in_fj.objective_weight.value(solution.handle_ptr->get_stream()); - for (auto& cpu_fj_ptr : ls_cpu_fj) { - auto& cpu_fj = *cpu_fj_ptr; - cpu_fj.fj_cpu = cpu_fj.fj_ptr->create_cpu_climber(solution, - h_weights, - h_weights, - h_objective_weight, - context.preempt_heuristic_solver_, - fj_settings_t{}, - true); + for (auto& cpu_fj : ls_cpu_fj) { + cpu_fj = fj.create_cpu_climber(solution, + h_weights, + h_weights, + h_objective_weight, + context.preempt_heuristic_solver_, + fj_settings_t{}, + true); } auto solution_copy = solution; // Start CPU solver in background thread - for (auto& cpu_fj_ptr : ls_cpu_fj) { - cpu_fj_ptr->start_cpu_solver(); - } +#pragma omp taskgroup + { + if (ls_cpu_fj.size() > 0) { + CUOPT_LOG_DEBUG("Launching %d CPUFJ tasks", ls_cpu_fj.size()); + +#pragma omp taskloop shared(ls_cpu_fj) default(none) num_tasks(ls_cpu_fj.size()) nogroup + for (size_t i = 0; i < ls_cpu_fj.size(); ++i) { + cpufj_solve(ls_cpu_fj[i].get()); + } + } - // Run GPU solver and measure execution time - auto gpu_fj_start = std::chrono::high_resolution_clock::now(); - in_fj.settings.time_limit = timer.remaining_time(); - in_fj.solve(solution); + // Run GPU solver + in_fj.settings.time_limit = timer.remaining_time(); + in_fj.solve(solution); - // Stop CPU solver - for (auto& cpu_fj_ptr : ls_cpu_fj) { - cpu_fj_ptr->stop_cpu_solver(); - } + for (size_t i = 0; i < ls_cpu_fj.size(); ++i) { + ls_cpu_fj[i]->halted = true; + } + } // implicit barrier that waits all CPU FJ tasks to finish - auto gpu_fj_end = std::chrono::high_resolution_clock::now(); - double gpu_fj_duration = std::chrono::duration(gpu_fj_end - gpu_fj_start).count(); + CUOPT_LOG_DEBUG("All CPUFJ tasks were stopped"); solution_t solution_cpu(*solution.problem_ptr); - f_t best_cpu_obj = std::numeric_limits::max(); - // // Wait for CPU solver to finish - for (auto& cpu_fj_ptr : ls_cpu_fj) { - bool cpu_sol_found = cpu_fj_ptr->wait_for_cpu_solver(); - if (cpu_sol_found) { - f_t cpu_obj = cpu_fj_ptr->fj_cpu->h_best_objective; + + for (size_t i = 0; i < ls_cpu_fj.size(); ++i) { + if (ls_cpu_fj[i]->feasible_found) { + f_t cpu_obj = ls_cpu_fj[i]->h_best_objective; if (cpu_obj < best_cpu_obj) { best_cpu_obj = cpu_obj; - solution_cpu.copy_new_assignment(cpu_fj_ptr->fj_cpu->h_best_assignment); + solution_cpu.copy_new_assignment(ls_cpu_fj[i]->h_best_assignment); solution_cpu.compute_feasibility(); } } diff --git a/cpp/src/mip_heuristics/local_search/local_search.cuh b/cpp/src/mip_heuristics/local_search/local_search.cuh index 94493ebcb3..9befd34ab5 100644 --- a/cpp/src/mip_heuristics/local_search/local_search.cuh +++ b/cpp/src/mip_heuristics/local_search/local_search.cuh @@ -11,16 +11,10 @@ #include #include #include -#include #include +#include #include -#include -#include -#include -#include -#include - namespace cuopt::linear_programming::dual_simplex { template class branch_and_bound_t; @@ -126,12 +120,15 @@ class local_search_t { feasibility_pump_t fp; std::mt19937 rng; - std::vector>> ls_cpu_fj; - std::vector>> scratch_cpu_fj; - cpu_fj_thread_t scratch_cpu_fj_on_lp_opt; - cpu_fj_thread_t deterministic_cpu_fj; + std::vector>> ls_cpu_fj; + std::vector>> scratch_cpu_fj; + std::unique_ptr> scratch_cpu_fj_on_lp_opt; + std::unique_ptr> deterministic_cpu_fj; problem_t problem_with_objective_cut; bool cutting_plane_added_for_active_run{false}; + + omp_atomic_t local_search_best_obj{std::numeric_limits::max()}; + population_t* pop_ptr{nullptr}; }; } // namespace cuopt::linear_programming::detail diff --git a/cpp/src/mip_heuristics/presolve/bounds_presolve.cuh b/cpp/src/mip_heuristics/presolve/bounds_presolve.cuh index 8b57cc7019..ed0b91466d 100644 --- a/cpp/src/mip_heuristics/presolve/bounds_presolve.cuh +++ b/cpp/src/mip_heuristics/presolve/bounds_presolve.cuh @@ -34,7 +34,7 @@ class bound_presolve_t { struct settings_t { f_t time_limit{60.0}; i_t iteration_limit{std::numeric_limits::max()}; - i_t num_threads = -1; + i_t num_tasks = -1; bool parallel_bounds_update{true}; }; diff --git a/cpp/src/mip_heuristics/presolve/conditional_bound_strengthening.cu b/cpp/src/mip_heuristics/presolve/conditional_bound_strengthening.cu index 24cac7129f..52333b1c14 100644 --- a/cpp/src/mip_heuristics/presolve/conditional_bound_strengthening.cu +++ b/cpp/src/mip_heuristics/presolve/conditional_bound_strengthening.cu @@ -246,11 +246,14 @@ void conditional_bound_strengthening_t::select_constraint_pairs_host( std::vector constraint_pairs_h(max_pair_per_row * problem.n_constraints, {-1, -1}); std::unordered_set cnstr_pair; -#pragma omp parallel for private(cnstr_pair) - for (int cnstr = 0; cnstr < problem.n_constraints; ++cnstr) { - for (int jj = offsets[cnstr]; jj < offsets[cnstr + 1]; ++jj) { + i_t num_tasks = omp_get_num_threads() - 4; + + CUOPT_LOG_INFO("Selecting constraint pairs with %d tasks", num_tasks); +#pragma omp taskloop num_tasks(num_tasks) private(cnstr_pair) default(shared) + for (i_t cnstr = 0; cnstr < problem.n_constraints; ++cnstr) { + for (i_t jj = offsets[cnstr]; jj < offsets[cnstr + 1]; ++jj) { int var = variables[jj]; - for (int kk = reverse_offsets[var]; kk < reverse_offsets[var + 1]; ++kk) { + for (i_t kk = reverse_offsets[var]; kk < reverse_offsets[var + 1]; ++kk) { if (reverse_constraints[kk] != cnstr) { cnstr_pair.insert(reverse_constraints[kk]); } if (cnstr_pair.size() == max_pair_per_row) { break; } } @@ -263,7 +266,7 @@ void conditional_bound_strengthening_t::select_constraint_pairs_host( constraint_pairs_h[cnstr * max_pair_per_row + counter++] = {cnstr, temp}; } cnstr_pair.clear(); - } + } // implicit barrier that waits for all iterations to finish before proceeding constraint_pairs = cuopt::device_copy(constraint_pairs_h, problem.handle_ptr->get_stream()); diff --git a/cpp/src/mip_heuristics/presolve/probing_cache.cu b/cpp/src/mip_heuristics/presolve/probing_cache.cu index e45f2394ed..9d3fb65a36 100644 --- a/cpp/src/mip_heuristics/presolve/probing_cache.cu +++ b/cpp/src/mip_heuristics/presolve/probing_cache.cu @@ -22,6 +22,7 @@ #include #include +#include namespace cuopt::linear_programming::detail { @@ -860,18 +861,17 @@ bool compute_probing_cache(bound_presolve_t& bound_presolve, bound_presolve.settings.iteration_limit = 50; bound_presolve.settings.time_limit = timer.remaining_time(); - size_t num_threads = bound_presolve.settings.num_threads < 0 - ? 0.2 * omp_get_max_threads() - : bound_presolve.settings.num_threads; - num_threads = std::clamp(num_threads, 1, 8); + size_t num_tasks = bound_presolve.settings.num_tasks < 0 ? 0.2 * omp_get_num_threads() + : bound_presolve.settings.num_tasks; + num_tasks = std::clamp(num_tasks, 1, 8); // Create a vector of multi_probe_t objects std::vector> multi_probe_presolve_pool; - std::vector>> modification_vector_pool(num_threads); - std::vector>> substitution_vector_pool(num_threads); + std::vector>> modification_vector_pool(num_tasks); + std::vector>> substitution_vector_pool(num_tasks); // Initialize multi_probe_presolve_pool - for (size_t i = 0; i < num_threads; i++) { + for (size_t i = 0; i < num_tasks; i++) { multi_probe_presolve_pool.emplace_back(bound_presolve.context); multi_probe_presolve_pool[i].resize(problem); multi_probe_presolve_pool[i].compute_stats = true; @@ -890,23 +890,28 @@ bool compute_probing_cache(bound_presolve_t& bound_presolve, // are visible before any per-thread kernel can reference that memory. problem.handle_ptr->sync_stream(); -// Main parallel loop -#pragma omp parallel num_threads(num_threads) - { - for (size_t step_start = 0; step_start < priority_indices.size(); step_start += step_size) { - if (timer.check_time_limit() || early_exit || problem_is_infeasible.load()) { break; } - size_t step_end = std::min(step_start + step_size, priority_indices.size()); + CUOPT_LOG_INFO("Running probing cache with %d tasks", num_tasks); -#pragma omp for - for (size_t i = step_start; i < step_end; ++i) { - auto var_idx = priority_indices[i]; - if (timer.check_time_limit()) { continue; } + // Main parallel loop + for (size_t step_start = 0; step_start < priority_indices.size(); step_start += step_size) { + if (timer.check_time_limit() || early_exit || problem_is_infeasible.load()) { break; } + size_t step_end = std::min(step_start + step_size, priority_indices.size()); - int thread_idx = omp_get_thread_num(); - CUOPT_LOG_TRACE("Computing probing cache for var %d on thread %d", var_idx, thread_idx); +#pragma omp taskloop num_tasks(num_tasks) default(shared) + for (size_t task_id = 0; task_id < num_tasks; ++task_id) { + size_t n = step_end - step_start; + size_t begin = step_start + std::floor(static_cast(n) * task_id / num_tasks); + size_t end = step_start + std::floor(static_cast(n) * (task_id + 1) / num_tasks); + auto& multi_probe_presolve = multi_probe_presolve_pool[task_id]; + auto& modification_vector = modification_vector_pool[task_id]; + auto& substitution_vector = substitution_vector_pool[task_id]; + if (timer.check_time_limit()) { continue; } - auto& multi_probe_presolve = multi_probe_presolve_pool[thread_idx]; + for (size_t i = begin; i < end; ++i) { + auto var_idx = priority_indices[i]; + if (timer.check_time_limit()) { continue; } + CUOPT_LOG_TRACE("Computing probing cache for var %d on thread %d", var_idx, task_id); compute_cache_for_var(var_idx, bound_presolve, problem, @@ -916,30 +921,29 @@ bool compute_probing_cache(bound_presolve_t& bound_presolve, n_of_implied_singletons, n_of_cached_probings, problem_is_infeasible, - modification_vector_pool[thread_idx], - substitution_vector_pool[thread_idx], + modification_vector, + substitution_vector, timer, problem.handle_ptr->get_device()); } + } // implicit barrier that waits for all iterations to finish before proceeding + + // TODO when we have determinism, check current threads work/time counter and filter queue + // items that are smaller or equal to that + apply_modification_queue_to_problem(modification_vector_pool, problem); + // copy host bounds again, because we changed some problem bounds + raft::copy(h_var_bounds.data(), + problem.variable_bounds.data(), + h_var_bounds.size(), + problem.handle_ptr->get_stream()); + problem.handle_ptr->sync_stream(); + if (n_of_implied_singletons - last_it_implied_singletons < + (size_t)std::max(2, (min(100, problem.n_variables / 50)))) { + early_exit = true; } -#pragma omp single - { - // TODO when we have determinism, check current threads work/time counter and filter queue - // items that are smaller or equal to that - apply_modification_queue_to_problem(modification_vector_pool, problem); - // copy host bounds again, because we changed some problem bounds - raft::copy(h_var_bounds.data(), - problem.variable_bounds.data(), - h_var_bounds.size(), - problem.handle_ptr->get_stream()); - problem.handle_ptr->sync_stream(); - if (n_of_implied_singletons - last_it_implied_singletons < - (size_t)std::max(2, (min(100, problem.n_variables / 50)))) { - early_exit = true; - } - last_it_implied_singletons = n_of_implied_singletons; - } + last_it_implied_singletons = n_of_implied_singletons; } // end of step + apply_substitution_queue_to_problem(substitution_vector_pool, problem); CUOPT_LOG_DEBUG("Total number of cached probings %lu number of implied singletons %lu", n_of_cached_probings.load(), diff --git a/cpp/src/mip_heuristics/solve.cu b/cpp/src/mip_heuristics/solve.cu index be01516657..6bf6ae9ce7 100644 --- a/cpp/src/mip_heuristics/solve.cu +++ b/cpp/src/mip_heuristics/solve.cu @@ -47,6 +47,7 @@ #include #include +#include namespace cuopt::linear_programming { @@ -80,11 +81,11 @@ static void invoke_solution_callbacks( } template -mip_solution_t run_mip(detail::problem_t& problem, - mip_solver_settings_t const& settings, - timer_t& timer, - f_t& initial_upper_bound, - std::vector& initial_incumbent_assignment) +mip_solution_t run_mip_solver(detail::problem_t& problem, + mip_solver_settings_t const& settings, + timer_t& timer, + f_t& initial_upper_bound, + std::vector& initial_incumbent_assignment) { try { raft::common::nvtx::range fun_scope("run_mip"); @@ -248,8 +249,8 @@ mip_solution_t run_mip(detail::problem_t& problem, } template -mip_solution_t solve_mip(optimization_problem_t& op_problem, - mip_solver_settings_t const& settings_const) +mip_solution_t solve_mip_helper(optimization_problem_t& op_problem, + mip_solver_settings_t const& settings_const) { try { mip_solver_settings_t settings(settings_const); @@ -469,10 +470,10 @@ mip_solution_t solve_mip(optimization_problem_t& op_problem, CUOPT_LOG_INFO("Writing presolved problem to file: %s", settings.presolve_file.c_str()); presolve_result_opt->reduced_problem.write_to_mps(settings.presolve_file); } - // early_best_user_obj is in user-space. // run_mip stores it in context.initial_upper_bound and converts to target spaces as needed. - auto sol = run_mip(problem, settings, timer, early_best_user_obj, early_best_user_assignment); + auto sol = + run_mip_solver(problem, settings, timer, early_best_user_obj, early_best_user_assignment); const f_t cuopt_presolve_time = sol.get_stats().presolve_time; if (run_presolve) { @@ -567,6 +568,53 @@ mip_solution_t solve_mip(optimization_problem_t& op_problem, throw; } } +template +mip_solution_t solve_mip(optimization_problem_t& op_problem, + mip_solver_settings_t const& settings_const) +{ + std::exception_ptr exception; + i_t num_threads = 0; + if (settings_const.num_cpu_threads < 0) { + num_threads = omp_get_max_threads(); + } else { + num_threads = settings_const.num_cpu_threads; + } + + if (num_threads < 4) { + CUOPT_LOG_ERROR("The MIP solver requires at least 4 CPU threads!"); + return mip_solution_t{ + cuopt::logic_error("The number of CPU threads is below than expected.", + cuopt::error_type_t::RuntimeError), + op_problem.get_handle_ptr()->get_stream()}; + } + + // TODO: Remove this after converting deterministic B&B to use tasks. This allows + // creating a nested parallel region. + omp_set_max_active_levels(2); + + mip_solution_t sol(mip_termination_status_t::NoTermination, + solver_stats_t{}, + op_problem.get_handle_ptr()->get_stream()); + + // Creates the OpenMP thread pool. It will be shared across the entire MIP solver. +#pragma omp parallel num_threads(num_threads) default(none) \ + shared(sol, op_problem, settings_const, exception) + { +#pragma omp master + { + try { + sol = solve_mip_helper(op_problem, settings_const); + } catch (...) { + // We cannot throw inside an OpenMP parallel region. So we need to catch and then + // re-throw later. + exception = std::current_exception(); + } + } + } // Implicit barrier + + if (exception) { std::rethrow_exception(exception); } + return sol; +} template mip_solution_t solve_mip( diff --git a/cpp/src/mip_heuristics/solver.cu b/cpp/src/mip_heuristics/solver.cu index ce6b602fba..0229a8f27f 100644 --- a/cpp/src/mip_heuristics/solver.cu +++ b/cpp/src/mip_heuristics/solver.cu @@ -181,6 +181,8 @@ void extract_probing_implied_bounds( template solution_t mip_solver_t::run_solver() { + solution_t sol(*context.problem_ptr); + // we need to keep original problem const cuopt_assert(context.problem_ptr != nullptr, "invalid problem pointer"); context.problem_ptr->tolerances = context.settings.get_tolerances(); @@ -227,14 +229,12 @@ solution_t mip_solver_t::run_solver() if (!presolve_success) { CUOPT_LOG_INFO("Problem proven infeasible in presolve"); - solution_t sol(*context.problem_ptr); sol.set_problem_fully_reduced(); context.problem_ptr->post_process_solution(sol); return sol; } if (run_presolve && context.problem_ptr->empty) { CUOPT_LOG_INFO("Problem full reduced in presolve"); - solution_t sol(*context.problem_ptr); sol.set_problem_fully_reduced(); for (auto callback : context.settings.get_mip_callbacks()) { if (callback->get_type() == internals::base_solution_callback_type::GET_SOLUTION) { @@ -286,8 +286,8 @@ solution_t mip_solver_t::run_solver() } context.work_unit_scheduler_.register_context(context.gpu_heur_loop); - namespace dual_simplex = cuopt::linear_programming::dual_simplex; - std::future branch_and_bound_status_future; + namespace dual_simplex = cuopt::linear_programming::dual_simplex; + dual_simplex::mip_status_t branch_and_bound_status = dual_simplex::mip_status_t::UNSET; dual_simplex::user_problem_t branch_and_bound_problem(context.problem_ptr->handle_ptr); context.problem_ptr->recompute_objective_integrality(); if (context.problem_ptr->is_objective_integral()) { @@ -302,8 +302,9 @@ solution_t mip_solver_t::run_solver() dual_simplex::probing_implied_bound_t probing_implied_bound; - bool run_bb = !context.settings.heuristics_only; - if (run_bb) { + i_t num_threads = omp_get_num_threads(); + + if (!context.settings.heuristics_only) { // Convert the presolved problem to dual_simplex::user_problem_t op_problem_.get_host_user_problem(branch_and_bound_problem); // Resize the solution now that we know the number of columns/variables @@ -317,6 +318,7 @@ solution_t mip_solver_t::run_solver() // Fill in the settings for branch and bound branch_and_bound_settings.time_limit = timer_.get_time_limit(); branch_and_bound_settings.node_limit = context.settings.node_limit; + branch_and_bound_settings.num_threads = std::max(num_threads - 1, 1); branch_and_bound_settings.print_presolve_stats = false; branch_and_bound_settings.absolute_mip_gap_tol = context.settings.tolerances.absolute_mip_gap; branch_and_bound_settings.relative_mip_gap_tol = context.settings.tolerances.relative_mip_gap; @@ -356,21 +358,18 @@ solution_t mip_solver_t::run_solver() ? 2 : context.settings.reduced_cost_strengthening; - if (context.settings.num_cpu_threads < 0) { - branch_and_bound_settings.num_threads = std::max(1, omp_get_max_threads() - 1); - } else { - branch_and_bound_settings.num_threads = std::max(1, context.settings.num_cpu_threads); - } - // Set the branch and bound -> primal heuristics callback branch_and_bound_settings.solution_callback = std::bind(&branch_and_bound_solution_helper_t::solution_callback, &solution_helper, std::placeholders::_1, std::placeholders::_2); - // heuristic_preemption_callback is needed in both modes to properly stop the heuristic thread + + // heuristic_preemption_callback is needed in both modes to properly stop the heuristic + // thread branch_and_bound_settings.heuristic_preemption_callback = std::bind( &branch_and_bound_solution_helper_t::preempt_heuristic_solver, &solution_helper); + if (context.settings.determinism_mode == CUOPT_MODE_OPPORTUNISTIC) { branch_and_bound_settings.set_simplex_solution_callback = std::bind(&branch_and_bound_solution_helper_t::set_simplex_solution, @@ -444,33 +443,34 @@ solution_t mip_solver_t::run_solver() if (timer_.check_time_limit()) { CUOPT_LOG_INFO("Time limit reached during B&B setup"); - solution_t sol(*context.problem_ptr); context.stats.total_solve_time = timer_.elapsed_time(); context.problem_ptr->post_process_solution(sol); return sol; } - - // Fork a thread for branch and bound - // std::async and std::future allow us to get the return value of bb::solve() - // without having to manually manage the thread - // std::future.get() performs a join() operation to wait until the return status is available - branch_and_bound_status_future = std::async(std::launch::async, - &dual_simplex::branch_and_bound_t::solve, - branch_and_bound.get(), - std::ref(branch_and_bound_solution)); } - // Start the primal heuristics - context.diversity_manager_ptr = &dm; - auto sol = dm.run_solver(); - if (run_bb) { - // Wait for the branch and bound to finish - auto bb_status = branch_and_bound_status_future.get(); +#pragma omp taskgroup + { + if (!context.settings.heuristics_only) { +#pragma omp task default(shared) + { + branch_and_bound_status = branch_and_bound->solve(branch_and_bound_solution); + } + } + + // Start the primal heuristics + context.diversity_manager_ptr = &dm; + sol = dm.run_solver(); + } // implicit barrier for all tasks created in B&B and heuristics + + if (!context.settings.heuristics_only) { if (branch_and_bound_solution.lower_bound > -std::numeric_limits::infinity()) { context.stats.set_solution_bound( context.problem_ptr->get_user_obj_from_solver_obj(branch_and_bound_solution.lower_bound)); } - if (bb_status == dual_simplex::mip_status_t::INFEASIBLE) { sol.set_problem_fully_reduced(); } + if (branch_and_bound_status == dual_simplex::mip_status_t::INFEASIBLE) { + sol.set_problem_fully_reduced(); + } context.stats.num_nodes = branch_and_bound_solution.nodes_explored; context.stats.num_simplex_iterations = branch_and_bound_solution.simplex_iterations; } diff --git a/cpp/src/mip_heuristics/utilities/cpu_worker_thread.cuh b/cpp/src/mip_heuristics/utilities/cpu_worker_thread.cuh deleted file mode 100644 index 2b982e1f47..0000000000 --- a/cpp/src/mip_heuristics/utilities/cpu_worker_thread.cuh +++ /dev/null @@ -1,147 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights - * reserved. SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace cuopt::linear_programming::detail { - -template -class cpu_worker_thread_base_t { - public: - cpu_worker_thread_base_t(); - ~cpu_worker_thread_base_t(); - - void start_cpu_solver(); - bool wait_for_cpu_solver(); - - // Derived classes MUST call this in their destructor before the base destructor runs. - // This ensures on_terminate() is called while the derived object is still fully alive. - void request_termination(); - - // Internal method for thread management - safe to call during destruction - void join_worker(); - void cpu_worker_thread(); - - std::thread cpu_worker; - std::mutex cpu_mutex; - std::condition_variable cpu_cv; - std::atomic should_stop{false}; - std::atomic cpu_thread_should_start{false}; - std::atomic cpu_thread_done{true}; - std::atomic cpu_thread_terminate{false}; -}; - -template -cpu_worker_thread_base_t::cpu_worker_thread_base_t() -{ - cpu_worker = std::thread(&cpu_worker_thread_base_t::cpu_worker_thread, this); -} - -template -cpu_worker_thread_base_t::~cpu_worker_thread_base_t() -{ - // Note: We don't call on_terminate() here since the derived object is already destroyed. - join_worker(); -} - -template -void cpu_worker_thread_base_t::cpu_worker_thread() -{ - while (!cpu_thread_terminate) { - { - std::unique_lock lock(cpu_mutex); - cpu_cv.wait(lock, [this] { return cpu_thread_should_start || cpu_thread_terminate; }); - - if (cpu_thread_terminate) break; - - cpu_thread_done = false; - cpu_thread_should_start = false; - } - - static_cast(this)->run_worker(); - - { - std::lock_guard lock(cpu_mutex); - cpu_thread_done = true; - } - cpu_cv.notify_all(); - } -} - -template -void cpu_worker_thread_base_t::request_termination() -{ - bool should_terminate = false; - { - std::lock_guard lock(cpu_mutex); - if (cpu_thread_terminate) return; - cpu_thread_terminate = true; - should_terminate = true; - static_cast(this)->on_terminate(); - } - - if (should_terminate) { - cpu_cv.notify_one(); - join_worker(); - } -} - -template -void cpu_worker_thread_base_t::join_worker() -{ - { - std::lock_guard lock(cpu_mutex); - if (!cpu_thread_terminate) { cpu_thread_terminate = true; } - } - cpu_cv.notify_one(); - - if (cpu_worker.joinable()) { cpu_worker.join(); } -} - -template -void cpu_worker_thread_base_t::start_cpu_solver() -{ - { - std::lock_guard lock(cpu_mutex); - cpu_thread_done = false; - cpu_thread_should_start = true; - static_cast(this)->on_start(); - } - cpu_cv.notify_one(); -} - -template -bool cpu_worker_thread_base_t::wait_for_cpu_solver() -{ - auto wait_start = std::chrono::high_resolution_clock::now(); - std::unique_lock lock(cpu_mutex); - cpu_cv.wait(lock, [this] { return cpu_thread_done || cpu_thread_terminate; }); - auto wait_end = std::chrono::high_resolution_clock::now(); - double wait_time = std::chrono::duration(wait_end - wait_start).count(); - if (wait_time > 1.0) { CUOPT_LOG_DEBUG("CPU thread wait time: %.2f seconds", wait_time); } - - return static_cast(this)->get_result(); -} - -} // namespace cuopt::linear_programming::detail