From b0cfb5c4aa87cd1bf727e217c8cd954734ef32d4 Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 19 Oct 2021 12:58:20 -0700 Subject: [PATCH 1/5] [Runtime] Pipeline Executor Add Set and Get Input/Output interfaces. 1. Add "param" connection into pipeline config to support such case like set params into module 1 by a param name "param0" 2. Add using input name to locate backend runtime index and input index implemention. 3. Add interface like run/stop/set_input/get_output etc. 4. Add a implemention of serialized pipeline backend runtime execution for the purpose to test all the said interface. --- python/tvm/contrib/pipeline_executor.py | 208 +++++++++- src/runtime/pipeline/pipeline_executor.cc | 131 +++++- src/runtime/pipeline/pipeline_executor.h | 106 +++-- src/runtime/pipeline/pipeline_scheduler.cc | 94 ++++- src/runtime/pipeline/pipeline_scheduler.h | 36 +- src/runtime/pipeline/pipeline_struct.h | 413 +++++++++++++++++-- tests/python/relay/test_pipeline_executor.py | 124 +++++- 7 files changed, 1025 insertions(+), 87 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 37b9fed8eb91..d8e3b0bfd33f 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -17,6 +17,7 @@ """Pipeline executor that executes a series of modules in a pipeline fashion.""" import json import os +import numpy as np import tvm._ffi from tvm import relay from tvm.relay.transform import InferType @@ -49,12 +50,17 @@ def build(pipe_configs): Common interface for pipeline executor factory modules. """ libs = {} - mod_n_configs = pipe_configs.get_config() + config = pipe_configs.get_config() + if "module_connection" not in config: + raise RuntimeError('"module_connection" is missing') + + mod_n_configs = config["module_connection"] config_len = len(mod_n_configs) - string_config = [{} for _ in range(config_len)] + module_string_config = [{} for _ in range(config_len)] + # Build the backend modules then create the config of the connections in string form. for ir_mod, mod_config in mod_n_configs.items(): - mconf = mod_config["pipeline"].copy() - mod_idx = mconf["mod_idx"] + pipe_config = mod_config["pipeline"].copy() + mod_idx = pipe_config["mod_idx"] dev = mod_config["dev"] target = mod_config["target"] build_func = relay.build @@ -70,11 +76,20 @@ def build(pipe_configs): mod_name=mod_config["mod_name"], ) - mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id) + pipe_config["dev"] = "{},{}".format(dev.device_type, dev.device_id) # Create a pipeline configuration. - string_config[mod_idx] = mconf + module_string_config[mod_idx] = pipe_config libs[mod_idx] = {"lib": lib, "dev": dev} + # Merge the "input_connection", the "param_connection" and the "module_connection" into one + # configuration. + string_config = {} + if "input_connection" not in config: + raise RuntimeError('"input_connection" is missing') + string_config["input_connection"] = config["input_connection"] + string_config["param_connection"] = config["param_connection"] + string_config["module_connection"] = module_string_config + return PipelineExecutorFactoryModule(libs, string_config) @@ -93,8 +108,71 @@ def __init__(self, module): else: self.module = module # Get the packed functions from the pipeline executor. 
+ self._run = self.module["run"] + self._stop = self.module["stop"] + self._set_input = self.module["set_input"] + self._set_param = self.module["set_param"] + self._get_input = self.module["get_input"] + self._get_output = self.module["get_output"] + self._get_num_inputs = self.module["get_num_inputs"] self._get_num_outputs = self.module["get_num_outputs"] + def run(self, sync=False): + """Run the pipeline executor.""" + self._run(sync) + + def stop(self): + """Stop the pipeline executor.""" + self._stop() + + def set_input(self, key, value): + """Set inputs to the module via "value". + Parameters + ---------- + key : str + The input key + + value : array_like. + The input value + """ + self._set_input(key, tvm.nd.array(value, tvm.cpu())) + + def set_params(self, params_name, params_data): + """Set params to the module via param name and params data. + Parameters + ---------- + params_name : str + The params name + + params_data : dict of str to NDArray + A list of params data and params key name. + """ + if params_data: + keys = list(params_data.keys()) + keys.sort(key=lambda x: -np.prod(params_data[x].shape)) + for k in keys: + self._set_param(params_name, k, tvm.nd.array(params_data[k], tvm.cpu())) + + def get_input(self, key): + """Get the input via a input name. + Parameters + ---------- + key : str + The input key + """ + self._get_input(key) + + def get_output(self): + """Get the output. + + Parameters: + ----------- + out : Array[NDArray] + A list of output data. + """ + + return self._get_output() + @property def num_outputs(self): """Get the number of outputs. @@ -105,6 +183,16 @@ def num_outputs(self): """ return self._get_num_outputs() + @property + def num_inputs(self): + """Get the number of inputs. + Returns + ------- + count : int + The number of inputs. + """ + return self._get_num_inputs() + @staticmethod def load_library(config_file_name): """Import files to create a pipeline executor. @@ -154,7 +242,7 @@ class Binding: The class who owns this interface. io_type : str - The I/O type of this interface. It can only be "input" or "output". + The I/O type of this interface. It can only be "input" or "output" or "param". name : str/integer Name, for input it is string such as "data0", for output it is an integer such as 0. @@ -199,12 +287,21 @@ def is_pipeline_executor_interface(self): return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper) def __repr__(self): - # Get all binding information. - ret = " |{}: ".format(self.name) + # Get all binding information in the form of string. + ret, _ = self.format() + return ret + + def format(self): + """Obtain binding information in the form of string and dictionary.""" + str_format = " |{}: ".format(self.name) + dict_format = {"interface_name": self.name, "connection": []} for binding in self.bindings: mname, dname = binding.get_name() - ret += "{0}:{1} ".format(mname, dname) - return ret + midx = binding.get_owner_idx() + dict_format["connection"].append({"mod_idx": midx, "interface_name": dname}) + str_format += "{0}:{1} ".format(mname, dname) + + return str_format, dict_format def check_dag_acyclic(self, start, inputs): """This is to check whether the DAG containing these input interfaces is acyclic. 
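+        # A minimal usage sketch of the new PipelineModule interfaces added above
+        # (set_params, set_input, run, get_output, stop); the names "data_0",
+        # "param_0", "config_file_name" and the parameter dict are illustrative
+        # placeholders following tests/python/relay/test_pipeline_executor.py:
+        #
+        #   pipeline_module = PipelineModule.load_library(config_file_name)
+        #   pipeline_module.set_params("param_0", customized_parameters)
+        #   pipeline_module.set_input("data_0", data)
+        #   pipeline_module.run(True)   # True selects the serialized execution mode
+        #   outputs = pipeline_module.get_output()
+        #   pipeline_module.stop()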
@@ -245,6 +342,15 @@ def connect(self, binding): if self.io_owner == binding.io_owner: raise RuntimeError(f"Can not bind itself.") + if self.io_type == "param" and not self.is_pipeline_executor_interface(): + raise RuntimeError(f'Only a pipeline executor can do "param" binding!') + + if self.io_type == "param" and binding.io_type != "param": + raise RuntimeError( + f"A global param interface can only bind with a module \ + param interface!" + ) + if not self.is_pipeline_executor_interface() and self.io_type == "input": raise RuntimeError(f"Module can only bind from output interface!") @@ -265,7 +371,11 @@ def connect(self, binding): if self.is_pipeline_executor_interface() and self.io_type == "output": raise RuntimeError(f"Global output can not be used as binding start point.") - if self.is_pipeline_executor_interface() and binding.io_type != "input": + if ( + self.is_pipeline_executor_interface() + and self.io_type == "input" + and binding.io_type != "input" + ): raise RuntimeError(f"Global input can only bind with module input.") self.bindings.append(binding) @@ -342,6 +452,7 @@ def __init__(self, mod=None): self.output_type = InferType()(mod)["main"].checked_type.ret_type self.input_bindings = PipelineConfig.BindingList(self, "input") self.output_bindings = PipelineConfig.BindingList(self, "output") + self.param_binding = PipelineConfig.Binding(self, "param", "param") def __eq__(self, other): if isinstance(other, PipelineConfig.ModuleWrapper): @@ -353,11 +464,13 @@ def __getitem__(self, key): if isinstance(key, str): if key == "input": return self.input_bindings - if key == "output": return self.output_bindings + if key == "param": + return self.param_binding + raise RuntimeError(f"{key} not found!") - raise RuntimeError(f"{key} not found!") + raise RuntimeError(f'The data type of "key" is not supported!') def get_data_type(self, key, interface_type): """Get the module interface data type according to the key value and interface type. @@ -411,14 +524,22 @@ def __init__(self): self.mod_wrapper = {} self.input_bindings = self.BindingList(self, "input") self.output_bindings = self.BindingList(self, "output") + # The mapping of global parameters and module parameters. + self.param_bindings = self.BindingList(self, "param") def __str__(self): # Get configuration information as a string. # Use topological sort to get correct module order. self.dag_topology_sort() + + # Get param dependencies. + param_dump = "Params\n" + for param_name in self.param_bindings.bindings: + inf = self.param_bindings.bindings[param_name] + param_dump += str(inf) + "\n" # Get the input dependencies. - input_dump = "Inputs\n" + input_dump = "\nInputs\n" for input_name in self.input_bindings.bindings: inf = self.input_bindings.bindings[input_name] input_dump += str(inf) + "\n" @@ -444,7 +565,7 @@ def __str__(self): for name in sorted(output.keys()): output_dump += f" |output({name}) : {output[name]}\n" - return input_dump + output_dump + connections_dump + return param_dump + input_dump + output_dump + connections_dump def __getitem__(self, key): if isinstance(key, tvm.ir.module.IRModule): @@ -457,6 +578,8 @@ def __getitem__(self, key): return self.input_bindings if key == "output": return self.output_bindings + if key == "param": + return self.param_bindings raise RuntimeError(f"{key} not found.") @@ -465,9 +588,24 @@ def get_config(self): will be used to create pipeline executor. 
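+        The returned dictionary has three top-level keys: "module_connection"
+        (the per-module pipeline configuration), "input_connection" (the mapping
+        of global inputs to module inputs) and "param_connection" (the mapping
+        of global params to module indexes).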
""" + def check_data_param(data_dict): + if "interface_name" not in data_dict: + raise RuntimeError(f'"inteface_name" is missing in global config!"') + if "connection" not in data_dict: + raise RuntimeError(f'"connection" is missing!"') + # The global interface mapping should be one-to-one. + if len(data_dict["connection"]) == 0: + raise RuntimeError(f"The global interface map is empty!") + if len(data_dict["connection"]) > 1: + raise RuntimeError(f"A global interface maps multiple module interfaces!") + if "mod_idx" not in data_dict["connection"][0]: + raise RuntimeError(f'"mod_idx" is missing!') + # Use topological sort to get the correct order of modules. self.dag_topology_sort() mconfig = {} + module_connection = {} + input_connection = {} for mod in self.mod_wrapper: # Generate pipeline configuration. mconf = {} @@ -495,7 +633,7 @@ def get_config(self): mconf["mod_idx"] = module.idx mconf["output"] = output_conf - mconfig[mod] = { + module_connection[mod] = { "pipeline": mconf, "target_host": module.target_host, "mod_name": "default", @@ -505,6 +643,36 @@ def get_config(self): "dev": module.dev, } + # Create a mapping of global input and module input. + input_connection = [] + for input_name in self.input_bindings.bindings: + _, input_dict = self.input_bindings.bindings[input_name].format() + # Check the correctness of "input_dict". + check_data_param(input_dict) + if "interface_name" not in input_dict["connection"][0]: + raise RuntimeError(f'"interface_name is missing in connection config"!') + # Establish the mapping of global interface and the mapping of module interfaces. + input_map = { + "global_interface_name": input_dict["interface_name"], + "mod_idx": input_dict["connection"][0]["mod_idx"], + "module_interface_name": input_dict["connection"][0]["interface_name"], + } + input_connection.append(input_map) + + # Create a mapping of global param and module param. + param_connection = [] + for param_name in self.param_bindings.bindings: + _, param_dict = self.param_bindings.bindings[param_name].format() + check_data_param(param_dict) + param_map = { + "global_param_name": param_dict["interface_name"], + "mod_idx": param_dict["connection"][0]["mod_idx"], + } + param_connection.append(param_map) + + mconfig["module_connection"] = module_connection + mconfig["input_connection"] = input_connection + mconfig["param_connection"] = param_connection return mconfig def dag_topology_sort(self): @@ -627,13 +795,13 @@ def export_library(self, directory_path): ) # Get the graph, lib, and parameters from GraphExecutorFactoryModule. - graph, lib, params = self.pipeline_mods[lib_index]["lib"] + lib = self.pipeline_mods[lib_index]["lib"] # Export the lib, graph, and parameters to disk. lib.export_library(mconfig["lib_name"]) with open(mconfig["json_name"], "w") as file_handle: - file_handle.write(graph) + file_handle.write(lib.graph_json) with open(mconfig["params_name"], "wb") as file_handle: - file_handle.write(relay.save_param_dict(params)) + file_handle.write(relay.save_param_dict(lib.params)) load_config.append(mconfig) diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 3820ce942af0..816a90f4cf0a 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -21,6 +21,8 @@ * \file pipeline_executor.cc */ #include "pipeline_executor.h" + +#include namespace tvm { namespace runtime { /*! 
@@ -34,13 +36,134 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, if (name == "get_num_outputs") { return PackedFunc( [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumOutputs(); }); + } else if (name == "get_num_inputs") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumInputs(); }); + } else if (name == "set_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + this->SetInput(args[0].operator String(), args[1]); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "set_param") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0]) && String::CanConvertFrom(args[1])) { + this->SetParam(args[0].operator String(), args[1].operator String(), args[2]); + } else { + LOG(FATAL) << "Function only support the params name and keyin the form of string"; + } + }); + } else if (name == "get_output") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); }); + } else if (name == "get_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + *rv = this->GetInput(args[0].operator String()); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "run") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); + } else if (name == "stop") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); }); } else { LOG(FATAL) << "Unknown packed function: " << name; return PackedFunc(); } return nullptr; } +/*! + * \brief There are some input called pipeline global input that user need to use function + "set_input" to set the data for it, this function return the number of such global input. + \return Return the number of pipeline global input. + */ + +int PipelineExecutor::NumInputs() const { + // The number of inputs obtained from the input configuration. + size_t config_inputs_num = input_connection_config.size(), ret = 0; + // The number of inputs obtained from the graph runtime and pipeline configuration. + size_t internal_inputs_num = pipeline_config_.GetInputOutputBindingNum(); + for (auto runtime : runtimes_) { + ret += runtime->NumInputs(); + } + // Use the summary of all backend runtime module input number to minus the internal inputs + // number, then we will get the pipeline global input number + ret -= internal_inputs_num; + // Check whether these two numbers are equal. + if (config_inputs_num != ret) { + LOG(FATAL) << "The number of inputs from the configuration file is inconsistent!"; + } + return ret; +} +/*! + * \brief Return the input index and module index for a given input name. + * \param name The input name. + * \return std::pair The module index and the input index. + */ +std::pair PipelineExecutor::GetInputIndex(const std::string& name) { + std::pair index = input_connection_config[name]; + auto gruntime = runtimes_[index.first]; + return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); +} +/*! + * \brief Return the module index for a given input param name. + * \param name The params name. + * \return int The module index. 
+ */ +int PipelineExecutor::GetParamModuleIndex(const std::string& name) { + return param_connection_config[name]; +} +/*! + * \brief set input to the graph module. + * \param input_name The input name. + * \param data_in The input data. + */ +void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { + std::pair indexs = this->GetInputIndex(input_name); + runtimes_[indexs.first]->SetInput(indexs.second, data_in); +} +/*! + * \brief get input from the graph module. + * \param input_name The input name. + * \return Return the input data for a specific input name. + */ +NDArray PipelineExecutor::GetInput(std::string input_name) { + std::pair indexs = this->GetInputIndex(input_name); + return runtimes_[indexs.first]->GetInput(indexs.second); +} +/*! + * \brief set param to a graph module. + * \param input_name The input name. + * \param data_in The input data. + */ +void PipelineExecutor::SetParam(std::string param_name, std::string param_key_name, + DLTensor* data_in) { + // Get the module index from the param name. + auto runtime = runtimes_[this->GetParamModuleIndex(param_name)]; + // Get the param index from the param key name + int index = runtime->GetInputIndex(param_key_name); + runtime->SetInput(index, data_in); +} +/*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ +void PipelineExecutor::Run(bool serialized_mode) { + pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_, serialized_mode); +} +/*! + * \brief Stop the pipeline executor. + */ +void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); } +/*! + * \brief return A list of pipeline global output data. + */ +Array PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); } /*! * \brief Use the mod_config information to create a graph runtime list. * \param mod_config The config information that generates by the export library function call. @@ -96,7 +219,6 @@ std::vector PipelineExecutor::CreateGraphModules(const ModuleConfig& mod } return ret; } - /*! * \brief Initialize the pipeline executor with a list of modules to be pipelined * and config in JSON format. @@ -108,11 +230,12 @@ void PipelineExecutor::Init(const std::vector& modules, const std::strin // Use JSONReader to load pipeline configuration. std::istringstream is(pipeline_json); dmlc::JSONReader reader(&is); - PipelineConfig& pipeline_config = this->LoadPipelineConfig(&reader); + ConfigPipelineExecution& pipeline_config = this->LoadConfigPipelineExecution(&reader); ICHECK(!pipeline_config.Empty()) << "The pipeline config information is empty."; + num_outputs_ = pipeline_config.GetGlobalOutputNum(); // Initialize the pipeline function class used for pipeline thread pool management - // and schedule etc. This function returns the number of output. - num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config); + // and schedule etc. This function returns a list of backend runtime. + runtimes_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config); return; } diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index a883ba25ec08..abc977070059 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -28,8 +28,10 @@ #include #include +#include #include #include +#include #include #include "pipeline_scheduler.h" @@ -70,11 +72,56 @@ class TVM_DLL PipelineExecutor : public ModuleNode { /*! * \brief Get the number of outputs. 
- * * \return The number of outputs. */ int NumOutputs() const { return num_outputs_; } - + /*!\brief Return the number of inputs*/ + int NumInputs() const; + /*! + * \brief Use the input name to set the input data of pipeline executor. + * \param input_name The input name. + * \param data_in The input data. + */ + void SetInput(std::string input_name, DLTensor* data_in); + /*! + * \brief Use the input name to get the input data. + * \param input name The input name. + * \return Return input data. + */ + NDArray GetInput(std::string input_name); + /*! + * \brief Use the param name to get the specific backend runtime then use the param_key_name + * to set param data for the said backend runtime. + */ + void SetParam(std::string param_name, std::string param_key_name, DLTensor* data_in); + /*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ + void Run(bool serialized_mode); + /*! + * \brief Stop the pipeline executor. + */ + void Stop(); + /*! + * \brief Get a list output of pipeline. For one input data the pipeline may generate multiple + * outputs, this function will return all of these outputs in a list. + * \return A list of output data. + */ + Array GetOutput(); + /*! + * \brief A pipeline input with a specific name correspond with a input of a specific + * backend module, this function return a module index and a input index in "pair" + * form for a input name. + * return Return a module index and a input index. + */ + std::pair GetInputIndex(const std::string& name); + /*! + * \brief A pipeline params with a specific name correspond with the params of a specific + * backend module, this function return the module index for the params name. + * return Return backend runtime module index. 
+ */ + int GetParamModuleIndex(const std::string& name); /*!\brief Load the module files information.*/ ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) { reader->BeginArray(); @@ -112,41 +159,40 @@ class TVM_DLL PipelineExecutor : public ModuleNode { } private: + /*!\brief Json loader.*/ + ConfigPipelineExecution& LoadConfigPipelineExecution(dmlc::JSONReader* reader) { + reader->BeginObject(); + std::string key; + while (reader->NextObjectItem(&key)) { + if (key == "module_connection") { + reader->Read(&pipeline_config_); + } else if (key == "input_connection") { + reader->Read(&input_connection_config); + } else if (key == "param_connection") { + reader->Read(¶m_connection_config); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + return pipeline_config_; + } + /*!\brief The class used to execute and schedule the pipeline logic.*/ PipelineScheduler pipeline_scheduler_; /*!\brief The dependency information of each graph runtime module of the pipeline.*/ - PipelineConfig pipeline_config_; + ConfigPipelineExecution pipeline_config_; + /*!\brief The mapping of global input and backend runtime module input.*/ + InputConnectionConfig input_connection_config; + /*!\brief The mapping of global params and backend runtime module params.*/ + ParamConnectionConfig param_connection_config; /*!\brief The module information used to create the graph runtimes.*/ ModuleConfig mod_config_; + /*!The list of backend runtime module.*/ + std::vector> runtimes_; /*!\brief How many outputs are in this pipeline executor.*/ size_t num_outputs_ = 0; - /*!\brief Json loader.*/ - PipelineConfig& LoadPipelineConfig(dmlc::JSONReader* reader) { - reader->BeginArray(); - while (reader->NextArrayItem()) { - std::string key; - reader->BeginObject(); - int mod_idx = -1; - OutputMap output; - std::string dev; - while (reader->NextObjectItem(&key)) { - if (key == "mod_idx") { - reader->Read(&mod_idx); - } else if (key == "dev") { - reader->Read(&dev); - } else if (key == "output") { - reader->Read(&output); - } else { - LOG(FATAL) << "do not support key " << key; - } - } - ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; - // Check if the output is successfully read. - ICHECK(!output.Empty()) << "Invalid output binding result."; - pipeline_config_.Insert(mod_idx, output); - } - return pipeline_config_; - } + /*!\brief The list of pipeline output*/ + Array output_array; }; } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 82caf855a479..cc203bb59c05 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -18,6 +18,7 @@ */ #include "pipeline_scheduler.h" +#include #include #include namespace tvm { @@ -26,12 +27,97 @@ namespace runtime { * \brief Initialize the pipeline. * \param modules The list of graph executor modules. * \param pipeline_conf The dependency information of each graph executor module. + * \return Return a list of backend runtime module. 
*/ -size_t PipelineScheduler::PipelineInit(const std::vector& modules, - const PipelineConfig& pipeline_config) { +std::vector> PipelineScheduler::PipelineInit( + const std::vector& modules, ConfigPipelineExecution pipeline_config) { + std::vector> runtimes; graph_modules_ = modules; - int num_output = pipeline_config.GetGlobalOutputNum(); - return num_output; + for (size_t i = 0; i < graph_modules_.size(); i++) { + auto runItem = std::make_shared(graph_modules_[i], i); + runtimes.push_back(runItem); + } + // Initialize the outputs array. + auto& global_output_map = pipeline_config.GetGlobalConfigOutputBindings(); + for (size_t i = 0; i < global_output_map.size(); i++) { + if (global_output_map.find(i) == global_output_map.end()) { + LOG(FATAL) << "Not find global output index " << i; + } + ModuleOutputPair& output_pair = global_output_map[i]; + NDArray output = runtimes[output_pair.mod_idx]->CreateFromOutput(output_pair.output_idx); + output_array.push_back(output); + } + return runtimes; } + +/*! + * \brief Exeute in the serialized mode. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + */ +void PipelineScheduler::PipelineRunSerial( + const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config) { + for (size_t i = 0; i < runtimes.size(); i++) { + // The offset in vector is the runtime execution order, this offset value should + // be same with the the value of "runtime_idx" in runtime. + if (static_cast(i) != runtimes[i]->GetModuleIndex()) { + LOG(FATAL) << "runtime index " << runtimes[i]->GetModuleIndex() + << " is not same as vector offset value " << i; + } + + if (!pipeline_config.FindModuleInConfig(i)) { + LOG(FATAL) << "Not find the configuration for the module " << i; + } + + runtimes[i]->Run(); + // Check if there is any output need to be forward to other graph module or to be as + // global output. + int outputs_num = runtimes[i]->NumOutputs(); + for (int j = 0; j < outputs_num; j++) { + ConfigBindings& out_binding = pipeline_config[i][j]; + std::unordered_map& input_connections = out_binding.Get(); + NDArray output = runtimes[i]->GetOutput(j); + for (auto bind : input_connections) { + // If the value of "bind.first" less then 0 then this is not a graph module binding. + if (bind.first < 0) continue; + // Set input data for the graph module. + runtimes[bind.first]->SetInput(bind.second, const_cast(output.operator->())); + } + // Store the output. + if (out_binding.IsGlobalOutput()) { + int global_idx = out_binding.GetGlobalOutputIndex(); + TVMArrayCopyFromTo(const_cast(output.operator->()), + const_cast(output_array[global_idx].operator->()), nullptr); + } + } + } +} +/*! + * \brief Execute pipeline. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + * \param serialize_mode If the execution is serialized. + */ +void PipelineScheduler::PipelineRun(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config, bool serialize_mode) { + if (!serialize_mode) { + // TODO(huajsj) remove this check after all of pipeline features in. + LOG(FATAL) << "Currently Only supports serialized mode."; + } else { + PipelineRunSerial(runtimes, pipeline_config); + } +} +/*! + * \brief Stop the pipeline exection. + */ +void PipelineScheduler::PipelineStop() { + // TODO(huajsj) Remove this. + std::cout << __FUNCTION__ << std::endl; +} +/*! + * \brief Get a list of outputs of the pipeline execution. 
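+ * \return The cached "output_array", holding one NDArray per pipeline global
+ * output, indexed by the global output index assigned in PipelineInit.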
+ */ +Array PipelineScheduler::PipelineGetOutput() { return output_array; } } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h index 5ee127edffa3..8440bdc1a1a2 100644 --- a/src/runtime/pipeline/pipeline_scheduler.h +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -28,6 +28,7 @@ #include #include "pipeline_struct.h" + namespace tvm { namespace runtime { /*! @@ -40,13 +41,40 @@ class PipelineScheduler { * \brief Initialize the pipeline. * \param modules The list of graph executor module. * \param pipeline_config The dependency information of each graph executor module. + * \return Return a list of backend runtime module. + */ + std::vector> PipelineInit( + const std::vector& modules, ConfigPipelineExecution pipeline_config); + /*! + * \brief Execute pipeline. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + * \param serialize_mode If the execution is serialized. + */ + void PipelineRun(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config, bool serialize_mode = false); + /*! + * \brief Exeute in the serialized mode. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + */ + void PipelineRunSerial(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config); + /*! + * \brief Stop the pipeline exection. + */ + void PipelineStop(); + /*! + * \brief Get a list of outputs of the pipeline execution. */ - size_t PipelineInit(const std::vector& modules, const PipelineConfig& pipeline_config); + Array PipelineGetOutput(); private: /*!\brief The list of graph executors.*/ - std::vector graph_modules_; + std::vector graph_modules_; + /*!\brief A list of NDArray used to record the outputs.*/ + Array output_array; }; -} // namespace runtime -} // namespace tvm +}; // namespace runtime +}; // namespace tvm #endif // TVM_RUNTIME_PIPELINE_PIPELINE_SCHEDULER_H_ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 3cc9621702c1..a12a0064cd12 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -21,15 +21,41 @@ #include #include #include +#include +#include #include #include #include +#include #include +namespace tvm { +namespace runtime { +#define GLOBAL_MODULE_INDEX -1 +/*! + *\brief The mapping of a module output and a global output in the graph module. + */ +struct GlobalOutputPair { + int mod_output_idx; + int global_output_idx; + GlobalOutputPair(const int idx, const int gidx) : mod_output_idx(idx), global_output_idx(gidx) {} + GlobalOutputPair() {} +}; + +/*! + *\brief Use the module index and the output index to specify a module output. + */ +struct ModuleOutputPair { + int mod_idx; + int output_idx; + ModuleOutputPair(const int midx, const int idx) : mod_idx(midx), output_idx(idx) {} + ModuleOutputPair() {} +}; /*! * \brief All binding information of a output interface. */ -struct OutputBindings { +class ConfigBindings { + private: /*!\brief Output interface binding information, 'int' is the index of the module that * uses this output data as the input interface data, 'string' is the input interface name * of the module. @@ -37,8 +63,32 @@ struct OutputBindings { std::unordered_map bindings; /*! 
The index value of the global interface to which the current output are bound.*/ int global_output_index = std::numeric_limits::min(); + + public: + /*! + *\brief Return the memeber variable "bindings". + */ + std::unordered_map& Get() { return bindings; } + /*!\brief Get the value of global outpu index.*/ + int GetGlobalOutputIndex() const { return global_output_index; } /*!\brief Whether this binding is bound to the PipelineExecutor output interface.*/ bool IsGlobalOutput() const { return global_output_index >= 0; } + + /*! + *\brief The number of bindings of input and output. one input only can bind with one + * specific output, hence this number also is the number that how many module input data + * source is internal moudle output. + * return The number of binding in this module. + */ + size_t GetInputOutputBindingNum(void) { + size_t ret = 0; + for (auto connection : bindings) { + // Filter out the global output. + if (connection.first == GLOBAL_MODULE_INDEX) continue; + ret++; + } + return ret; + } /*! * \brief Create a module interface map from JSONReader. * \param reader JSON reader. @@ -81,25 +131,39 @@ struct OutputBindings { } } }; - /*! * \brief The binding information of all outputs of a module. */ -struct OutputMap { +class ConfigOutputBindings { + private: /*! \brief Output binding map, 'int' is output interface index.*/ - std::unordered_map output_binding_map; - OutputMap& operator=(const OutputMap& output) { + std::unordered_map output_binding_map; + + public: + ConfigOutputBindings& operator=(const ConfigOutputBindings& output) { output_binding_map = output.output_binding_map; return *this; } - /*!\brief This function is used to verify whether OutputMap is successfully loaded. - * \return Return true to indicate that this class has not been successfully loaded. + ConfigBindings& operator[](const int key) { + ICHECK(output_binding_map.find(key) != output_binding_map.end()); + return output_binding_map[key]; + } + /*! + *\brief Check if there is a output with the specify index in this map. + */ + bool FindOutputInMap(int output_idx) { + return output_binding_map.find(output_idx) != output_binding_map.end(); + } + /*! + *\brief This function is used to verify whether ConfigOutputBindings is successfully loaded. + *\return Return true to indicate that this class has not been successfully loaded. */ bool Empty() { return output_binding_map.empty(); } - /*! \brief The pipeline outputs is the final outputs of pipeline, this function is used to - * get how many pipeline outputs are in this Outputmap - * \return Number of pipeline outputs. + /*! + * \brief The pipeline outputs is the final outputs of pipeline, this function is used to + * get how many pipeline outputs are in this Outputmap + * \return Number of pipeline outputs. */ size_t GetGlobalOutputNum(void) const { size_t num_output = 0; @@ -108,7 +172,29 @@ struct OutputMap { } return num_output; } - + /*! + *\brief Get the mapping of all global outputs and module outputs in this module. + *\return A list of "GlobalOutputPair". + */ + std::vector GetGlobalConfigOutputBindings(void) { + std::vector ret; + for (auto bindings : output_binding_map) { + if (bindings.second.IsGlobalOutput()) { + ret.push_back(GlobalOutputPair(bindings.first, bindings.second.GetGlobalOutputIndex())); + } + } + return ret; + } + /*! + *\brief How many inputs are binding with a backend module output in this module. 
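+ * \return The number of output bindings in this module whose consumers are other
+ * backend modules rather than the pipeline global output.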
+ */ + size_t GetInputOutputBindingNum() { + size_t ret = 0; + for (auto x : output_binding_map) { + ret += x.second.GetInputOutputBindingNum(); + } + return ret; + } /*! * \brief Create a output binding map from JSONReader. * \param reader Json reader. @@ -119,7 +205,7 @@ struct OutputMap { std::string key; reader->BeginObject(); int output_idx = -1; - OutputBindings binding; + ConfigBindings binding; while (reader->NextObjectItem(&key)) { if (key == "output_idx") { reader->Read(&output_idx); @@ -134,37 +220,316 @@ struct OutputMap { } } }; + +/*! + * \brief A map of the global module input interfaces and the graph modudles input interfaces. + */ +struct InputConnectionConfig { + /*!\brief The key is the name of global module input interfaces. the value is the pair of + * the index of a graph module and the name of a graph module input interface. + */ + std::unordered_map> input_connection; + bool Empty() { return input_connection.empty(); } + std::pair operator[](const std::string key) { + if (input_connection.find(key) == input_connection.end()) { + LOG(FATAL) << "Not find the key " << key; + } + return input_connection[key]; + } + + size_t size() const { return input_connection.size(); } + /*! + * \brief Create a input connection config from JSONReader. + * \param reader Json reader. + */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + reader->BeginObject(); + std::string key; + std::string global_interface_name; + std::string module_interface_name; + int mod_idx = -1; + while (reader->NextObjectItem(&key)) { + if (key == "global_interface_name") { + reader->Read(&global_interface_name); + } else if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "module_interface_name") { + reader->Read(&module_interface_name); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + ICHECK(!global_interface_name.empty()) << "Invalid global interface name value"; + ICHECK(!module_interface_name.empty()) << "Invalid module interface name value"; + input_connection[global_interface_name] = make_pair(mod_idx, module_interface_name); + } + } +}; +/*! + * \brief A map of the global module param interfaces and the graph modudles param. + */ +struct ParamConnectionConfig { + /*!\brief The key is the name of global module param interfaces. the value is the + * index of a graph module. + */ + std::unordered_map param_connection; + bool Empty() { return param_connection.empty(); } + int operator[](const std::string key) { + if (param_connection.find(key) == param_connection.end()) { + LOG(FATAL) << "do not support key " << key; + } + return param_connection[key]; + } + /*! + * \brief Create a param connection config from JSONReader. + * \param reader Json reader. + */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + reader->BeginObject(); + std::string key; + std::string global_param_name; + int mod_idx = -1; + while (reader->NextObjectItem(&key)) { + if (key == "global_param_name") { + reader->Read(&global_param_name); + } else if (key == "mod_idx") { + reader->Read(&mod_idx); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + ICHECK(!global_param_name.empty()) << "Invalid global param name value"; + param_connection[global_param_name] = mod_idx; + } + } +}; /*! 
* \brief The binding or dependency information of each module output interface. */ -struct PipelineConfig { - /*!\brief The key is the module index, this variable records all module pipeline configuration +class ConfigPipelineExecution { + private: + /* + *!\brief The key is the module index, this variable records all module pipeline configuration * information. */ - std::unordered_map config; - OutputMap& operator[](int key) { + std::unordered_map config; + /* + *\brief The key is the global output index, this variable records the mapping of global output + * and the module output. + */ + std::unordered_map global_output_map; + /* + *\brief The number of binding of module outputs and inputs. + */ + size_t module_input_output_binding_total_num; + + public: + ConfigOutputBindings& operator[](int key) { ICHECK(config.find(key) != config.end()); return config[key]; } + /*! + *\brief Check if the module index existing in the "config". + */ + bool FindModuleInConfig(int mod_idx) { return config.find(mod_idx) != config.end(); } + /*! + *\brief Build the mapping of key and "ConfigOutputBindings", key is module index. + */ + void Insert(int key, const ConfigOutputBindings& map) { config[key] = map; } - void Insert(int key, const OutputMap& map) { config[key] = map; } - - /*!\brief This function is used to verify whether config is loaded successfully. + /* + *!\brief This function is used to verify whether config is loaded successfully. * \return Return true to indicate that this class has not been successfully loaded. */ bool Empty() { return config.empty(); } - /*! * \brief Get the number of global outputs. * \return The number of outputs the entire pipeline has. */ size_t GetGlobalOutputNum() const { - size_t num_output = 0; + // The number of pipeline outputs is the size of "global_output_map"; + return global_output_map.size(); + } + /* + *!\brief Get the map of global outputs and module outputs. + */ + std::unordered_map& GetGlobalConfigOutputBindings(void) { + return global_output_map; + } + /* + *!\brief Get the number of module output and module input bindings. + */ + size_t GetInputOutputBindingNum() const { return module_input_output_binding_total_num; } + /* + *!\brief Parse the config to construct data struct using in pipeline execution. + */ + void ParseConfiguration(const std::unordered_map& config) { + if (config.empty()) { + LOG(FATAL) << "The Configuration loading not finish yet."; + } + module_input_output_binding_total_num = 0; for (auto mod_output : config) { - num_output += mod_output.second.GetGlobalOutputNum(); + // Get the numbers of binding of input and output. + module_input_output_binding_total_num += mod_output.second.GetInputOutputBindingNum(); + // Use global output index as key to create a mapping of global index and module output. + const std::vector& global_output = + mod_output.second.GetGlobalConfigOutputBindings(); + + for (auto output : global_output) { + global_output_map[output.global_output_idx] = + ModuleOutputPair(mod_output.first, output.mod_output_idx); + } } - return num_output; + return; + } + /*! + * \brief Create a pipeline config from JSONReader. + * \param reader Json reader. 
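+ * An illustrative sketch (index and device values are assumptions) of the JSON
+ * this loader expects, matching the "module_connection" emitted by the Python side:
+ *   [{"mod_idx": 0, "dev": "1,0", "output": [{"output_idx": 0, "dependencies": [...]}]}]
+ * where "dev" is the "device_type,device_id" string built in pipeline_executor.py.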
+ */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int mod_idx = -1; + ConfigOutputBindings output; + std::string dev; + while (reader->NextObjectItem(&key)) { + if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "dev") { + reader->Read(&dev); + } else if (key == "output") { + reader->Read(&output); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + // Check if the output is successfully read. + ICHECK(!output.Empty()) << "Invalid output binding result."; + Insert(mod_idx, output); + } + // Call this function after "config" loading finished. + ParseConfiguration(config); + } +}; +/* + *\brief Runtime of backend. + */ +class BackendRuntime { + private: + /*\brief The index of runtime indicate the position in the pipeline.*/ + int runtime_idx; + /*\brief The Runtime module of a backedn graph executor.*/ + Module module; + /*! + *\brief To transfer data between two different backends, we need a local + * tensor variable as a medium. This variable is a mapping of input data and local + * data. + */ + std::unordered_map input_tensor_local_copy; + /*!\brief The packed functions.*/ + tvm::runtime::PackedFunc run; + tvm::runtime::PackedFunc set_input; + tvm::runtime::PackedFunc get_input; + tvm::runtime::PackedFunc get_output; + tvm::runtime::PackedFunc get_num_output; + tvm::runtime::PackedFunc get_num_inputs; + tvm::runtime::PackedFunc get_input_index; + /*!\brief The new DLTensor have same shape, data type with a existing DLTensor.*/ + DLTensor* CreateFromDLTensor(const DLTensor* from) { + DLTensor* ret = NULL; + TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, + kDLCPU, 0, &ret); + return ret; + } + /*!\brief The new NDArray have same shape, data type with an existing DLTensor.*/ + NDArray CreateNDArrayFromDLTensor(const DLTensor* from) { + std::vector shape; + for (int i = 0; i < from->ndim; i++) { + shape.push_back(from->shape[i]); + } + auto ndarray = NDArray::Empty(shape, from->dtype, from->device); + ndarray.CreateView(shape, from->dtype); + return ndarray; + } + /* + *\brief Copy data from a DLTensor to another DLTensor. + */ + void CopyFromTo(DLTensor* from, DLTensor* to) { + // If the source device and target device is not same, we use a local DLTensor + // as a medium to do the cross device copy work. 
+ if (!(from->device.device_type == to->device.device_type || + from->device.device_type == kDLCPU || to->device.device_type == kDLCPU)) { + DLTensor* dltensor_local = nullptr; + if (input_tensor_local_copy.find(to) == input_tensor_local_copy.end()) { + dltensor_local = CreateFromDLTensor(from); + input_tensor_local_copy[to] = dltensor_local; + } else { + dltensor_local = input_tensor_local_copy[to]; + } + TVMArrayCopyFromTo(from, dltensor_local, nullptr); + from = dltensor_local; + } + + TVMArrayCopyFromTo(from, to, nullptr); + } + + public: + BackendRuntime(Module mod, int mod_idx) { + module = mod; + runtime_idx = mod_idx; + get_input_index = module.GetFunction("get_input_index"); + get_num_output = module.GetFunction("get_num_outputs"); + get_num_inputs = module.GetFunction("get_num_inputs"); + set_input = module.GetFunction("set_input"); + get_input = module.GetFunction("get_input"); + get_output = module.GetFunction("get_output"); + run = module.GetFunction("run"); + } + BackendRuntime(void) {} + ~BackendRuntime() { + for (auto data : input_tensor_local_copy) { + TVMArrayFree(data.second); + } + } + /*!\brief Create a new NDArray which have same shape, data type with a module output. */ + NDArray CreateFromOutput(int idx) { + NDArray data = get_output(idx); + return CreateNDArrayFromDLTensor(const_cast(data.operator->())); + } + /*!\brief Return the moudle index.*/ + int GetModuleIndex() { return runtime_idx; } + /*!\brief Return the number of output*/ + int NumOutputs() const { return get_num_output(); } + /*!\brief Return the number of input*/ + int NumInputs() const { return get_num_inputs(); } + /*!\brief Use input index to set data to the runtime module.*/ + void SetInput(const int index, DLTensor* data_in) { + NDArray input = get_input(index); + DLTensor* dltensor_input = const_cast(input.operator->()); + CopyFromTo(data_in, dltensor_input); + } + /*!\brief Use the input name to set dat ato the runtime moduel. */ + void SetInput(const std::string name, DLTensor* data_in) { + int index = this->GetInputIndex(name); + SetInput(index, data_in); } + /*!\brief Use output index to get a module output.*/ + NDArray GetOutput(int index) { return get_output(index); } + /*!\brief Run the runtime.*/ + void Run() { run(); } + /*!\brief Use the index to a module input.*/ + NDArray GetInput(int index) const { return get_input(index); } + /*!\bief Use a input name to get the corresponding index of input.*/ + int GetInputIndex(const std::string& name) { return get_input_index(name); } }; /*! * \brief The information used to initialize the graph executor module, the information @@ -182,4 +547,6 @@ struct GraphModuleLoadInfo { }; /*! The Module information of each module.The 'int' is module index. */ using ModuleConfig = std::unordered_map; +}; // namespace runtime +}; // namespace tvm #endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index 4a9b7eacdf65..4ed7094ac45e 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -71,6 +71,19 @@ def get_mannual_mod(): return mods, dshape +def recreate_parameters(mod): + # Get the binding parameters from a module, then create a same parameters which have different + # data value. This function can be used to test "set_param" function. 
+ with tvm.transform.PassContext(opt_level=3): + lib = relay.build(mod, "llvm") + + mod1_customized_params = {} + for key, value in lib.params.items(): + new_value = value.numpy() + np.full(value.shape, 10).astype(value.dtype) + mod1_customized_params[key] = tvm.nd.array(new_value) + return mod1_customized_params, mod + + def get_manual_conf(mods, target): # This function is used to generate manual pipeline configuration. mod_config = {} @@ -126,6 +139,69 @@ def get_manual_conf(mods, target): return mod_config +def run_modules( + mod_configs, + dev, + target, + global_input_name, + global_input_data, + mod_set_input, + input_name, + input_data, + params_mod=None, + params=None, +): + # Use graph executor to run these modules in serialized. The resturn data is used to + # verify the result comming from pipeline executor. + mod_input = {} + final_output = {} + idx = 0 + for mod in mod_configs: + with tvm.transform.PassContext(opt_level=3): + lib = relay.build(mod, target) + + m = graph_executor.GraphModule(lib["default"](dev)) + # Get the input information from "mod_input" then set the input to the module. + if idx in mod_input: + for input in mod_input[idx]: + input = mod_input[idx][input] + m.set_input(input["index"], input["data"]) + else: + m.set_input(global_input_name, global_input_data) + + # Set input data to the module which is "mod_set_input". + if mod == mod_set_input: + m.set_input(input_name, input_data) + # If the module is "params_mod" then set the parameters to this module. + if params_mod == mod: + m.set_input(None, None, **params) + + m.run() + n = m.get_num_outputs() + # Set current output as inputs of next module. + mconfig = mod_configs[mod] + for output in mconfig["pipeline"]["output"]: + output_data = m.get_output(output["output_idx"]).numpy() + for dep in output["dependencies"]: + is_global = False + if "global_output_index" in dep: + is_global = True + name = dep["global_output_index"] + else: + mod_idx = dep["mod_idx"] + name = dep["input_name"] + if is_global: + final_output[name] = output_data + else: + if mod_idx in mod_input: + mod_input[mod_idx][name] = {"index": name, "data": output_data} + else: + mod_input[mod_idx] = {name: {"index": name, "data": output_data}} + idx = idx + 1 + + return final_output + + def test_pipe_config_check(): # This function is used to trigger runtime error by applying wrong logic connection. @@ -172,7 +248,8 @@ def test_pipeline(): for target in target_list: # Get the three pipeline modules here. (mod1, mod2, mod3), dshape = get_mannual_mod() - + # Choose a module to create new parameters. + customized_parameters, customized_mod = recreate_parameters(mod1) # Prepare batch data for pipeline computation. datas = [] for i in range(5): @@ -180,6 +257,8 @@ def test_pipeline(): pipe_config = pipeline_executor.PipelineConfig() + # The pipeline param named "param_0" will be connected to the param of mod1. + pipe_config["param"]["param_0"].connect(pipe_config[customized_mod]["param"]) # The pipeline input named "data_0" will be connected to a input named "data_0" # of mod1. pipe_config["input"]["data_0"].connect(pipe_config[mod1]["input"]["data_0"]) @@ -228,7 +307,8 @@ def test_pipeline(): pipe_config[mod3].dev = tvm.cpu(0) # Here is to check the correctness of the configuration generated by API. - assert pipe_config.get_config() == get_manual_conf([mod1, mod2, mod3], target) + mconfig = pipe_config.get_config() + assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target) # Build and create a pipeline module. 
with tvm.transform.PassContext(opt_level=3): @@ -248,6 +328,46 @@ def test_pipeline(): # Use the import function to create and initialize PipelineModule. pipeline_module_test = pipeline_executor.PipelineModule.load_library(config_file_name) assert pipeline_module_test.num_outputs == 2 + # Set customized params + pipeline_module_test.set_params("param_0", customized_parameters) + for data in datas: + # Get the result from the graph executor execution without setting customized + # parameters. + wrong_output = run_modules( + mconfig["module_connection"], + tvm.cpu(), + "llvm", + "data_0", + data, + mod2, + "data_1", + data, + ) + # Get the result from the graph executor execution with setting customized + # parameters. + normal_output = run_modules( + mconfig["module_connection"], + tvm.cpu(), + "llvm", + "data_0", + data, + mod2, + "data_1", + data, + customized_mod, + customized_parameters, + ) + # Get the result from the pipeline executor + pipeline_module_test.set_input("data_0", data) + pipeline_module_test.set_input("data_1", data) + # Run the pipeline executor in serialized. + pipeline_module_test.run(True) + outputs = pipeline_module_test.get_output() + for i in range(len(outputs)): + tvm.testing.assert_allclose(normal_output[i], outputs[i].numpy()) + assert not (normal_output[i] == wrong_output[i]).all() + + pipeline_module_test.stop() if __name__ == "__main__": From 460c3164bee89c6531d7327648b4fddccbf6caec Mon Sep 17 00:00:00 2001 From: huajsj Date: Thu, 11 Nov 2021 21:44:44 -0800 Subject: [PATCH 2/5] address review comments. --- python/tvm/contrib/pipeline_executor.py | 86 ++++----- src/runtime/pipeline/pipeline_executor.cc | 22 ++- src/runtime/pipeline/pipeline_scheduler.cc | 19 +- src/runtime/pipeline/pipeline_scheduler.h | 10 +- src/runtime/pipeline/pipeline_struct.h | 178 +++++++++---------- tests/python/relay/test_pipeline_executor.py | 5 +- 6 files changed, 163 insertions(+), 157 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index d8e3b0bfd33f..7c2dc57880b7 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -17,7 +17,6 @@ """Pipeline executor that executes a series of modules in a pipeline fashion.""" import json import os -import numpy as np import tvm._ffi from tvm import relay from tvm.relay.transform import InferType @@ -53,6 +52,8 @@ def build(pipe_configs): config = pipe_configs.get_config() if "module_connection" not in config: raise RuntimeError('"module_connection" is missing') + if "input_connection" not in config: + raise RuntimeError('"input_connection" is missing') mod_n_configs = config["module_connection"] config_len = len(mod_n_configs) @@ -84,8 +85,6 @@ def build(pipe_configs): # Merge the "input_connection", the "param_connection" and the "module_connection" into one # configuration. string_config = {} - if "input_connection" not in config: - raise RuntimeError('"input_connection" is missing') string_config["input_connection"] = config["input_connection"] string_config["param_connection"] = config["param_connection"] string_config["module_connection"] = module_string_config @@ -147,11 +146,9 @@ def set_params(self, params_name, params_data): params_data : dict of str to NDArray A list of params data and params key name. 
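+        For example, with "param_0" declared in the pipeline config and
+        "customized_parameters" being a dict of str to NDArray (both names
+        are illustrative)::
+
+            module.set_params("param_0", customized_parameters)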
""" - if params_data: - keys = list(params_data.keys()) - keys.sort(key=lambda x: -np.prod(params_data[x].shape)) - for k in keys: - self._set_param(params_name, k, tvm.nd.array(params_data[k], tvm.cpu())) + keys = list(params_data.keys()) + for k in keys: + self._set_param(params_name, k, tvm.nd.array(params_data[k], tvm.cpu())) def get_input(self, key): """Get the input via a input name. @@ -159,18 +156,22 @@ def get_input(self, key): ---------- key : str The input key + + Returns + ------- + data : NDArray + Then input data. """ self._get_input(key) def get_output(self): """Get the output. - Parameters: + Returns: ----------- - out : Array[NDArray] + data : Array[NDArray] A list of output data. """ - return self._get_output() @property @@ -259,7 +260,6 @@ def __init__(self, owner, io_type, name, data_type=None): self.bindings = [] # Parents interfaces that this interface depend on. self.parents = [] - self.data_type = data_type def get_name(self): @@ -287,21 +287,43 @@ def is_pipeline_executor_interface(self): return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper) def __repr__(self): - # Get all binding information in the form of string. - ret, _ = self.format() - return ret - - def format(self): - """Obtain binding information in the form of string and dictionary.""" + # Get the binding information in the form of string. str_format = " |{}: ".format(self.name) - dict_format = {"interface_name": self.name, "connection": []} for binding in self.bindings: mname, dname = binding.get_name() + str_format += "{0}:{1} ".format(mname, dname) + + return str_format + + def check_dict(self, connection_dict): + """Check the dict form of this binding. + Parameter + --------- + connection_dict : Dict[str, Any] + The dict of input or parameters connection. + """ + if "interface_name" not in connection_dict: + raise RuntimeError(f'"inteface_name" is missing in global config!"') + if "connection" not in connection_dict: + raise RuntimeError(f'"connection" is missing!"') + # The global interface mapping should be one-to-one. + if not connection_dict["connection"]: + raise RuntimeError(f"The global interface map is empty!") + if len(connection_dict["connection"]) > 1: + raise RuntimeError(f"A global interface maps multiple module interfaces!") + if "mod_idx" not in connection_dict["connection"][0]: + raise RuntimeError(f'"mod_idx" is missing!') + + def get_binding_dict(self): + # Return the binding information in the form of dict. + dict_format = {"interface_name": self.name, "connection": []} + for binding in self.bindings: + _, dname = binding.get_name() midx = binding.get_owner_idx() dict_format["connection"].append({"mod_idx": midx, "interface_name": dname}) - str_format += "{0}:{1} ".format(mname, dname) - return str_format, dict_format + self.check_dict(dict_format) + return dict_format def check_dag_acyclic(self, start, inputs): """This is to check whether the DAG containing these input interfaces is acyclic. @@ -588,19 +610,6 @@ def get_config(self): will be used to create pipeline executor. """ - def check_data_param(data_dict): - if "interface_name" not in data_dict: - raise RuntimeError(f'"inteface_name" is missing in global config!"') - if "connection" not in data_dict: - raise RuntimeError(f'"connection" is missing!"') - # The global interface mapping should be one-to-one. 
- if len(data_dict["connection"]) == 0: - raise RuntimeError(f"The global interface map is empty!") - if len(data_dict["connection"]) > 1: - raise RuntimeError(f"A global interface maps multiple module interfaces!") - if "mod_idx" not in data_dict["connection"][0]: - raise RuntimeError(f'"mod_idx" is missing!') - # Use topological sort to get the correct order of modules. self.dag_topology_sort() mconfig = {} @@ -646,11 +655,9 @@ def check_data_param(data_dict): # Create a mapping of global input and module input. input_connection = [] for input_name in self.input_bindings.bindings: - _, input_dict = self.input_bindings.bindings[input_name].format() - # Check the correctness of "input_dict". - check_data_param(input_dict) + input_dict = self.input_bindings.bindings[input_name].get_binding_dict() if "interface_name" not in input_dict["connection"][0]: - raise RuntimeError(f'"interface_name is missing in connection config"!') + raise RuntimeError(f"interface_name is missing in connection config!") # Establish the mapping of global interface and the mapping of module interfaces. input_map = { "global_interface_name": input_dict["interface_name"], @@ -662,8 +669,7 @@ def check_data_param(data_dict): # Create a mapping of global param and module param. param_connection = [] for param_name in self.param_bindings.bindings: - _, param_dict = self.param_bindings.bindings[param_name].format() - check_data_param(param_dict) + param_dict = self.param_bindings.bindings[param_name].get_binding_dict() param_map = { "global_param_name": param_dict["interface_name"], "mod_idx": param_dict["connection"][0]["mod_idx"], diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 816a90f4cf0a..bc7c272fbb0f 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -77,9 +77,9 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, return nullptr; } /*! - * \brief There are some input called pipeline global input that user need to use function - "set_input" to set the data for it, this function return the number of such global input. - \return Return the number of pipeline global input. + * \brief Pipeline global inputs are the data inputs that have to be set by users with + * "set_input". This function returns the number of pipeline global inputs. + * \return Return the number of pipeline global inputs. */ int PipelineExecutor::NumInputs() const { @@ -90,19 +90,19 @@ int PipelineExecutor::NumInputs() const { for (auto runtime : runtimes_) { ret += runtime->NumInputs(); } - // Use the summary of all backend runtime module input number to minus the internal inputs - // number, then we will get the pipeline global input number + // Remove module inputs that are only used internally. ret -= internal_inputs_num; // Check whether these two numbers are equal. if (config_inputs_num != ret) { - LOG(FATAL) << "The number of inputs from the configuration file is inconsistent!"; + LOG(FATAL) << "Incorrect input number in configuration: Expected " << ret << " but got " + << config_inputs_num; } return ret; } /*! * \brief Return the input index and module index for a given input name. * \param name The input name. - * \return std::pair The module index and the input index. + * \return std::pair A pair of module index and the input index. 
*/ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { std::pair index = input_connection_config[name]; @@ -124,6 +124,10 @@ int PipelineExecutor::GetParamModuleIndex(const std::string& name) { */ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } runtimes_[indexs.first]->SetInput(indexs.second, data_in); } @@ -134,6 +138,10 @@ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { */ NDArray PipelineExecutor::GetInput(std::string input_name) { std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } return runtimes_[indexs.first]->GetInput(indexs.second); } /*! diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index cc203bb59c05..f5a7fb9557db 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -44,18 +44,18 @@ std::vector> PipelineScheduler::PipelineInit( LOG(FATAL) << "Not find global output index " << i; } ModuleOutputPair& output_pair = global_output_map[i]; - NDArray output = runtimes[output_pair.mod_idx]->CreateFromOutput(output_pair.output_idx); + NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second); output_array.push_back(output); } return runtimes; } /*! - * \brief Exeute in the serialized mode. + * \brief Exeute in the sequential mode. * \param runtimes A list of backend runtimes module. * \param pipeline_config The dependency information of each graph executor module. */ -void PipelineScheduler::PipelineRunSerial( +void PipelineScheduler::PipelineRunSequential( const std::vector>& runtimes, ConfigPipelineExecution pipeline_config) { for (size_t i = 0; i < runtimes.size(); i++) { @@ -97,23 +97,22 @@ void PipelineScheduler::PipelineRunSerial( * \brief Execute pipeline. * \param runtimes A list of backend runtimes module. * \param pipeline_config The dependency information of each graph executor module. - * \param serialize_mode If the execution is serialized. + * \param sequential_mode If the execution is in sequential mode. */ void PipelineScheduler::PipelineRun(const std::vector>& runtimes, - ConfigPipelineExecution pipeline_config, bool serialize_mode) { - if (!serialize_mode) { + ConfigPipelineExecution pipeline_config, bool sequential_mode) { + if (!sequential_mode) { // TODO(huajsj) remove this check after all of pipeline features in. - LOG(FATAL) << "Currently Only supports serialized mode."; + LOG(FATAL) << "Currently Only supports sequential mode."; } else { - PipelineRunSerial(runtimes, pipeline_config); + PipelineRunSequential(runtimes, pipeline_config); } } /*! * \brief Stop the pipeline exection. */ void PipelineScheduler::PipelineStop() { - // TODO(huajsj) Remove this. - std::cout << __FUNCTION__ << std::endl; + // TODO(huajsj) Add stop logic. } /*! * \brief Get a list of outputs of the pipeline execution. diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h index 8440bdc1a1a2..4e097c0206b5 100644 --- a/src/runtime/pipeline/pipeline_scheduler.h +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -49,17 +49,17 @@ class PipelineScheduler { * \brief Execute pipeline. 
* \param runtimes A list of backend runtimes module. * \param pipeline_config The dependency information of each graph executor module. - * \param serialize_mode If the execution is serialized. + * \param sequential_mode If the execution is in Sequential mode. */ void PipelineRun(const std::vector>& runtimes, - ConfigPipelineExecution pipeline_config, bool serialize_mode = false); + ConfigPipelineExecution pipeline_config, bool sequential_mode = false); /*! - * \brief Exeute in the serialized mode. + * \brief Exeute in the sequential mode. * \param runtimes A list of backend runtimes module. * \param pipeline_config The dependency information of each graph executor module. */ - void PipelineRunSerial(const std::vector>& runtimes, - ConfigPipelineExecution pipeline_config); + void PipelineRunSequential(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config); /*! * \brief Stop the pipeline exection. */ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index a12a0064cd12..9cc5b04d1a84 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -34,23 +34,14 @@ namespace runtime { #define GLOBAL_MODULE_INDEX -1 /*! *\brief The mapping of a module output and a global output in the graph module. + * The first int is a module output index, the second int is a global output index. */ -struct GlobalOutputPair { - int mod_output_idx; - int global_output_idx; - GlobalOutputPair(const int idx, const int gidx) : mod_output_idx(idx), global_output_idx(gidx) {} - GlobalOutputPair() {} -}; - +using GlobalOutputPair = std::pair; /*! *\brief Use the module index and the output index to specify a module output. + * The first int is a module index, the second int is a module output index. */ -struct ModuleOutputPair { - int mod_idx; - int output_idx; - ModuleOutputPair(const int midx, const int idx) : mod_idx(midx), output_idx(idx) {} - ModuleOutputPair() {} -}; +using ModuleOutputPair = std::pair; /*! * \brief All binding information of a output interface. */ @@ -60,19 +51,19 @@ class ConfigBindings { * uses this output data as the input interface data, 'string' is the input interface name * of the module. */ - std::unordered_map bindings; + std::unordered_map bindings_; /*! The index value of the global interface to which the current output are bound.*/ - int global_output_index = std::numeric_limits::min(); + int global_output_index_ = std::numeric_limits::min(); public: /*! - *\brief Return the memeber variable "bindings". + *\brief Return the memeber variable "bindings_". */ - std::unordered_map& Get() { return bindings; } + std::unordered_map& Get() { return bindings_; } /*!\brief Get the value of global outpu index.*/ - int GetGlobalOutputIndex() const { return global_output_index; } + int GetGlobalOutputIndex() const { return global_output_index_; } /*!\brief Whether this binding is bound to the PipelineExecutor output interface.*/ - bool IsGlobalOutput() const { return global_output_index >= 0; } + bool IsGlobalOutput() const { return global_output_index_ > GLOBAL_MODULE_INDEX; } /*! *\brief The number of bindings of input and output. one input only can bind with one @@ -82,7 +73,7 @@ class ConfigBindings { */ size_t GetInputOutputBindingNum(void) { size_t ret = 0; - for (auto connection : bindings) { + for (auto connection : bindings_) { // Filter out the global output. 
if (connection.first == GLOBAL_MODULE_INDEX) continue; ret++; @@ -109,8 +100,8 @@ class ConfigBindings { reader->Read(&input_name); } else if (key == "global_output_index") { // There should be only one global binding. - ICHECK(global_output_index < 0); - reader->Read(&global_output_index); + ICHECK(global_output_index_ < 0); + reader->Read(&global_output_index_); // When the key value is 'global_output_index', it means that this output is bound to // a global interface. global_binding = true; @@ -121,12 +112,12 @@ class ConfigBindings { // When this output is bound to a global interface, check if the global interface index // start from 0. if (global_binding) { - ICHECK(global_output_index >= 0); + ICHECK(global_output_index_ >= 0); } else { // When this output is bound to a graph executor module interface, check if the module // index start from 0. ICHECK(mod_idx >= 0); - bindings[mod_idx] = input_name; + bindings_[mod_idx] = input_name; } } } @@ -136,30 +127,32 @@ class ConfigBindings { */ class ConfigOutputBindings { private: - /*! \brief Output binding map, 'int' is output interface index.*/ - std::unordered_map output_binding_map; + /*!\brief Output binding map, 'int' is output interface index.*/ + std::unordered_map output_binding_map_; public: ConfigOutputBindings& operator=(const ConfigOutputBindings& output) { - output_binding_map = output.output_binding_map; + output_binding_map_ = output.GetOutBindings(); return *this; } ConfigBindings& operator[](const int key) { - ICHECK(output_binding_map.find(key) != output_binding_map.end()); - return output_binding_map[key]; + ICHECK(output_binding_map_.find(key) != output_binding_map_.end()); + return output_binding_map_[key]; } + /*!brief Return the variable "output_binding_map_".*/ + std::unordered_map GetOutBindings() const { return output_binding_map_; } /*! *\brief Check if there is a output with the specify index in this map. */ bool FindOutputInMap(int output_idx) { - return output_binding_map.find(output_idx) != output_binding_map.end(); + return output_binding_map_.find(output_idx) != output_binding_map_.end(); } /*! *\brief This function is used to verify whether ConfigOutputBindings is successfully loaded. *\return Return true to indicate that this class has not been successfully loaded. */ - bool Empty() { return output_binding_map.empty(); } + bool Empty() { return output_binding_map_.empty(); } /*! * \brief The pipeline outputs is the final outputs of pipeline, this function is used to * get how many pipeline outputs are in this Outputmap @@ -167,7 +160,7 @@ class ConfigOutputBindings { */ size_t GetGlobalOutputNum(void) const { size_t num_output = 0; - for (auto bindings : output_binding_map) { + for (auto bindings : output_binding_map_) { num_output += bindings.second.IsGlobalOutput() ? 
1 : 0; } return num_output; @@ -178,7 +171,7 @@ class ConfigOutputBindings { */ std::vector GetGlobalConfigOutputBindings(void) { std::vector ret; - for (auto bindings : output_binding_map) { + for (auto bindings : output_binding_map_) { if (bindings.second.IsGlobalOutput()) { ret.push_back(GlobalOutputPair(bindings.first, bindings.second.GetGlobalOutputIndex())); } @@ -190,7 +183,7 @@ class ConfigOutputBindings { */ size_t GetInputOutputBindingNum() { size_t ret = 0; - for (auto x : output_binding_map) { + for (auto x : output_binding_map_) { ret += x.second.GetInputOutputBindingNum(); } return ret; @@ -216,7 +209,7 @@ class ConfigOutputBindings { } } ICHECK(output_idx >= 0); - output_binding_map[output_idx] = binding; + output_binding_map_[output_idx] = binding; } } }; @@ -318,54 +311,49 @@ class ConfigPipelineExecution { *!\brief The key is the module index, this variable records all module pipeline configuration * information. */ - std::unordered_map config; + std::unordered_map config_; /* *\brief The key is the global output index, this variable records the mapping of global output * and the module output. */ - std::unordered_map global_output_map; + std::unordered_map global_output_map_; /* *\brief The number of binding of module outputs and inputs. */ - size_t module_input_output_binding_total_num; + size_t module_input_output_binding_total_num_; public: ConfigOutputBindings& operator[](int key) { - ICHECK(config.find(key) != config.end()); - return config[key]; + ICHECK(config_.find(key) != config_.end()); + return config_[key]; } /*! *\brief Check if the module index existing in the "config". */ - bool FindModuleInConfig(int mod_idx) { return config.find(mod_idx) != config.end(); } - /*! - *\brief Build the mapping of key and "ConfigOutputBindings", key is module index. - */ - void Insert(int key, const ConfigOutputBindings& map) { config[key] = map; } - + bool FindModuleInConfig(int mod_idx) { return config_.find(mod_idx) != config_.end(); } /* *!\brief This function is used to verify whether config is loaded successfully. * \return Return true to indicate that this class has not been successfully loaded. */ - bool Empty() { return config.empty(); } + bool Empty() { return config_.empty(); } /*! * \brief Get the number of global outputs. * \return The number of outputs the entire pipeline has. */ size_t GetGlobalOutputNum() const { // The number of pipeline outputs is the size of "global_output_map"; - return global_output_map.size(); + return global_output_map_.size(); } /* *!\brief Get the map of global outputs and module outputs. */ std::unordered_map& GetGlobalConfigOutputBindings(void) { - return global_output_map; + return global_output_map_; } /* *!\brief Get the number of module output and module input bindings. */ - size_t GetInputOutputBindingNum() const { return module_input_output_binding_total_num; } + size_t GetInputOutputBindingNum() const { return module_input_output_binding_total_num_; } /* *!\brief Parse the config to construct data struct using in pipeline execution. */ @@ -373,17 +361,16 @@ class ConfigPipelineExecution { if (config.empty()) { LOG(FATAL) << "The Configuration loading not finish yet."; } - module_input_output_binding_total_num = 0; + module_input_output_binding_total_num_ = 0; for (auto mod_output : config) { // Get the numbers of binding of input and output. 
- module_input_output_binding_total_num += mod_output.second.GetInputOutputBindingNum(); + module_input_output_binding_total_num_ += mod_output.second.GetInputOutputBindingNum(); // Use global output index as key to create a mapping of global index and module output. const std::vector& global_output = mod_output.second.GetGlobalConfigOutputBindings(); for (auto output : global_output) { - global_output_map[output.global_output_idx] = - ModuleOutputPair(mod_output.first, output.mod_output_idx); + global_output_map_[output.second] = ModuleOutputPair(mod_output.first, output.first); } } return; @@ -414,10 +401,11 @@ class ConfigPipelineExecution { ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; // Check if the output is successfully read. ICHECK(!output.Empty()) << "Invalid output binding result."; - Insert(mod_idx, output); + // Build the mapping of mod_idx and "ConfigOutputBindings". + config_[mod_idx] = output; } // Call this function after "config" loading finished. - ParseConfiguration(config); + ParseConfiguration(config_); } }; /* @@ -426,25 +414,29 @@ class ConfigPipelineExecution { class BackendRuntime { private: /*\brief The index of runtime indicate the position in the pipeline.*/ - int runtime_idx; + int runtime_idx_; /*\brief The Runtime module of a backedn graph executor.*/ - Module module; + Module module_; /*! *\brief To transfer data between two different backends, we need a local * tensor variable as a medium. This variable is a mapping of input data and local * data. */ - std::unordered_map input_tensor_local_copy; + std::unordered_map input_tensor_local_copy_; /*!\brief The packed functions.*/ - tvm::runtime::PackedFunc run; - tvm::runtime::PackedFunc set_input; - tvm::runtime::PackedFunc get_input; - tvm::runtime::PackedFunc get_output; - tvm::runtime::PackedFunc get_num_output; - tvm::runtime::PackedFunc get_num_inputs; - tvm::runtime::PackedFunc get_input_index; - /*!\brief The new DLTensor have same shape, data type with a existing DLTensor.*/ - DLTensor* CreateFromDLTensor(const DLTensor* from) { + tvm::runtime::PackedFunc run_; + tvm::runtime::PackedFunc set_input_; + tvm::runtime::PackedFunc get_input_; + tvm::runtime::PackedFunc get_output_; + tvm::runtime::PackedFunc get_num_output_; + tvm::runtime::PackedFunc get_num_inputs_; + tvm::runtime::PackedFunc get_input_index_; + /*! + * \brief The new DLTensor have same shape, data type with a existing DLTensor and + * the new DLTensor device type is CPU. This function is used to do cross device data + * copy. + */ + inline DLTensor* CopyDLTensorToCPU(const DLTensor* from) { DLTensor* ret = NULL; TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, kDLCPU, 0, &ret); @@ -466,14 +458,14 @@ class BackendRuntime { void CopyFromTo(DLTensor* from, DLTensor* to) { // If the source device and target device is not same, we use a local DLTensor // as a medium to do the cross device copy work. 
- if (!(from->device.device_type == to->device.device_type || - from->device.device_type == kDLCPU || to->device.device_type == kDLCPU)) { + if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU && + to->device.device_type != kDLCPU) { DLTensor* dltensor_local = nullptr; - if (input_tensor_local_copy.find(to) == input_tensor_local_copy.end()) { - dltensor_local = CreateFromDLTensor(from); - input_tensor_local_copy[to] = dltensor_local; + if (input_tensor_local_copy_.find(to) == input_tensor_local_copy_.end()) { + dltensor_local = CopyDLTensorToCPU(from); + input_tensor_local_copy_[to] = dltensor_local; } else { - dltensor_local = input_tensor_local_copy[to]; + dltensor_local = input_tensor_local_copy_[to]; } TVMArrayCopyFromTo(from, dltensor_local, nullptr); from = dltensor_local; @@ -484,36 +476,36 @@ class BackendRuntime { public: BackendRuntime(Module mod, int mod_idx) { - module = mod; - runtime_idx = mod_idx; - get_input_index = module.GetFunction("get_input_index"); - get_num_output = module.GetFunction("get_num_outputs"); - get_num_inputs = module.GetFunction("get_num_inputs"); - set_input = module.GetFunction("set_input"); - get_input = module.GetFunction("get_input"); - get_output = module.GetFunction("get_output"); - run = module.GetFunction("run"); + module_ = mod; + runtime_idx_ = mod_idx; + get_input_index_ = module_.GetFunction("get_input_index"); + get_num_output_ = module_.GetFunction("get_num_outputs"); + get_num_inputs_ = module_.GetFunction("get_num_inputs"); + set_input_ = module_.GetFunction("set_input"); + get_input_ = module_.GetFunction("get_input"); + get_output_ = module_.GetFunction("get_output"); + run_ = module_.GetFunction("run"); } BackendRuntime(void) {} ~BackendRuntime() { - for (auto data : input_tensor_local_copy) { + for (auto data : input_tensor_local_copy_) { TVMArrayFree(data.second); } } /*!\brief Create a new NDArray which have same shape, data type with a module output. 
*/ NDArray CreateFromOutput(int idx) { - NDArray data = get_output(idx); + NDArray data = get_output_(idx); return CreateNDArrayFromDLTensor(const_cast(data.operator->())); } /*!\brief Return the moudle index.*/ - int GetModuleIndex() { return runtime_idx; } + int GetModuleIndex() { return runtime_idx_; } /*!\brief Return the number of output*/ - int NumOutputs() const { return get_num_output(); } + int NumOutputs() const { return get_num_output_(); } /*!\brief Return the number of input*/ - int NumInputs() const { return get_num_inputs(); } + int NumInputs() const { return get_num_inputs_(); } /*!\brief Use input index to set data to the runtime module.*/ void SetInput(const int index, DLTensor* data_in) { - NDArray input = get_input(index); + NDArray input = get_input_(index); DLTensor* dltensor_input = const_cast(input.operator->()); CopyFromTo(data_in, dltensor_input); } @@ -523,13 +515,13 @@ class BackendRuntime { SetInput(index, data_in); } /*!\brief Use output index to get a module output.*/ - NDArray GetOutput(int index) { return get_output(index); } + NDArray GetOutput(int index) { return get_output_(index); } /*!\brief Run the runtime.*/ - void Run() { run(); } + void Run() { run_(); } /*!\brief Use the index to a module input.*/ - NDArray GetInput(int index) const { return get_input(index); } + NDArray GetInput(int index) const { return get_input_(index); } /*!\bief Use a input name to get the corresponding index of input.*/ - int GetInputIndex(const std::string& name) { return get_input_index(name); } + int GetInputIndex(const std::string& name) { return get_input_index_(name); } }; /*! * \brief The information used to initialize the graph executor module, the information diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index 4ed7094ac45e..6add54d16415 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -72,8 +72,8 @@ def get_mannual_mod(): def recreate_parameters(mod): - # Get the binding parameters from a module, then create a same parameters which have different - # data value. This function can be used to test "set_param" function. + # Get the binding parameters from a module, then create the same parameters with different data. + # This function can be used to test "set_param" function. with tvm.transform.PassContext(opt_level=3): lib = relay.build(mod, "llvm") @@ -281,6 +281,7 @@ def test_pipeline(): # The mod3 output[0] will be connected to pipeline output[1]. pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"]) + print(pipe_config) # Print configueration (print(pipe_config)), the result looks like following. # # Inputs From 5bfe5af460eaf4f43ec3b2dfa8e7d6a813ed4c8f Mon Sep 17 00:00:00 2001 From: huajsj Date: Thu, 11 Nov 2021 23:25:43 -0800 Subject: [PATCH 3/5] Remove set_input and set_param array CPU target binding. --- python/tvm/contrib/pipeline_executor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 7c2dc57880b7..8c8fc414117f 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -134,7 +134,10 @@ def set_input(self, key, value): value : array_like. 
The input value """ - self._set_input(key, tvm.nd.array(value, tvm.cpu())) + v = self._get_input(key) + if v is None: + raise RuntimeError("Could not find '%s' in pipeline's inputs" % key) + v.copyfrom(value) def set_params(self, params_name, params_data): """Set params to the module via param name and params data. @@ -148,7 +151,7 @@ def set_params(self, params_name, params_data): """ keys = list(params_data.keys()) for k in keys: - self._set_param(params_name, k, tvm.nd.array(params_data[k], tvm.cpu())) + self._set_param(params_name, k, params_data[k]) def get_input(self, key): """Get the input via a input name. From 585060c4188f7375a7dc97a4708acc159b6ccc15 Mon Sep 17 00:00:00 2001 From: huajsj Date: Fri, 12 Nov 2021 16:55:10 -0800 Subject: [PATCH 4/5] address review comments. --- python/tvm/contrib/pipeline_executor.py | 16 ++++++++++------ src/runtime/pipeline/pipeline_struct.h | 4 ++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 8c8fc414117f..df2778d5f399 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -149,9 +149,8 @@ def set_params(self, params_name, params_data): params_data : dict of str to NDArray A list of params data and params key name. """ - keys = list(params_data.keys()) - for k in keys: - self._set_param(params_name, k, params_data[k]) + for key, val in params_data.items(): + self._set_param(params_name, key, val) def get_input(self, key): """Get the input via a input name. @@ -298,7 +297,7 @@ def __repr__(self): return str_format - def check_dict(self, connection_dict): + def check_binding_dict(self, connection_dict): """Check the dict form of this binding. Parameter --------- @@ -318,14 +317,19 @@ def check_dict(self, connection_dict): raise RuntimeError(f'"mod_idx" is missing!') def get_binding_dict(self): - # Return the binding information in the form of dict. + """Return the binding information in the form of dict. + Returns + ------- + data : Dict[str, Any] + The binding information in the form of dict. + """ dict_format = {"interface_name": self.name, "connection": []} for binding in self.bindings: _, dname = binding.get_name() midx = binding.get_owner_idx() dict_format["connection"].append({"mod_idx": midx, "interface_name": dname}) - self.check_dict(dict_format) + self.check_binding_dict(dict_format) return dict_format def check_dag_acyclic(self, start, inputs): diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 9cc5b04d1a84..b6fc3b5f0b00 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -456,8 +456,8 @@ class BackendRuntime { *\brief Copy data from a DLTensor to another DLTensor. */ void CopyFromTo(DLTensor* from, DLTensor* to) { - // If the source device and target device is not same, we use a local DLTensor - // as a medium to do the cross device copy work. + // If the source device and target device are not the same, we use a temporary DLTensor + // on CPU as the bridge. if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU && to->device.device_type != kDLCPU) { DLTensor* dltensor_local = nullptr; From 37288fe275de9bc616be820452f9797bd8897990 Mon Sep 17 00:00:00 2001 From: huajsj Date: Thu, 2 Dec 2021 22:57:55 -0800 Subject: [PATCH 5/5] adress review comments. 
---
 python/tvm/contrib/pipeline_executor.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py
index df2778d5f399..fdd5142718e6 100644
--- a/python/tvm/contrib/pipeline_executor.py
+++ b/python/tvm/contrib/pipeline_executor.py
@@ -125,7 +125,9 @@ def stop(self):
         self._stop()
 
     def set_input(self, key, value):
-        """Set inputs to the module via "value".
+        """Set the data "value" to the global input named "key". A global input is
+        defined in the pipeline configuration and is connected with a graph module input.
+
         Parameters
         ----------
         key : str
@@ -140,7 +142,10 @@ def set_input(self, key, value):
             v.copyfrom(value)
 
     def set_params(self, params_name, params_data):
-        """Set params to the module via param name and params data.
+        """Set the data "params_data" to the global params group named "params_name". The
+        global params group is defined in the pipeline configuration and is connected with
+        the params of one graph module; "params_data" is a dict of parameter name to value.
+
         Parameters
         ----------
         params_name : str
@@ -153,7 +158,7 @@ def set_params(self, params_name, params_data):
             self._set_param(params_name, key, val)
 
     def get_input(self, key):
-        """Get the input via a input name.
+        """Get the input via an input name.
         Parameters
         ----------
         key : str
@@ -162,7 +167,7 @@ def get_input(self, key):
         Returns
         -------
         data : NDArray
-            Then input data.
+            The input data.
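
For reference, the combined configuration that build() hands to PipelineExecutorFactoryModule carries the "input_connection" and "param_connection" sections built from get_binding_dict(). The Python sketch below shows the rough shape of that structure; it only includes the fields visible in this patch, the module indices and interface names are illustrative (borrowed from the test's naming), and the per-module "module_connection" entries are elided.

string_config = {
    "input_connection": [
        # Each global input maps to exactly one module input
        # (enforced by check_binding_dict).
        {"global_interface_name": "data_0", "mod_idx": 0, "interface_name": "data_0"},
        {"global_interface_name": "data_1", "mod_idx": 1, "interface_name": "data_1"},
    ],
    "param_connection": [
        # Each global params group maps to the params of one module.
        {"global_param_name": "param_0", "mod_idx": 0},
    ],
    # Per-module pipeline/dev settings built from the ModuleWrapper configs (elided).
    "module_connection": ...,
}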
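
A short usage sketch may also help reviewers see how the new runtime interfaces fit together once a pipeline has been built and exported. This follows the flow of the test above and is only a sketch: the export path, the global input names "data_0"/"data_1", the param group "param_0", and the input shape are placeholders, and set_params is commented out because it needs a real params dict.

import numpy as np
from tvm.contrib import pipeline_executor

# Path where the factory module was previously exported; adjust as needed.
config_file_name = "/tmp/pipeline_export/config"

# Recreate the pipeline executor from the exported artifacts.
pipeline_module = pipeline_executor.PipelineModule.load_library(config_file_name)
assert pipeline_module.num_inputs > 0 and pipeline_module.num_outputs > 0

# Optionally override the params bound to the global group "param_0";
# "customized_parameters" would be a dict of str -> NDArray.
# pipeline_module.set_params("param_0", customized_parameters)

# Feed the global inputs by name; shape and dtype must match the bound module inputs.
data = np.full((3, 3), 2).astype("float32")
pipeline_module.set_input("data_0", data)
pipeline_module.set_input("data_1", data)

# True selects the sequential (synchronous) execution path added in this series.
pipeline_module.run(True)
outputs = [out.numpy() for out in pipeline_module.get_output()]
pipeline_module.stop()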