Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/tvm/contrib/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ def __init__(self, mod=None):
self.target = None
self.name = None
self.dev = None
self.cpu_affinity = ""
self.idx = None
self.mod = mod
self.input_params = InferType()(mod)["main"].params
Expand Down Expand Up @@ -685,6 +686,7 @@ def get_config(self):
output_conf.append(output)

mconf["mod_idx"] = module.idx
mconf["cpu_affinity"] = module.cpu_affinity
mconf["output"] = output_conf

module_connection[mod] = {
Expand Down
63 changes: 55 additions & 8 deletions src/runtime/pipeline/pipeline_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <dmlc/json.h>
#include <tvm/runtime/ndarray.h>
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/threading_backend.h>

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -307,17 +308,28 @@ class ConfigBindings {
/*!
* \brief The binding information of all outputs of a module.
*/
class ConfigOutputBindings {
class ConfigRuntime {
public:
ConfigOutputBindings& operator=(const ConfigOutputBindings& output) {
ConfigRuntime& operator=(const ConfigRuntime& output) {
  // Both fields are taken through the public getters of 'output', each of which returns a copy.
  this->output_binding_map_ = output.GetOutBindings();
  this->cpu_affinity_ = output.GetCPUAffinity();
  // Return this instance so that assignments can be chained.
  return *this;
}

ConfigBindings& operator[](const int key) {
  // The key is the output interface index; it must already be recorded in the map, so this
  // accessor never inserts a new entry.
  ICHECK(output_binding_map_.find(key) != output_binding_map_.end());
  return output_binding_map_.at(key);
}
/*!
* \brief Store the CPU affinity settings.
* \param cpu_affinity The CPU affinity settings in the text form.
*/
void StoreCPUAffinity(std::string cpu_affinity) { cpu_affinity_ = cpu_affinity; }
/*!
* \brief Getting the setting of the cpu affinity.
* \param Returning the cpu affinity in text form.
*/
std::string GetCPUAffinity() const { return cpu_affinity_; }
/*!
* \brief Enumerating the output configuration.
* \param parse_function The callback function is used to parse the binding configuration.
Expand All @@ -330,7 +342,7 @@ class ConfigOutputBindings {
/*!brief Return the variable "output_binding_map_".*/
std::unordered_map<int, ConfigBindings> GetOutBindings() const { return output_binding_map_; }
/*!
*\brief This function is used to verify whether ConfigOutputBindings is successfully loaded.
*\brief This function is used to verify whether ConfigRuntime is successfully loaded.
*\return Return true to indicate that this class has not been successfully loaded.
*/
bool Empty() { return output_binding_map_.empty(); }
Expand Down Expand Up @@ -387,17 +399,28 @@ class ConfigOutputBindings {
private:
/*!\brief The map of output binding, 'int' is the output interface index.*/
std::unordered_map<int, ConfigBindings> output_binding_map_;
/*!\brief The cpu affinity setting for the tvm thread pool.*/
std::string cpu_affinity_;
};

/*!
* \brief The binding or dependency information of each module output interface.
*/
class ConfigPipelineExecution {
public:
ConfigOutputBindings& operator[](int key) {
ConfigRuntime& operator[](int key) {
  // The key is the module index; it must already be present in "config_", so this accessor
  // never inserts a new entry.
  ICHECK(config_.find(key) != config_.end());
  return config_.at(key);
}
/*!
 * \brief Get the CPU affinity setting of a runtime.
 * \param runtime_idx The index of the runtime, used as the key of "config_".
 * \return The CPU affinity setting in text form. A fatal error is logged when no configuration
 *  is recorded for "runtime_idx".
 */
std::string GetCPUAffinity(int runtime_idx) {
  auto config = config_.find(runtime_idx);
  if (config == config_.end()) {
    LOG(FATAL) << "Do not finding the runtime " << runtime_idx;
  }
  // Read the setting through the iterator. Copying the whole 'ConfigRuntime' (including its
  // output binding map) only to read one string is unnecessary.
  return config->second.GetCPUAffinity();
}
/*!
* \brief Enumerating the binding configuration for a specified runtime.
* \param parse_function The callback function is used to parse the binding configuration.
Expand Down Expand Up @@ -439,7 +462,7 @@ class ConfigPipelineExecution {
/*!
 * \brief Parsing the configuration.
 */
void ParseConfiguration(const std::unordered_map<int, ConfigOutputBindings>& config) {
void ParseConfiguration(const std::unordered_map<int, ConfigRuntime>& config) {
if (config.empty()) {
LOG(FATAL) << "The Configuration loading not finish yet.";
}
Expand All @@ -465,23 +488,28 @@ class ConfigPipelineExecution {
std::string key;
reader->BeginObject();
int mod_idx = -1;
ConfigOutputBindings output;
ConfigRuntime output;
std::string dev;
std::string cpu_affinity;
while (reader->NextObjectItem(&key)) {
if (key == "mod_idx") {
reader->Read(&mod_idx);
} else if (key == "dev") {
reader->Read(&dev);
} else if (key == "output") {
reader->Read(&output);
} else if (key == "cpu_affinity") {
reader->Read(&cpu_affinity);
} else {
LOG(FATAL) << "do not support key " << key;
}
}
ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx;
// Check if the output is successfully read.
ICHECK(!output.Empty()) << "Invalid output binding result.";
// Build the mapping of mod_idx and "ConfigOutputBindings".
// Store the cpu affinity into the 'ConfigRuntime' structure.
output.StoreCPUAffinity(cpu_affinity);
// Build the mapping of mod_idx and "ConfigRuntime".
config_[mod_idx] = output;
}
// Doing the configuration parsing after the loading finished.
Expand All @@ -493,7 +521,7 @@ class ConfigPipelineExecution {
 * \brief The key is the module index, this variable records all module pipeline configuration
 *  information.
 */
std::unordered_map<int, ConfigOutputBindings> config_;
std::unordered_map<int, ConfigRuntime> config_;
/*
*\brief The key is the global output index, and the map is including global outputs index and
* the module outputs pair.
Expand Down Expand Up @@ -600,6 +628,8 @@ class BackendRuntime {
private:
/*!\brief The index of runtime indicates the runtime position in the pipeline.*/
int runtime_idx_;
/*!\brief The CPU affinity setting of this runtime, a comma-separated list of CPU ids in text form.*/
std::string cpu_affinity_ = "";
/*!\brief The Runtime module of a backend graph executor.*/
Module module_;
/*!\brief The thread is associated with the current runtime*/
Expand Down Expand Up @@ -641,9 +671,11 @@ class BackendRuntime {
SetPipelineState(RUNNING);
if (runtime_idx_ == 0) {
this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0);
this->SetCPUAffinity();
} else {
// Only launching the worker thread for the runtimes after the first runtime.
thread_ = std::thread([&]() {
this->SetCPUAffinity();
while (!this->WaitAndLoadPipelineData()) {
if (!this->RunPipeline()) {
break;
Expand Down Expand Up @@ -819,6 +851,20 @@ class BackendRuntime {

TVMArrayCopyFromTo(from, to, nullptr);
}
/*!\brief Setting the cpu affinity for the tvm threads pool in the current BackendRuntime.*/
void SetCPUAffinity(void) {
if (cpu_affinity_.empty()) {
return;
}
auto affinity_mode = tvm::runtime::threading::ThreadGroup::kSpecifyThreadShareAllCore;
std::istringstream istr(cpu_affinity_);
std::string affinity;
std::vector<unsigned int> cpus;
while (getline(istr, affinity, ',')) {
cpus.push_back(std::stoi(affinity));
}
tvm::runtime::threading::Configure(affinity_mode, 0, cpus);
}

public:
BackendRuntime(Module mod, int mod_idx) {
Expand Down Expand Up @@ -852,6 +898,7 @@ class BackendRuntime {
*/
void InitializePipeline(ConfigPipelineExecution config,
std::vector<std::shared_ptr<BackendRuntime>>* runtimes) {
cpu_affinity_ = config.GetCPUAffinity(runtime_idx_);
// Getting the 'binding configuration' for each runtime.
config.VisitRuntimeOutputConfig(
[&](int output_idx, int child_idx, std::string child_input_name) {
Expand Down
18 changes: 18 additions & 0 deletions tests/python/relay/test_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tvm import relay
from tvm.relay import transform
from tvm.contrib import graph_executor, pipeline_executor
from tvm._ffi import get_global_func


def get_mannual_mod():
Expand Down Expand Up @@ -79,6 +80,7 @@ def get_manual_conf(mods, target):
# is for mod2 input.
pipe_config1 = {
"mod_idx": 0,
"cpu_affinity": "0",
"output": [
{"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name": "data_0"}]},
{"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name": "data_0"}]},
Expand All @@ -97,6 +99,7 @@ def get_manual_conf(mods, target):

pipe_config2 = {
"mod_idx": 1,
"cpu_affinity": "0",
"output": [
{"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name": "data_1"}]},
],
Expand All @@ -113,6 +116,7 @@ def get_manual_conf(mods, target):

pipe_config3 = {
"mod_idx": 2,
"cpu_affinity": "0",
"output": [{"output_idx": 0, "dependencies": [{"global_output_index": 1}]}],
}
mod_config[mods[2]] = {
Expand Down Expand Up @@ -203,6 +207,13 @@ def run_modules(
return final_output


def reset_cpu_affinity(affinity):
    """Reconfigure the thread pool and restore the CPU affinity saved before a test.

    Parameters
    ----------
    affinity : iterable of int
        The CPU set to restore; it is passed unchanged to `os.sched_setaffinity`.
    """
    # NOTE(review): the meaning of the arguments (-2, 0) is defined by the
    # "runtime.config_threadpool" global function and is not visible here.
    # Presumably they select the default configuration; confirm against the runtime.
    restore_threadpool = get_global_func("runtime.config_threadpool")
    restore_threadpool(-2, 0)
    # A pid of 0 targets the calling process.
    os.sched_setaffinity(0, affinity)


def test_pipe_runtime_error_check():
# This function is used to trigger runtime error by applying wrong logic.
if pipeline_executor.pipeline_executor_enabled():
Expand Down Expand Up @@ -266,6 +277,7 @@ def test_pipeline():
if pipeline_executor.pipeline_executor_enabled():
target_list = tvm.testing.enabled_targets()
for target in target_list:
affinity = os.sched_getaffinity(0)
# Get the three pipeline modules here.
(mod1, mod2, mod3), dshape = get_mannual_mod()

Expand Down Expand Up @@ -320,12 +332,15 @@ def test_pipeline():
# Set other parameters.
pipe_config[mod1].target = target[0]
pipe_config[mod1].dev = target[1]
pipe_config[mod1].cpu_affinity = "0"

pipe_config[mod2].target = "llvm"
pipe_config[mod2].dev = tvm.cpu(0)
pipe_config[mod2].cpu_affinity = "0"

pipe_config[mod3].target = "llvm"
pipe_config[mod3].dev = tvm.cpu(0)
pipe_config[mod3].cpu_affinity = "0"
# Checking the configuration of modules dependency.
mconfig = pipe_config.get_config()
assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target)
Expand Down Expand Up @@ -404,6 +419,9 @@ def test_pipeline():
assert statistic_time < 10
time.sleep(1)

# Reset the cpu affinity after a test.
reset_cpu_affinity(affinity)


if __name__ == "__main__":
pytest.main([__file__])