diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 37b9fed8eb91..fdd5142718e6 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -49,12 +49,19 @@ def build(pipe_configs): Common interface for pipeline executor factory modules. """ libs = {} - mod_n_configs = pipe_configs.get_config() + config = pipe_configs.get_config() + if "module_connection" not in config: + raise RuntimeError('"module_connection" is missing') + if "input_connection" not in config: + raise RuntimeError('"input_connection" is missing') + + mod_n_configs = config["module_connection"] config_len = len(mod_n_configs) - string_config = [{} for _ in range(config_len)] + module_string_config = [{} for _ in range(config_len)] + # Build the backend modules then create the config of the connections in string form. for ir_mod, mod_config in mod_n_configs.items(): - mconf = mod_config["pipeline"].copy() - mod_idx = mconf["mod_idx"] + pipe_config = mod_config["pipeline"].copy() + mod_idx = pipe_config["mod_idx"] dev = mod_config["dev"] target = mod_config["target"] build_func = relay.build @@ -70,11 +77,18 @@ def build(pipe_configs): mod_name=mod_config["mod_name"], ) - mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id) + pipe_config["dev"] = "{},{}".format(dev.device_type, dev.device_id) # Create a pipeline configuration. - string_config[mod_idx] = mconf + module_string_config[mod_idx] = pipe_config libs[mod_idx] = {"lib": lib, "dev": dev} + # Merge the "input_connection", the "param_connection" and the "module_connection" into one + # configuration. 
+ string_config = {} + string_config["input_connection"] = config["input_connection"] + string_config["param_connection"] = config["param_connection"] + string_config["module_connection"] = module_string_config + return PipelineExecutorFactoryModule(libs, string_config) @@ -93,8 +107,80 @@ def __init__(self, module): else: self.module = module # Get the packed functions from the pipeline executor. + self._run = self.module["run"] + self._stop = self.module["stop"] + self._set_input = self.module["set_input"] + self._set_param = self.module["set_param"] + self._get_input = self.module["get_input"] + self._get_output = self.module["get_output"] + self._get_num_inputs = self.module["get_num_inputs"] self._get_num_outputs = self.module["get_num_outputs"] + def run(self, sync=False): + """Run the pipeline executor.""" + self._run(sync) + + def stop(self): + """Stop the pipeline executor.""" + self._stop() + + def set_input(self, key, value): + """Set the value of "value" to the global input named "value". A global input is + defined during the pipeline configurration, it is connected with a graph module input. + + Parameters + ---------- + key : str + The input key + + value : array_like. + The input value + """ + v = self._get_input(key) + if v is None: + raise RuntimeError("Could not find '%s' in pipeline's inputs" % key) + v.copyfrom(value) + + def set_params(self, params_name, params_data): + """Set the value of "params_data" to the global params named "params_name", the global + params name is defined during the pipeline configueration creation, it is connected with + the params of a graph module which is a dictionary constructed from key and value. + + Parameters + ---------- + params_name : str + The params name + + params_data : dict of str to NDArray + A list of params data and params key name. + """ + for key, val in params_data.items(): + self._set_param(params_name, key, val) + + def get_input(self, key): + """Get the input via an input name. 
+ Parameters + ---------- + key : str + The input key + + Returns + ------- + data : NDArray + The input data. + """ + return self._get_input(key) + + def get_output(self): + """Get the output. + + Returns: + ----------- + data : Array[NDArray] + A list of output data. + """ + return self._get_output() + @property def num_outputs(self): """Get the number of outputs. @@ -105,6 +191,16 @@ def num_outputs(self): """ return self._get_num_outputs() + @property + def num_inputs(self): + """Get the number of inputs. + Returns + ------- + count : int + The number of inputs. + """ + return self._get_num_inputs() + @staticmethod def load_library(config_file_name): """Import files to create a pipeline executor. @@ -154,7 +250,7 @@ class Binding: The class who owns this interface. io_type : str - The I/O type of this interface. It can only be "input" or "output". + The I/O type of this interface. It can only be "input" or "output" or "param". name : str/integer Name, for input it is string such as "data0", for output it is an integer such as 0. @@ -171,7 +267,6 @@ def __init__(self, owner, io_type, name, data_type=None): self.bindings = [] # Parents interfaces that this interface depend on. self.parents = [] - self.data_type = data_type def get_name(self): @@ -199,12 +294,48 @@ def __repr__(self): - # Get all binding information. - ret = " |{}: ".format(self.name) + # Get the binding information in the form of string. + str_format = " |{}: ".format(self.name) for binding in self.bindings: mname, dname = binding.get_name() - ret += "{0}:{1} ".format(mname, dname) - return ret + str_format += "{0}:{1} ".format(mname, dname) + + return str_format + + def check_binding_dict(self, connection_dict): + """Check the dict form of this binding. + Parameter + --------- + connection_dict : Dict[str, Any] + The dict of input or parameters connection. 
+ """ + if "interface_name" not in connection_dict: + raise RuntimeError(f'"inteface_name" is missing in global config!"') + if "connection" not in connection_dict: + raise RuntimeError(f'"connection" is missing!"') + # The global interface mapping should be one-to-one. + if not connection_dict["connection"]: + raise RuntimeError(f"The global interface map is empty!") + if len(connection_dict["connection"]) > 1: + raise RuntimeError(f"A global interface maps multiple module interfaces!") + if "mod_idx" not in connection_dict["connection"][0]: + raise RuntimeError(f'"mod_idx" is missing!') + + def get_binding_dict(self): + """Return the binding information in the form of dict. + Returns + ------- + data : Dict[str, Any] + The binding information in the form of dict. + """ + dict_format = {"interface_name": self.name, "connection": []} + for binding in self.bindings: + _, dname = binding.get_name() + midx = binding.get_owner_idx() + dict_format["connection"].append({"mod_idx": midx, "interface_name": dname}) + + self.check_binding_dict(dict_format) + return dict_format def check_dag_acyclic(self, start, inputs): """This is to check whether the DAG containing these input interfaces is acyclic. @@ -245,6 +376,15 @@ def connect(self, binding): if self.io_owner == binding.io_owner: raise RuntimeError(f"Can not bind itself.") + if self.io_type == "param" and not self.is_pipeline_executor_interface(): + raise RuntimeError(f'Only a pipeline executor can do "param" binding!') + + if self.io_type == "param" and binding.io_type != "param": + raise RuntimeError( + f"A global param interface can only bind with a module \ + param interface!" 
+ ) + if not self.is_pipeline_executor_interface() and self.io_type == "input": raise RuntimeError(f"Module can only bind from output interface!") @@ -265,7 +405,11 @@ def connect(self, binding): if self.is_pipeline_executor_interface() and self.io_type == "output": raise RuntimeError(f"Global output can not be used as binding start point.") - if self.is_pipeline_executor_interface() and binding.io_type != "input": + if ( + self.is_pipeline_executor_interface() + and self.io_type == "input" + and binding.io_type != "input" + ): raise RuntimeError(f"Global input can only bind with module input.") self.bindings.append(binding) @@ -342,6 +486,7 @@ def __init__(self, mod=None): self.output_type = InferType()(mod)["main"].checked_type.ret_type self.input_bindings = PipelineConfig.BindingList(self, "input") self.output_bindings = PipelineConfig.BindingList(self, "output") + self.param_binding = PipelineConfig.Binding(self, "param", "param") def __eq__(self, other): if isinstance(other, PipelineConfig.ModuleWrapper): @@ -353,11 +498,13 @@ def __getitem__(self, key): if isinstance(key, str): if key == "input": return self.input_bindings - if key == "output": return self.output_bindings + if key == "param": + return self.param_binding + raise RuntimeError(f"{key} not found!") - raise RuntimeError(f"{key} not found!") + raise RuntimeError(f'The data type of "key" is not supported!') def get_data_type(self, key, interface_type): """Get the module interface data type according to the key value and interface type. @@ -411,14 +558,22 @@ def __init__(self): self.mod_wrapper = {} self.input_bindings = self.BindingList(self, "input") self.output_bindings = self.BindingList(self, "output") + # The mapping of global parameters and module parameters. + self.param_bindings = self.BindingList(self, "param") def __str__(self): # Get configuration information as a string. # Use topological sort to get correct module order. self.dag_topology_sort() + + # Get param dependencies. 
+ param_dump = "Params\n" + for param_name in self.param_bindings.bindings: + inf = self.param_bindings.bindings[param_name] + param_dump += str(inf) + "\n" # Get the input dependencies. - input_dump = "Inputs\n" + input_dump = "\nInputs\n" for input_name in self.input_bindings.bindings: inf = self.input_bindings.bindings[input_name] input_dump += str(inf) + "\n" @@ -444,7 +599,7 @@ def __str__(self): for name in sorted(output.keys()): output_dump += f" |output({name}) : {output[name]}\n" - return input_dump + output_dump + connections_dump + return param_dump + input_dump + output_dump + connections_dump def __getitem__(self, key): if isinstance(key, tvm.ir.module.IRModule): @@ -457,6 +612,8 @@ def __getitem__(self, key): return self.input_bindings if key == "output": return self.output_bindings + if key == "param": + return self.param_bindings raise RuntimeError(f"{key} not found.") @@ -468,6 +625,8 @@ def get_config(self): # Use topological sort to get the correct order of modules. self.dag_topology_sort() mconfig = {} + module_connection = {} + input_connection = {} for mod in self.mod_wrapper: # Generate pipeline configuration. mconf = {} @@ -495,7 +654,7 @@ def get_config(self): mconf["mod_idx"] = module.idx mconf["output"] = output_conf - mconfig[mod] = { + module_connection[mod] = { "pipeline": mconf, "target_host": module.target_host, "mod_name": "default", @@ -505,6 +664,33 @@ def get_config(self): "dev": module.dev, } + # Create a mapping of global input and module input. + input_connection = [] + for input_name in self.input_bindings.bindings: + input_dict = self.input_bindings.bindings[input_name].get_binding_dict() + if "interface_name" not in input_dict["connection"][0]: + raise RuntimeError(f"interface_name is missing in connection config!") + # Establish the mapping of global interface and the mapping of module interfaces. 
+ input_map = { + "global_interface_name": input_dict["interface_name"], + "mod_idx": input_dict["connection"][0]["mod_idx"], + "module_interface_name": input_dict["connection"][0]["interface_name"], + } + input_connection.append(input_map) + + # Create a mapping of global param and module param. + param_connection = [] + for param_name in self.param_bindings.bindings: + param_dict = self.param_bindings.bindings[param_name].get_binding_dict() + param_map = { + "global_param_name": param_dict["interface_name"], + "mod_idx": param_dict["connection"][0]["mod_idx"], + } + param_connection.append(param_map) + + mconfig["module_connection"] = module_connection + mconfig["input_connection"] = input_connection + mconfig["param_connection"] = param_connection return mconfig def dag_topology_sort(self): @@ -627,13 +813,13 @@ def export_library(self, directory_path): ) # Get the graph, lib, and parameters from GraphExecutorFactoryModule. - graph, lib, params = self.pipeline_mods[lib_index]["lib"] + lib = self.pipeline_mods[lib_index]["lib"] # Export the lib, graph, and parameters to disk. lib.export_library(mconfig["lib_name"]) with open(mconfig["json_name"], "w") as file_handle: - file_handle.write(graph) + file_handle.write(lib.graph_json) with open(mconfig["params_name"], "wb") as file_handle: - file_handle.write(relay.save_param_dict(params)) + file_handle.write(relay.save_param_dict(lib.params)) load_config.append(mconfig) diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 3820ce942af0..bc7c272fbb0f 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -21,6 +21,8 @@ * \file pipeline_executor.cc */ #include "pipeline_executor.h" + +#include namespace tvm { namespace runtime { /*! 
@@ -34,13 +36,142 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, if (name == "get_num_outputs") { return PackedFunc( [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumOutputs(); }); + } else if (name == "get_num_inputs") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumInputs(); }); + } else if (name == "set_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + this->SetInput(args[0].operator String(), args[1]); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "set_param") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0]) && String::CanConvertFrom(args[1])) { + this->SetParam(args[0].operator String(), args[1].operator String(), args[2]); + } else { + LOG(FATAL) << "Function only support the params name and keyin the form of string"; + } + }); + } else if (name == "get_output") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); }); + } else if (name == "get_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + *rv = this->GetInput(args[0].operator String()); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "run") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); + } else if (name == "stop") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); }); } else { LOG(FATAL) << "Unknown packed function: " << name; return PackedFunc(); } return nullptr; } +/*! + * \brief Pipeline global inputs are the data inputs that have to be set by users with + * "set_input". 
This function returns the number of pipeline global inputs. + * \return Return the number of pipeline global inputs. + */ + +int PipelineExecutor::NumInputs() const { + // The number of inputs obtained from the input configuration. + size_t config_inputs_num = input_connection_config.size(), ret = 0; + // The number of inputs obtained from the graph runtime and pipeline configuration. + size_t internal_inputs_num = pipeline_config_.GetInputOutputBindingNum(); + for (auto runtime : runtimes_) { + ret += runtime->NumInputs(); + } + // Remove module inputs that are only used internally. + ret -= internal_inputs_num; + // Check whether these two numbers are equal. + if (config_inputs_num != ret) { + LOG(FATAL) << "Incorrect input number in configuration: Expected " << ret << " but got " + << config_inputs_num; + } + return ret; +} +/*! + * \brief Return the input index and module index for a given input name. + * \param name The input name. + * \return std::pair A pair of module index and the input index. + */ +std::pair PipelineExecutor::GetInputIndex(const std::string& name) { + std::pair index = input_connection_config[name]; + auto gruntime = runtimes_[index.first]; + return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); +} +/*! + * \brief Return the module index for a given input param name. + * \param name The params name. + * \return int The module index. + */ +int PipelineExecutor::GetParamModuleIndex(const std::string& name) { + return param_connection_config[name]; +} +/*! + * \brief set input to the graph module. + * \param input_name The input name. + * \param data_in The input data. 
+ */ +void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { + std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } + runtimes_[indexs.first]->SetInput(indexs.second, data_in); +} +/*! + * \brief get input from the graph module. + * \param input_name The input name. + * \return Return the input data for a specific input name. + */ +NDArray PipelineExecutor::GetInput(std::string input_name) { + std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } + return runtimes_[indexs.first]->GetInput(indexs.second); +} +/*! + * \brief set param to a graph module. + * \param input_name The input name. + * \param data_in The input data. + */ +void PipelineExecutor::SetParam(std::string param_name, std::string param_key_name, + DLTensor* data_in) { + // Get the module index from the param name. + auto runtime = runtimes_[this->GetParamModuleIndex(param_name)]; + // Get the param index from the param key name + int index = runtime->GetInputIndex(param_key_name); + runtime->SetInput(index, data_in); +} +/*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ +void PipelineExecutor::Run(bool serialized_mode) { + pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_, serialized_mode); +} +/*! + * \brief Stop the pipeline executor. + */ +void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); } +/*! + * \brief return A list of pipeline global output data. + */ +Array PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); } /*! * \brief Use the mod_config information to create a graph runtime list. 
* \param mod_config The config information that generates by the export library function call. @@ -96,7 +227,6 @@ std::vector PipelineExecutor::CreateGraphModules(const ModuleConfig& mod } return ret; } - /*! * \brief Initialize the pipeline executor with a list of modules to be pipelined * and config in JSON format. @@ -108,11 +238,12 @@ void PipelineExecutor::Init(const std::vector& modules, const std::strin // Use JSONReader to load pipeline configuration. std::istringstream is(pipeline_json); dmlc::JSONReader reader(&is); - PipelineConfig& pipeline_config = this->LoadPipelineConfig(&reader); + ConfigPipelineExecution& pipeline_config = this->LoadConfigPipelineExecution(&reader); ICHECK(!pipeline_config.Empty()) << "The pipeline config information is empty."; + num_outputs_ = pipeline_config.GetGlobalOutputNum(); // Initialize the pipeline function class used for pipeline thread pool management - // and schedule etc. This function returns the number of output. - num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config); + // and schedule etc. This function returns a list of backend runtime. + runtimes_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config); return; } diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index a883ba25ec08..abc977070059 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -28,8 +28,10 @@ #include #include +#include #include #include +#include #include #include "pipeline_scheduler.h" @@ -70,11 +72,56 @@ class TVM_DLL PipelineExecutor : public ModuleNode { /*! * \brief Get the number of outputs. - * * \return The number of outputs. */ int NumOutputs() const { return num_outputs_; } - + /*!\brief Return the number of inputs*/ + int NumInputs() const; + /*! + * \brief Use the input name to set the input data of pipeline executor. + * \param input_name The input name. + * \param data_in The input data. 
+ */ + void SetInput(std::string input_name, DLTensor* data_in); + /*! + * \brief Use the input name to get the input data. + * \param input name The input name. + * \return Return input data. + */ + NDArray GetInput(std::string input_name); + /*! + * \brief Use the param name to get the specific backend runtime then use the param_key_name + * to set param data for the said backend runtime. + */ + void SetParam(std::string param_name, std::string param_key_name, DLTensor* data_in); + /*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ + void Run(bool serialized_mode); + /*! + * \brief Stop the pipeline executor. + */ + void Stop(); + /*! + * \brief Get a list output of pipeline. For one input data the pipeline may generate multiple + * outputs, this function will return all of these outputs in a list. + * \return A list of output data. + */ + Array GetOutput(); + /*! + * \brief A pipeline input with a specific name correspond with a input of a specific + * backend module, this function return a module index and a input index in "pair" + * form for a input name. + * return Return a module index and a input index. + */ + std::pair GetInputIndex(const std::string& name); + /*! + * \brief A pipeline params with a specific name correspond with the params of a specific + * backend module, this function return the module index for the params name. + * return Return backend runtime module index. 
+ */ + int GetParamModuleIndex(const std::string& name); /*!\brief Load the module files information.*/ ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) { reader->BeginArray(); @@ -112,41 +159,40 @@ class TVM_DLL PipelineExecutor : public ModuleNode { } private: + /*!\brief Json loader.*/ + ConfigPipelineExecution& LoadConfigPipelineExecution(dmlc::JSONReader* reader) { + reader->BeginObject(); + std::string key; + while (reader->NextObjectItem(&key)) { + if (key == "module_connection") { + reader->Read(&pipeline_config_); + } else if (key == "input_connection") { + reader->Read(&input_connection_config); + } else if (key == "param_connection") { + reader->Read(¶m_connection_config); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + return pipeline_config_; + } + /*!\brief The class used to execute and schedule the pipeline logic.*/ PipelineScheduler pipeline_scheduler_; /*!\brief The dependency information of each graph runtime module of the pipeline.*/ - PipelineConfig pipeline_config_; + ConfigPipelineExecution pipeline_config_; + /*!\brief The mapping of global input and backend runtime module input.*/ + InputConnectionConfig input_connection_config; + /*!\brief The mapping of global params and backend runtime module params.*/ + ParamConnectionConfig param_connection_config; /*!\brief The module information used to create the graph runtimes.*/ ModuleConfig mod_config_; + /*!The list of backend runtime module.*/ + std::vector> runtimes_; /*!\brief How many outputs are in this pipeline executor.*/ size_t num_outputs_ = 0; - /*!\brief Json loader.*/ - PipelineConfig& LoadPipelineConfig(dmlc::JSONReader* reader) { - reader->BeginArray(); - while (reader->NextArrayItem()) { - std::string key; - reader->BeginObject(); - int mod_idx = -1; - OutputMap output; - std::string dev; - while (reader->NextObjectItem(&key)) { - if (key == "mod_idx") { - reader->Read(&mod_idx); - } else if (key == "dev") { - reader->Read(&dev); - } else if (key == 
"output") { - reader->Read(&output); - } else { - LOG(FATAL) << "do not support key " << key; - } - } - ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; - // Check if the output is successfully read. - ICHECK(!output.Empty()) << "Invalid output binding result."; - pipeline_config_.Insert(mod_idx, output); - } - return pipeline_config_; - } + /*!\brief The list of pipeline output*/ + Array output_array; }; } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 82caf855a479..f5a7fb9557db 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -18,6 +18,7 @@ */ #include "pipeline_scheduler.h" +#include #include #include namespace tvm { @@ -26,12 +27,96 @@ namespace runtime { * \brief Initialize the pipeline. * \param modules The list of graph executor modules. * \param pipeline_conf The dependency information of each graph executor module. + * \return Return a list of backend runtime module. */ -size_t PipelineScheduler::PipelineInit(const std::vector& modules, - const PipelineConfig& pipeline_config) { +std::vector> PipelineScheduler::PipelineInit( + const std::vector& modules, ConfigPipelineExecution pipeline_config) { + std::vector> runtimes; graph_modules_ = modules; - int num_output = pipeline_config.GetGlobalOutputNum(); - return num_output; + for (size_t i = 0; i < graph_modules_.size(); i++) { + auto runItem = std::make_shared(graph_modules_[i], i); + runtimes.push_back(runItem); + } + // Initialize the outputs array. 
+ auto& global_output_map = pipeline_config.GetGlobalConfigOutputBindings(); + for (size_t i = 0; i < global_output_map.size(); i++) { + if (global_output_map.find(i) == global_output_map.end()) { + LOG(FATAL) << "Not find global output index " << i; + } + ModuleOutputPair& output_pair = global_output_map[i]; + NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second); + output_array.push_back(output); + } + return runtimes; } + +/*! + * \brief Exeute in the sequential mode. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + */ +void PipelineScheduler::PipelineRunSequential( + const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config) { + for (size_t i = 0; i < runtimes.size(); i++) { + // The offset in vector is the runtime execution order, this offset value should + // be same with the the value of "runtime_idx" in runtime. + if (static_cast(i) != runtimes[i]->GetModuleIndex()) { + LOG(FATAL) << "runtime index " << runtimes[i]->GetModuleIndex() + << " is not same as vector offset value " << i; + } + + if (!pipeline_config.FindModuleInConfig(i)) { + LOG(FATAL) << "Not find the configuration for the module " << i; + } + + runtimes[i]->Run(); + // Check if there is any output need to be forward to other graph module or to be as + // global output. + int outputs_num = runtimes[i]->NumOutputs(); + for (int j = 0; j < outputs_num; j++) { + ConfigBindings& out_binding = pipeline_config[i][j]; + std::unordered_map& input_connections = out_binding.Get(); + NDArray output = runtimes[i]->GetOutput(j); + for (auto bind : input_connections) { + // If the value of "bind.first" less then 0 then this is not a graph module binding. + if (bind.first < 0) continue; + // Set input data for the graph module. + runtimes[bind.first]->SetInput(bind.second, const_cast(output.operator->())); + } + // Store the output. 
+ if (out_binding.IsGlobalOutput()) { + int global_idx = out_binding.GetGlobalOutputIndex(); + TVMArrayCopyFromTo(const_cast(output.operator->()), + const_cast(output_array[global_idx].operator->()), nullptr); + } + } + } +} +/*! + * \brief Execute pipeline. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + * \param sequential_mode If the execution is in sequential mode. + */ +void PipelineScheduler::PipelineRun(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config, bool sequential_mode) { + if (!sequential_mode) { + // TODO(huajsj) remove this check after all of pipeline features in. + LOG(FATAL) << "Currently Only supports sequential mode."; + } else { + PipelineRunSequential(runtimes, pipeline_config); + } +} +/*! + * \brief Stop the pipeline exection. + */ +void PipelineScheduler::PipelineStop() { + // TODO(huajsj) Add stop logic. +} +/*! + * \brief Get a list of outputs of the pipeline execution. + */ +Array PipelineScheduler::PipelineGetOutput() { return output_array; } } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h index 5ee127edffa3..4e097c0206b5 100644 --- a/src/runtime/pipeline/pipeline_scheduler.h +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -28,6 +28,7 @@ #include #include "pipeline_struct.h" + namespace tvm { namespace runtime { /*! @@ -40,13 +41,40 @@ class PipelineScheduler { * \brief Initialize the pipeline. * \param modules The list of graph executor module. * \param pipeline_config The dependency information of each graph executor module. + * \return Return a list of backend runtime module. + */ + std::vector> PipelineInit( + const std::vector& modules, ConfigPipelineExecution pipeline_config); + /*! + * \brief Execute pipeline. + * \param runtimes A list of backend runtimes module. 
+ * \param pipeline_config The dependency information of each graph executor module. + * \param sequential_mode If the execution is in Sequential mode. + */ + void PipelineRun(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config, bool sequential_mode = false); + /*! + * \brief Exeute in the sequential mode. + * \param runtimes A list of backend runtimes module. + * \param pipeline_config The dependency information of each graph executor module. + */ + void PipelineRunSequential(const std::vector>& runtimes, + ConfigPipelineExecution pipeline_config); + /*! + * \brief Stop the pipeline exection. + */ + void PipelineStop(); + /*! + * \brief Get a list of outputs of the pipeline execution. */ - size_t PipelineInit(const std::vector& modules, const PipelineConfig& pipeline_config); + Array PipelineGetOutput(); private: /*!\brief The list of graph executors.*/ - std::vector graph_modules_; + std::vector graph_modules_; + /*!\brief A list of NDArray used to record the outputs.*/ + Array output_array; }; -} // namespace runtime -} // namespace tvm +}; // namespace runtime +}; // namespace tvm #endif // TVM_RUNTIME_PIPELINE_PIPELINE_SCHEDULER_H_ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 3cc9621702c1..b6fc3b5f0b00 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -21,24 +21,65 @@ #include #include #include +#include +#include #include #include #include +#include #include +namespace tvm { +namespace runtime { +#define GLOBAL_MODULE_INDEX -1 +/*! + *\brief The mapping of a module output and a global output in the graph module. + * The first int is a module output index, the second int is a global output index. + */ +using GlobalOutputPair = std::pair; +/*! + *\brief Use the module index and the output index to specify a module output. + * The first int is a module index, the second int is a module output index. 
+ */ +using ModuleOutputPair = std::pair; /*! * \brief All binding information of a output interface. */ -struct OutputBindings { +class ConfigBindings { + private: /*!\brief Output interface binding information, 'int' is the index of the module that * uses this output data as the input interface data, 'string' is the input interface name * of the module. */ - std::unordered_map bindings; + std::unordered_map bindings_; /*! The index value of the global interface to which the current output are bound.*/ - int global_output_index = std::numeric_limits::min(); + int global_output_index_ = std::numeric_limits::min(); + + public: + /*! + *\brief Return the memeber variable "bindings_". + */ + std::unordered_map& Get() { return bindings_; } + /*!\brief Get the value of global outpu index.*/ + int GetGlobalOutputIndex() const { return global_output_index_; } /*!\brief Whether this binding is bound to the PipelineExecutor output interface.*/ - bool IsGlobalOutput() const { return global_output_index >= 0; } + bool IsGlobalOutput() const { return global_output_index_ > GLOBAL_MODULE_INDEX; } + + /*! + *\brief The number of bindings of input and output. one input only can bind with one + * specific output, hence this number also is the number that how many module input data + * source is internal moudle output. + * return The number of binding in this module. + */ + size_t GetInputOutputBindingNum(void) { + size_t ret = 0; + for (auto connection : bindings_) { + // Filter out the global output. + if (connection.first == GLOBAL_MODULE_INDEX) continue; + ret++; + } + return ret; + } /*! * \brief Create a module interface map from JSONReader. * \param reader JSON reader. @@ -59,8 +100,8 @@ struct OutputBindings { reader->Read(&input_name); } else if (key == "global_output_index") { // There should be only one global binding. 
- ICHECK(global_output_index < 0); - reader->Read(&global_output_index); + ICHECK(global_output_index_ < 0); + reader->Read(&global_output_index_); // When the key value is 'global_output_index', it means that this output is bound to // a global interface. global_binding = true; @@ -71,44 +112,82 @@ struct OutputBindings { // When this output is bound to a global interface, check if the global interface index // start from 0. if (global_binding) { - ICHECK(global_output_index >= 0); + ICHECK(global_output_index_ >= 0); } else { // When this output is bound to a graph executor module interface, check if the module // index start from 0. ICHECK(mod_idx >= 0); - bindings[mod_idx] = input_name; + bindings_[mod_idx] = input_name; } } } }; - /*! * \brief The binding information of all outputs of a module. */ -struct OutputMap { - /*! \brief Output binding map, 'int' is output interface index.*/ - std::unordered_map output_binding_map; - OutputMap& operator=(const OutputMap& output) { - output_binding_map = output.output_binding_map; +class ConfigOutputBindings { + private: + /*!\brief Output binding map, 'int' is output interface index.*/ + std::unordered_map output_binding_map_; + + public: + ConfigOutputBindings& operator=(const ConfigOutputBindings& output) { + output_binding_map_ = output.GetOutBindings(); return *this; } - /*!\brief This function is used to verify whether OutputMap is successfully loaded. - * \return Return true to indicate that this class has not been successfully loaded. + ConfigBindings& operator[](const int key) { + ICHECK(output_binding_map_.find(key) != output_binding_map_.end()); + return output_binding_map_[key]; + } + /*!brief Return the variable "output_binding_map_".*/ + std::unordered_map GetOutBindings() const { return output_binding_map_; } + /*! + *\brief Check if there is a output with the specify index in this map. */ - bool Empty() { return output_binding_map.empty(); } - /*! 
\brief The pipeline outputs is the final outputs of pipeline, this function is used to
-   * get how many pipeline outputs are in this Outputmap
-   * \return Number of pipeline outputs.
+  bool FindOutputInMap(int output_idx) {
+    return output_binding_map_.find(output_idx) != output_binding_map_.end();
+  }
+  /*!
+   *\brief This function is used to verify whether ConfigOutputBindings is successfully loaded.
+   *\return Return true to indicate that this class has not been successfully loaded.
+   */
+  bool Empty() { return output_binding_map_.empty(); }
+  /*!
+   * \brief The pipeline outputs are the final outputs of the pipeline; this function is used to
+   * get how many pipeline outputs are in this Outputmap.
+   * \return Number of pipeline outputs.
    */
   size_t GetGlobalOutputNum(void) const {
     size_t num_output = 0;
-    for (auto bindings : output_binding_map) {
+    for (auto bindings : output_binding_map_) {
       num_output += bindings.second.IsGlobalOutput() ? 1 : 0;
     }
     return num_output;
   }
-
+  /*!
+   *\brief Get the mapping of all global outputs and module outputs in this module.
+   *\return A list of "GlobalOutputPair".
+   */
+  std::vector GetGlobalConfigOutputBindings(void) {
+    std::vector ret;
+    for (auto bindings : output_binding_map_) {
+      if (bindings.second.IsGlobalOutput()) {
+        ret.push_back(GlobalOutputPair(bindings.first, bindings.second.GetGlobalOutputIndex()));
+      }
+    }
+    return ret;
+  }
+  /*!
+   *\brief How many inputs are bound to a backend module output in this module.
+   */
+  size_t GetInputOutputBindingNum() {
+    size_t ret = 0;
+    for (auto x : output_binding_map_) {
+      ret += x.second.GetInputOutputBindingNum();
+    }
+    return ret;
+  }
   /*!
    * \brief Create a output binding map from JSONReader.
    * \param reader Json reader.
@@ -119,7 +198,7 @@ struct OutputMap { std::string key; reader->BeginObject(); int output_idx = -1; - OutputBindings binding; + ConfigBindings binding; while (reader->NextObjectItem(&key)) { if (key == "output_idx") { reader->Read(&output_idx); @@ -130,41 +209,319 @@ struct OutputMap { } } ICHECK(output_idx >= 0); - output_binding_map[output_idx] = binding; + output_binding_map_[output_idx] = binding; } } }; + /*! - * \brief The binding or dependency information of each module output interface. + * \brief A map of the global module input interfaces and the graph modudles input interfaces. */ -struct PipelineConfig { - /*!\brief The key is the module index, this variable records all module pipeline configuration - * information. +struct InputConnectionConfig { + /*!\brief The key is the name of global module input interfaces. the value is the pair of + * the index of a graph module and the name of a graph module input interface. */ - std::unordered_map config; - OutputMap& operator[](int key) { - ICHECK(config.find(key) != config.end()); - return config[key]; + std::unordered_map> input_connection; + bool Empty() { return input_connection.empty(); } + std::pair operator[](const std::string key) { + if (input_connection.find(key) == input_connection.end()) { + LOG(FATAL) << "Not find the key " << key; + } + return input_connection[key]; } - void Insert(int key, const OutputMap& map) { config[key] = map; } + size_t size() const { return input_connection.size(); } + /*! + * \brief Create a input connection config from JSONReader. + * \param reader Json reader. 
+ */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + reader->BeginObject(); + std::string key; + std::string global_interface_name; + std::string module_interface_name; + int mod_idx = -1; + while (reader->NextObjectItem(&key)) { + if (key == "global_interface_name") { + reader->Read(&global_interface_name); + } else if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "module_interface_name") { + reader->Read(&module_interface_name); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + ICHECK(!global_interface_name.empty()) << "Invalid global interface name value"; + ICHECK(!module_interface_name.empty()) << "Invalid module interface name value"; + input_connection[global_interface_name] = make_pair(mod_idx, module_interface_name); + } + } +}; +/*! + * \brief A map of the global module param interfaces and the graph modudles param. + */ +struct ParamConnectionConfig { + /*!\brief The key is the name of global module param interfaces. the value is the + * index of a graph module. + */ + std::unordered_map param_connection; + bool Empty() { return param_connection.empty(); } + int operator[](const std::string key) { + if (param_connection.find(key) == param_connection.end()) { + LOG(FATAL) << "do not support key " << key; + } + return param_connection[key]; + } + /*! + * \brief Create a param connection config from JSONReader. + * \param reader Json reader. 
+ */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + reader->BeginObject(); + std::string key; + std::string global_param_name; + int mod_idx = -1; + while (reader->NextObjectItem(&key)) { + if (key == "global_param_name") { + reader->Read(&global_param_name); + } else if (key == "mod_idx") { + reader->Read(&mod_idx); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + ICHECK(!global_param_name.empty()) << "Invalid global param name value"; + param_connection[global_param_name] = mod_idx; + } + } +}; +/*! + * \brief The binding or dependency information of each module output interface. + */ +class ConfigPipelineExecution { + private: + /* + *!\brief The key is the module index, this variable records all module pipeline configuration + * information. + */ + std::unordered_map config_; + /* + *\brief The key is the global output index, this variable records the mapping of global output + * and the module output. + */ + std::unordered_map global_output_map_; + /* + *\brief The number of binding of module outputs and inputs. + */ + size_t module_input_output_binding_total_num_; - /*!\brief This function is used to verify whether config is loaded successfully. + public: + ConfigOutputBindings& operator[](int key) { + ICHECK(config_.find(key) != config_.end()); + return config_[key]; + } + /*! + *\brief Check if the module index existing in the "config". + */ + bool FindModuleInConfig(int mod_idx) { return config_.find(mod_idx) != config_.end(); } + /* + *!\brief This function is used to verify whether config is loaded successfully. * \return Return true to indicate that this class has not been successfully loaded. */ - bool Empty() { return config.empty(); } - + bool Empty() { return config_.empty(); } /*! * \brief Get the number of global outputs. * \return The number of outputs the entire pipeline has. 
*/ size_t GetGlobalOutputNum() const { - size_t num_output = 0; + // The number of pipeline outputs is the size of "global_output_map"; + return global_output_map_.size(); + } + /* + *!\brief Get the map of global outputs and module outputs. + */ + std::unordered_map& GetGlobalConfigOutputBindings(void) { + return global_output_map_; + } + /* + *!\brief Get the number of module output and module input bindings. + */ + size_t GetInputOutputBindingNum() const { return module_input_output_binding_total_num_; } + /* + *!\brief Parse the config to construct data struct using in pipeline execution. + */ + void ParseConfiguration(const std::unordered_map& config) { + if (config.empty()) { + LOG(FATAL) << "The Configuration loading not finish yet."; + } + module_input_output_binding_total_num_ = 0; for (auto mod_output : config) { - num_output += mod_output.second.GetGlobalOutputNum(); + // Get the numbers of binding of input and output. + module_input_output_binding_total_num_ += mod_output.second.GetInputOutputBindingNum(); + // Use global output index as key to create a mapping of global index and module output. + const std::vector& global_output = + mod_output.second.GetGlobalConfigOutputBindings(); + + for (auto output : global_output) { + global_output_map_[output.second] = ModuleOutputPair(mod_output.first, output.first); + } } - return num_output; + return; + } + /*! + * \brief Create a pipeline config from JSONReader. + * \param reader Json reader. 
+ */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int mod_idx = -1; + ConfigOutputBindings output; + std::string dev; + while (reader->NextObjectItem(&key)) { + if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "dev") { + reader->Read(&dev); + } else if (key == "output") { + reader->Read(&output); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + // Check if the output is successfully read. + ICHECK(!output.Empty()) << "Invalid output binding result."; + // Build the mapping of mod_idx and "ConfigOutputBindings". + config_[mod_idx] = output; + } + // Call this function after "config" loading finished. + ParseConfiguration(config_); + } +}; +/* + *\brief Runtime of backend. + */ +class BackendRuntime { + private: + /*\brief The index of runtime indicate the position in the pipeline.*/ + int runtime_idx_; + /*\brief The Runtime module of a backedn graph executor.*/ + Module module_; + /*! + *\brief To transfer data between two different backends, we need a local + * tensor variable as a medium. This variable is a mapping of input data and local + * data. + */ + std::unordered_map input_tensor_local_copy_; + /*!\brief The packed functions.*/ + tvm::runtime::PackedFunc run_; + tvm::runtime::PackedFunc set_input_; + tvm::runtime::PackedFunc get_input_; + tvm::runtime::PackedFunc get_output_; + tvm::runtime::PackedFunc get_num_output_; + tvm::runtime::PackedFunc get_num_inputs_; + tvm::runtime::PackedFunc get_input_index_; + /*! + * \brief The new DLTensor have same shape, data type with a existing DLTensor and + * the new DLTensor device type is CPU. This function is used to do cross device data + * copy. 
+ */ + inline DLTensor* CopyDLTensorToCPU(const DLTensor* from) { + DLTensor* ret = NULL; + TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, + kDLCPU, 0, &ret); + return ret; + } + /*!\brief The new NDArray have same shape, data type with an existing DLTensor.*/ + NDArray CreateNDArrayFromDLTensor(const DLTensor* from) { + std::vector shape; + for (int i = 0; i < from->ndim; i++) { + shape.push_back(from->shape[i]); + } + auto ndarray = NDArray::Empty(shape, from->dtype, from->device); + ndarray.CreateView(shape, from->dtype); + return ndarray; + } + /* + *\brief Copy data from a DLTensor to another DLTensor. + */ + void CopyFromTo(DLTensor* from, DLTensor* to) { + // If the source device and target device are not the same, we use a temporary DLTensor + // on CPU as the bridge. + if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU && + to->device.device_type != kDLCPU) { + DLTensor* dltensor_local = nullptr; + if (input_tensor_local_copy_.find(to) == input_tensor_local_copy_.end()) { + dltensor_local = CopyDLTensorToCPU(from); + input_tensor_local_copy_[to] = dltensor_local; + } else { + dltensor_local = input_tensor_local_copy_[to]; + } + TVMArrayCopyFromTo(from, dltensor_local, nullptr); + from = dltensor_local; + } + + TVMArrayCopyFromTo(from, to, nullptr); + } + + public: + BackendRuntime(Module mod, int mod_idx) { + module_ = mod; + runtime_idx_ = mod_idx; + get_input_index_ = module_.GetFunction("get_input_index"); + get_num_output_ = module_.GetFunction("get_num_outputs"); + get_num_inputs_ = module_.GetFunction("get_num_inputs"); + set_input_ = module_.GetFunction("set_input"); + get_input_ = module_.GetFunction("get_input"); + get_output_ = module_.GetFunction("get_output"); + run_ = module_.GetFunction("run"); + } + BackendRuntime(void) {} + ~BackendRuntime() { + for (auto data : input_tensor_local_copy_) { + TVMArrayFree(data.second); + } + } + /*!\brief Create a new 
NDArray which has the same shape and data type as a module output. */
+  NDArray CreateFromOutput(int idx) {
+    NDArray data = get_output_(idx);
+    return CreateNDArrayFromDLTensor(const_cast(data.operator->()));
+  }
+  /*!\brief Return the module index.*/
+  int GetModuleIndex() { return runtime_idx_; }
+  /*!\brief Return the number of outputs.*/
+  int NumOutputs() const { return get_num_output_(); }
+  /*!\brief Return the number of inputs.*/
+  int NumInputs() const { return get_num_inputs_(); }
+  /*!\brief Use input index to set data to the runtime module.*/
+  void SetInput(const int index, DLTensor* data_in) {
+    NDArray input = get_input_(index);
+    DLTensor* dltensor_input = const_cast(input.operator->());
+    CopyFromTo(data_in, dltensor_input);
+  }
+  /*!\brief Use the input name to set data to the runtime module. */
+  void SetInput(const std::string name, DLTensor* data_in) {
+    int index = this->GetInputIndex(name);
+    SetInput(index, data_in);
   }
+  /*!\brief Use output index to get a module output.*/
+  NDArray GetOutput(int index) { return get_output_(index); }
+  /*!\brief Run the runtime.*/
+  void Run() { run_(); }
+  /*!\brief Use the index to get a module input.*/
+  NDArray GetInput(int index) const { return get_input_(index); }
+  /*!\brief Use an input name to get the corresponding index of input.*/
+  int GetInputIndex(const std::string& name) { return get_input_index_(name); }
 };
 /*!
  * \brief The information used to initialize the graph executor module, the information
@@ -182,4 +539,6 @@ struct GraphModuleLoadInfo {
 };
 /*! The Module information of each module.The 'int' is module index.
*/
 using ModuleConfig = std::unordered_map;
+};  // namespace runtime
+};  // namespace tvm
 #endif  // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_
diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py
index 4a9b7eacdf65..6add54d16415 100644
--- a/tests/python/relay/test_pipeline_executor.py
+++ b/tests/python/relay/test_pipeline_executor.py
@@ -71,6 +71,19 @@ def get_mannual_mod():
     return mods, dshape
 
 
+def recreate_parameters(mod):
+    # Get the binding parameters from a module, then create the same parameters with different data.
+    # This function can be used to test the "set_param" function.
+    with tvm.transform.PassContext(opt_level=3):
+        lib = relay.build(mod, "llvm")
+
+    mod1_customized_params = {}
+    for key, value in lib.params.items():
+        new_value = value.numpy() + np.full(value.shape, 10).astype(value.dtype)
+        mod1_customized_params[key] = tvm.nd.array(new_value)
+    return mod1_customized_params, mod
+
+
 def get_manual_conf(mods, target):
     # This function is used to generate manual pipeline configuration.
     mod_config = {}
@@ -126,6 +139,69 @@ def get_manual_conf(mods, target):
     return mod_config
 
 
+def run_modules(
+    mod_configs,
+    dev,
+    target,
+    global_input_name,
+    global_input_data,
+    mod_set_input,
+    input_name,
+    input_data,
+    params_mod=None,
+    params=None,
+):
+    # Use the graph executor to run these modules serially. The returned data is used to
+    # verify the result coming from the pipeline executor.
+    mod_input = {}
+    final_output = {}
+    idx = 0
+    for mod in mod_configs:
+        with tvm.transform.PassContext(opt_level=3):
+            lib = relay.build(mod, target)
+
+        m = graph_executor.GraphModule(lib["default"](dev))
+        # Get the input information from "mod_input" then set the input to the module.
+ if idx in mod_input: + for input in mod_input[idx]: + input = mod_input[idx][input] + m.set_input(input["index"], input["data"]) + else: + m.set_input(global_input_name, global_input_data) + + # Set input data to the module which is "mod_set_input". + if mod == mod_set_input: + m.set_input(input_name, input_data) + # If the module is "params_mod" then set the parameters to this module. + if params_mod == mod: + m.set_input(None, None, **params) + + m.run() + n = m.get_num_outputs() + # Set current output as inputs of next module. + mconfig = mod_configs[mod] + for output in mconfig["pipeline"]["output"]: + output_data = m.get_output(output["output_idx"]).numpy() + for dep in output["dependencies"]: + is_global = False + if "global_output_index" in dep: + is_global = True + name = dep["global_output_index"] + else: + mod_idx = dep["mod_idx"] + name = dep["input_name"] + if is_global: + final_output[name] = output_data + else: + if mod_idx in mod_input: + mod_input[mod_idx][name] = {"index": name, "data": output_data} + else: + mod_input[mod_idx] = {name: {"index": name, "data": output_data}} + idx = idx + 1 + + return final_output + + def test_pipe_config_check(): # This function is used to trigger runtime error by applying wrong logic connection. @@ -172,7 +248,8 @@ def test_pipeline(): for target in target_list: # Get the three pipeline modules here. (mod1, mod2, mod3), dshape = get_mannual_mod() - + # Choose a module to create new parameters. + customized_parameters, customized_mod = recreate_parameters(mod1) # Prepare batch data for pipeline computation. datas = [] for i in range(5): @@ -180,6 +257,8 @@ def test_pipeline(): pipe_config = pipeline_executor.PipelineConfig() + # The pipeline param named "param_0" will be connected to the param of mod1. + pipe_config["param"]["param_0"].connect(pipe_config[customized_mod]["param"]) # The pipeline input named "data_0" will be connected to a input named "data_0" # of mod1. 
pipe_config["input"]["data_0"].connect(pipe_config[mod1]["input"]["data_0"]) @@ -202,6 +281,7 @@ def test_pipeline(): # The mod3 output[0] will be connected to pipeline output[1]. pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"]) + print(pipe_config) # Print configueration (print(pipe_config)), the result looks like following. # # Inputs @@ -228,7 +308,8 @@ def test_pipeline(): pipe_config[mod3].dev = tvm.cpu(0) # Here is to check the correctness of the configuration generated by API. - assert pipe_config.get_config() == get_manual_conf([mod1, mod2, mod3], target) + mconfig = pipe_config.get_config() + assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target) # Build and create a pipeline module. with tvm.transform.PassContext(opt_level=3): @@ -248,6 +329,46 @@ def test_pipeline(): # Use the import function to create and initialize PipelineModule. pipeline_module_test = pipeline_executor.PipelineModule.load_library(config_file_name) assert pipeline_module_test.num_outputs == 2 + # Set customized params + pipeline_module_test.set_params("param_0", customized_parameters) + for data in datas: + # Get the result from the graph executor execution without setting customized + # parameters. + wrong_output = run_modules( + mconfig["module_connection"], + tvm.cpu(), + "llvm", + "data_0", + data, + mod2, + "data_1", + data, + ) + # Get the result from the graph executor execution with setting customized + # parameters. + normal_output = run_modules( + mconfig["module_connection"], + tvm.cpu(), + "llvm", + "data_0", + data, + mod2, + "data_1", + data, + customized_mod, + customized_parameters, + ) + # Get the result from the pipeline executor + pipeline_module_test.set_input("data_0", data) + pipeline_module_test.set_input("data_1", data) + # Run the pipeline executor in serialized. 
+ pipeline_module_test.run(True) + outputs = pipeline_module_test.get_output() + for i in range(len(outputs)): + tvm.testing.assert_allclose(normal_output[i], outputs[i].numpy()) + assert not (normal_output[i] == wrong_output[i]).all() + + pipeline_module_test.stop() if __name__ == "__main__":