diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index bdf6019bc6c8..b4a853a4ec10 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -510,6 +510,7 @@ def __init__(self, mod=None): self.target = None self.name = None self.dev = None + self.cpu_affinity = "" self.idx = None self.mod = mod self.input_params = InferType()(mod)["main"].params @@ -685,6 +686,7 @@ def get_config(self): output_conf.append(output) mconf["mod_idx"] = module.idx + mconf["cpu_affinity"] = module.cpu_affinity mconf["output"] = output_conf module_connection[mod] = { diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 834a84933e44..beb4f425e93a 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -307,10 +308,11 @@ class ConfigBindings { /*! * \brief The binding information of all outputs of a module. */ -class ConfigOutputBindings { +class ConfigRuntime { public: - ConfigOutputBindings& operator=(const ConfigOutputBindings& output) { + ConfigRuntime& operator=(const ConfigRuntime& output) { output_binding_map_ = output.GetOutBindings(); + cpu_affinity_ = output.GetCPUAffinity(); return *this; } @@ -318,6 +320,16 @@ class ConfigOutputBindings { ICHECK(output_binding_map_.find(key) != output_binding_map_.end()); return output_binding_map_[key]; } + /*! + * \brief Store the CPU affinity settings. + * \param cpu_affinity The CPU affinity settings in the text form. + */ + void StoreCPUAffinity(std::string cpu_affinity) { cpu_affinity_ = cpu_affinity; } + /*! + * \brief Getting the setting of the cpu affinity. + * \param Returning the cpu affinity in text form. + */ + std::string GetCPUAffinity() const { return cpu_affinity_; } /*! * \brief Enumerating the output configuration. * \param parse_function The callback function is used to parse the binding configeration. @@ -330,7 +342,7 @@ class ConfigOutputBindings { /*!brief Return the variable "output_binding_map_".*/ std::unordered_map GetOutBindings() const { return output_binding_map_; } /*! - *\brief This function is used to verify whether ConfigOutputBindings is successfully loaded. + *\brief This function is used to verify whether ConfigRuntime is successfully loaded. *\return Return true to indicate that this class has not been successfully loaded. */ bool Empty() { return output_binding_map_.empty(); } @@ -387,6 +399,8 @@ class ConfigOutputBindings { private: /*!\brief The map of output binding, 'int' is the output interface index.*/ std::unordered_map output_binding_map_; + /*!\brief The cpu affinity setting for the tvm thread pool.*/ + std::string cpu_affinity_; }; /*! @@ -394,10 +408,19 @@ class ConfigOutputBindings { */ class ConfigPipelineExecution { public: - ConfigOutputBindings& operator[](int key) { + ConfigRuntime& operator[](int key) { ICHECK(config_.find(key) != config_.end()); return config_[key]; } + /**/ + std::string GetCPUAffinity(int runtime_idx) { + auto config = config_.find(runtime_idx); + if (config == config_.end()) { + LOG(FATAL) << "Do not finding the runtime " << runtime_idx; + } + auto config_runtime = config->second; + return config_runtime.GetCPUAffinity(); + } /*! * \brief Enumerating the binding configuration for a specified runtime. * \param parse_function The callback function is used to parse the binding configuration. @@ -439,7 +462,7 @@ class ConfigPipelineExecution { /* *!\brief Parsing the configuration. */ - void ParseConfiguration(const std::unordered_map& config) { + void ParseConfiguration(const std::unordered_map& config) { if (config.empty()) { LOG(FATAL) << "The Configuration loading not finish yet."; } @@ -465,8 +488,9 @@ class ConfigPipelineExecution { std::string key; reader->BeginObject(); int mod_idx = -1; - ConfigOutputBindings output; + ConfigRuntime output; std::string dev; + std::string cpu_affinity; while (reader->NextObjectItem(&key)) { if (key == "mod_idx") { reader->Read(&mod_idx); @@ -474,6 +498,8 @@ class ConfigPipelineExecution { reader->Read(&dev); } else if (key == "output") { reader->Read(&output); + } else if (key == "cpu_affinity") { + reader->Read(&cpu_affinity); } else { LOG(FATAL) << "do not support key " << key; } @@ -481,7 +507,9 @@ class ConfigPipelineExecution { ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; // Check if the output is successfully read. ICHECK(!output.Empty()) << "Invalid output binding result."; - // Build the mapping of mod_idx and "ConfigOutputBindings". + // Store the cpu affinity into the 'ConfigRuntime' structure. + output.StoreCPUAffinity(cpu_affinity); + // Build the mapping of mod_idx and "ConfigRuntime". config_[mod_idx] = output; } // Doing the configuration parsing after the loading finished. @@ -493,7 +521,7 @@ class ConfigPipelineExecution { *!\brief The key is the module index, this variable records all module pipeline configuration * information. */ - std::unordered_map config_; + std::unordered_map config_; /* *\brief The key is the global output index, and the map is including global outputs index and * the module outputs pair. @@ -600,6 +628,8 @@ class BackendRuntime { private: /*!\brief The index of runtime indicates the runtime position in the pipeline.*/ int runtime_idx_; + /*!*/ + std::string cpu_affinity_ = ""; /*!\brief The Runtime module of a backend graph executor.*/ Module module_; /*\brief The thread is associated with the current runtime*/ @@ -641,9 +671,11 @@ class BackendRuntime { SetPipelineState(RUNNING); if (runtime_idx_ == 0) { this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0); + this->SetCPUAffinity(); } else { // Only launching the worker thread for the runtimes after the first runtime. thread_ = std::thread([&]() { + this->SetCPUAffinity(); while (!this->WaitAndLoadPipelineData()) { if (!this->RunPipeline()) { break; @@ -819,6 +851,20 @@ class BackendRuntime { TVMArrayCopyFromTo(from, to, nullptr); } + /*!\brief Setting the cpu affinity for the tvm threads pool in the current BackendRuntime.*/ + void SetCPUAffinity(void) { + if (cpu_affinity_.empty()) { + return; + } + auto affinity_mode = tvm::runtime::threading::ThreadGroup::kSpecifyThreadShareAllCore; + std::istringstream istr(cpu_affinity_); + std::string affinity; + std::vector cpus; + while (getline(istr, affinity, ',')) { + cpus.push_back(std::stoi(affinity)); + } + tvm::runtime::threading::Configure(affinity_mode, 0, cpus); + } public: BackendRuntime(Module mod, int mod_idx) { @@ -852,6 +898,7 @@ class BackendRuntime { */ void InitializePipeline(ConfigPipelineExecution config, std::vector>* runtimes) { + cpu_affinity_ = config.GetCPUAffinity(runtime_idx_); // Getting the 'binding configuration' for each runtime. config.VisitRuntimeOutputConfig( [&](int output_idx, int child_idx, std::string child_input_name) { diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index ff30c2affe47..099be056e62c 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -24,6 +24,7 @@ from tvm import relay from tvm.relay import transform from tvm.contrib import graph_executor, pipeline_executor +from tvm._ffi import get_global_func def get_mannual_mod(): @@ -79,6 +80,7 @@ def get_manual_conf(mods, target): # is for mod2 input. pipe_config1 = { "mod_idx": 0, + "cpu_affinity": "0", "output": [ {"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name": "data_0"}]}, {"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name": "data_0"}]}, @@ -97,6 +99,7 @@ def get_manual_conf(mods, target): pipe_config2 = { "mod_idx": 1, + "cpu_affinity": "0", "output": [ {"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name": "data_1"}]}, ], @@ -113,6 +116,7 @@ def get_manual_conf(mods, target): pipe_config3 = { "mod_idx": 2, + "cpu_affinity": "0", "output": [{"output_idx": 0, "dependencies": [{"global_output_index": 1}]}], } mod_config[mods[2]] = { @@ -203,6 +207,13 @@ def run_modules( return final_output +def reset_cpu_affinity(affinity): + # Restore the CPU affinity into the default value. + config_threadpool = get_global_func("runtime.config_threadpool") + config_threadpool(-2, 0) + os.sched_setaffinity(0, affinity) + + def test_pipe_runtime_error_check(): # This function is used to trigger runtime error by applying wrong logic. if pipeline_executor.pipeline_executor_enabled(): @@ -266,6 +277,7 @@ def test_pipeline(): if pipeline_executor.pipeline_executor_enabled(): target_list = tvm.testing.enabled_targets() for target in target_list: + affinity = os.sched_getaffinity(0) # Get the three pipeline modules here. (mod1, mod2, mod3), dshape = get_mannual_mod() @@ -320,12 +332,15 @@ def test_pipeline(): # Set other parameters. pipe_config[mod1].target = target[0] pipe_config[mod1].dev = target[1] + pipe_config[mod1].cpu_affinity = "0" pipe_config[mod2].target = "llvm" pipe_config[mod2].dev = tvm.cpu(0) + pipe_config[mod2].cpu_affinity = "0" pipe_config[mod3].target = "llvm" pipe_config[mod3].dev = tvm.cpu(0) + pipe_config[mod3].cpu_affinity = "0" # Checking the configuration of modules dependency. mconfig = pipe_config.get_config() assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target) @@ -404,6 +419,9 @@ def test_pipeline(): assert statistic_time < 10 time.sleep(1) + # Reset the cpu affinity after a test. + reset_cpu_affinity(affinity) + if __name__ == "__main__": pytest.main([__file__])