From 18a27347010f550472febe2eb6d1d1a94307241d Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Mar 2022 16:53:44 -0700 Subject: [PATCH] [Runtime][PipelineExecutor] Setting CPU affinity for the Runtime of pipeline. This patch add the function to set cpu affinity for the runtime of the pipeline. By using the said function, user for example can let the first runtime of the pipeline to use the cpu [0,1] only, and the second runtime to use the cpu [2, 3] only. --- python/tvm/contrib/pipeline_executor.py | 2 + src/runtime/pipeline/pipeline_struct.h | 63 +++++++++++++++++--- tests/python/relay/test_pipeline_executor.py | 18 ++++++ 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index bdf6019bc6c8..b4a853a4ec10 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -510,6 +510,7 @@ def __init__(self, mod=None): self.target = None self.name = None self.dev = None + self.cpu_affinity = "" self.idx = None self.mod = mod self.input_params = InferType()(mod)["main"].params @@ -685,6 +686,7 @@ def get_config(self): output_conf.append(output) mconf["mod_idx"] = module.idx + mconf["cpu_affinity"] = module.cpu_affinity mconf["output"] = output_conf module_connection[mod] = { diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 834a84933e44..beb4f425e93a 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -307,10 +308,11 @@ class ConfigBindings { /*! * \brief The binding information of all outputs of a module. */ -class ConfigOutputBindings { +class ConfigRuntime { public: - ConfigOutputBindings& operator=(const ConfigOutputBindings& output) { + ConfigRuntime& operator=(const ConfigRuntime& output) { output_binding_map_ = output.GetOutBindings(); + cpu_affinity_ = output.GetCPUAffinity(); return *this; } @@ -318,6 +320,16 @@ class ConfigOutputBindings { ICHECK(output_binding_map_.find(key) != output_binding_map_.end()); return output_binding_map_[key]; } + /*! + * \brief Store the CPU affinity settings. + * \param cpu_affinity The CPU affinity settings in the text form. + */ + void StoreCPUAffinity(std::string cpu_affinity) { cpu_affinity_ = cpu_affinity; } + /*! + * \brief Getting the setting of the cpu affinity. + * \param Returning the cpu affinity in text form. + */ + std::string GetCPUAffinity() const { return cpu_affinity_; } /*! * \brief Enumerating the output configuration. * \param parse_function The callback function is used to parse the binding configeration. @@ -330,7 +342,7 @@ class ConfigOutputBindings { /*!brief Return the variable "output_binding_map_".*/ std::unordered_map GetOutBindings() const { return output_binding_map_; } /*! - *\brief This function is used to verify whether ConfigOutputBindings is successfully loaded. + *\brief This function is used to verify whether ConfigRuntime is successfully loaded. *\return Return true to indicate that this class has not been successfully loaded. */ bool Empty() { return output_binding_map_.empty(); } @@ -387,6 +399,8 @@ class ConfigOutputBindings { private: /*!\brief The map of output binding, 'int' is the output interface index.*/ std::unordered_map output_binding_map_; + /*!\brief The cpu affinity setting for the tvm thread pool.*/ + std::string cpu_affinity_; }; /*! @@ -394,10 +408,19 @@ class ConfigOutputBindings { */ class ConfigPipelineExecution { public: - ConfigOutputBindings& operator[](int key) { + ConfigRuntime& operator[](int key) { ICHECK(config_.find(key) != config_.end()); return config_[key]; } + /**/ + std::string GetCPUAffinity(int runtime_idx) { + auto config = config_.find(runtime_idx); + if (config == config_.end()) { + LOG(FATAL) << "Do not finding the runtime " << runtime_idx; + } + auto config_runtime = config->second; + return config_runtime.GetCPUAffinity(); + } /*! * \brief Enumerating the binding configuration for a specified runtime. * \param parse_function The callback function is used to parse the binding configuration. @@ -439,7 +462,7 @@ class ConfigPipelineExecution { /* *!\brief Parsing the configuration. */ - void ParseConfiguration(const std::unordered_map& config) { + void ParseConfiguration(const std::unordered_map& config) { if (config.empty()) { LOG(FATAL) << "The Configuration loading not finish yet."; } @@ -465,8 +488,9 @@ class ConfigPipelineExecution { std::string key; reader->BeginObject(); int mod_idx = -1; - ConfigOutputBindings output; + ConfigRuntime output; std::string dev; + std::string cpu_affinity; while (reader->NextObjectItem(&key)) { if (key == "mod_idx") { reader->Read(&mod_idx); @@ -474,6 +498,8 @@ class ConfigPipelineExecution { reader->Read(&dev); } else if (key == "output") { reader->Read(&output); + } else if (key == "cpu_affinity") { + reader->Read(&cpu_affinity); } else { LOG(FATAL) << "do not support key " << key; } @@ -481,7 +507,9 @@ class ConfigPipelineExecution { ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; // Check if the output is successfully read. ICHECK(!output.Empty()) << "Invalid output binding result."; - // Build the mapping of mod_idx and "ConfigOutputBindings". + // Store the cpu affinity into the 'ConfigRuntime' structure. + output.StoreCPUAffinity(cpu_affinity); + // Build the mapping of mod_idx and "ConfigRuntime". config_[mod_idx] = output; } // Doing the configuration parsing after the loading finished. @@ -493,7 +521,7 @@ class ConfigPipelineExecution { *!\brief The key is the module index, this variable records all module pipeline configuration * information. */ - std::unordered_map config_; + std::unordered_map config_; /* *\brief The key is the global output index, and the map is including global outputs index and * the module outputs pair. @@ -600,6 +628,8 @@ class BackendRuntime { private: /*!\brief The index of runtime indicates the runtime position in the pipeline.*/ int runtime_idx_; + /*!*/ + std::string cpu_affinity_ = ""; /*!\brief The Runtime module of a backend graph executor.*/ Module module_; /*\brief The thread is associated with the current runtime*/ @@ -641,9 +671,11 @@ class BackendRuntime { SetPipelineState(RUNNING); if (runtime_idx_ == 0) { this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0); + this->SetCPUAffinity(); } else { // Only launching the worker thread for the runtimes after the first runtime. thread_ = std::thread([&]() { + this->SetCPUAffinity(); while (!this->WaitAndLoadPipelineData()) { if (!this->RunPipeline()) { break; @@ -819,6 +851,20 @@ class BackendRuntime { TVMArrayCopyFromTo(from, to, nullptr); } + /*!\brief Setting the cpu affinity for the tvm threads pool in the current BackendRuntime.*/ + void SetCPUAffinity(void) { + if (cpu_affinity_.empty()) { + return; + } + auto affinity_mode = tvm::runtime::threading::ThreadGroup::kSpecifyThreadShareAllCore; + std::istringstream istr(cpu_affinity_); + std::string affinity; + std::vector cpus; + while (getline(istr, affinity, ',')) { + cpus.push_back(std::stoi(affinity)); + } + tvm::runtime::threading::Configure(affinity_mode, 0, cpus); + } public: BackendRuntime(Module mod, int mod_idx) { @@ -852,6 +898,7 @@ class BackendRuntime { */ void InitializePipeline(ConfigPipelineExecution config, std::vector>* runtimes) { + cpu_affinity_ = config.GetCPUAffinity(runtime_idx_); // Getting the 'binding configuration' for each runtime. config.VisitRuntimeOutputConfig( [&](int output_idx, int child_idx, std::string child_input_name) { diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index ff30c2affe47..099be056e62c 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -24,6 +24,7 @@ from tvm import relay from tvm.relay import transform from tvm.contrib import graph_executor, pipeline_executor +from tvm._ffi import get_global_func def get_mannual_mod(): @@ -79,6 +80,7 @@ def get_manual_conf(mods, target): # is for mod2 input. pipe_config1 = { "mod_idx": 0, + "cpu_affinity": "0", "output": [ {"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name": "data_0"}]}, {"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name": "data_0"}]}, @@ -97,6 +99,7 @@ def get_manual_conf(mods, target): pipe_config2 = { "mod_idx": 1, + "cpu_affinity": "0", "output": [ {"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name": "data_1"}]}, ], @@ -113,6 +116,7 @@ def get_manual_conf(mods, target): pipe_config3 = { "mod_idx": 2, + "cpu_affinity": "0", "output": [{"output_idx": 0, "dependencies": [{"global_output_index": 1}]}], } mod_config[mods[2]] = { @@ -203,6 +207,13 @@ def run_modules( return final_output +def reset_cpu_affinity(affinity): + # Restore the CPU affinity into the default value. + config_threadpool = get_global_func("runtime.config_threadpool") + config_threadpool(-2, 0) + os.sched_setaffinity(0, affinity) + + def test_pipe_runtime_error_check(): # This function is used to trigger runtime error by applying wrong logic. if pipeline_executor.pipeline_executor_enabled(): @@ -266,6 +277,7 @@ def test_pipeline(): if pipeline_executor.pipeline_executor_enabled(): target_list = tvm.testing.enabled_targets() for target in target_list: + affinity = os.sched_getaffinity(0) # Get the three pipeline modules here. (mod1, mod2, mod3), dshape = get_mannual_mod() @@ -320,12 +332,15 @@ def test_pipeline(): # Set other parameters. pipe_config[mod1].target = target[0] pipe_config[mod1].dev = target[1] + pipe_config[mod1].cpu_affinity = "0" pipe_config[mod2].target = "llvm" pipe_config[mod2].dev = tvm.cpu(0) + pipe_config[mod2].cpu_affinity = "0" pipe_config[mod3].target = "llvm" pipe_config[mod3].dev = tvm.cpu(0) + pipe_config[mod3].cpu_affinity = "0" # Checking the configuration of modules dependency. mconfig = pipe_config.get_config() assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target) @@ -404,6 +419,9 @@ def test_pipeline(): assert statistic_time < 10 time.sleep(1) + # Reset the cpu affinity after a test. + reset_cpu_affinity(affinity) + if __name__ == "__main__": pytest.main([__file__])