diff --git a/CMakeLists.txt b/CMakeLists.txt index 127ba50b3720..83b865f3c57b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -375,6 +375,7 @@ if(USE_PROFILER) set_source_files_properties(${RUNTIME_GRAPH_EXECUTOR_SRCS} PROPERTIES COMPILE_DEFINITIONS "TVM_GRAPH_EXECUTOR_DEBUG") + file(GLOB RUNTIME_VM_PROFILER_SRCS src/runtime/vm/profiler/*.cc) list(APPEND RUNTIME_SRCS ${RUNTIME_VM_PROFILER_SRCS}) endif(USE_PROFILER) @@ -388,6 +389,12 @@ if(GTEST_INCLUDE_DIR AND GTEST_LIB) include(GoogleTest) endif() +if(USE_PIPELINE_EXECUTOR) + message(STATUS "Build with Subgraph Executor support...") + file(GLOB RUNTIME_PIPELINE_SRCS src/runtime/pipeline/*.cc) + list(APPEND RUNTIME_SRCS ${RUNTIME_PIPELINE_SRCS}) +endif(USE_PIPELINE_EXECUTOR) + # Module rules include(cmake/modules/VTA.cmake) include(cmake/modules/StandaloneCrt.cmake) diff --git a/cmake/config.cmake b/cmake/config.cmake index 8d8186c1b4f0..b520a9627b83 100644 --- a/cmake/config.cmake +++ b/cmake/config.cmake @@ -102,6 +102,9 @@ set(USE_STACKVM_RUNTIME OFF) # Whether enable tiny embedded graph executor. set(USE_GRAPH_EXECUTOR ON) +# Whether enable subgraph runtime. +set(USE_PIPELINE_EXECUTOR OFF) + # Whether enable tiny graph executor with CUDA Graph set(USE_GRAPH_EXECUTOR_CUDA_GRAPH OFF) diff --git a/python/tvm/contrib/graph_executor.py b/python/tvm/contrib/graph_executor.py index f064f8dbee69..9321c8cc7753 100644 --- a/python/tvm/contrib/graph_executor.py +++ b/python/tvm/contrib/graph_executor.py @@ -156,6 +156,7 @@ def __init__(self, module): self._run = module["run"] self._get_output = module["get_output"] self._get_input = module["get_input"] + self._get_input_index = module["get_input_index"] self._get_num_outputs = module["get_num_outputs"] self._get_input_index = module["get_input_index"] self._get_num_inputs = module["get_num_inputs"] @@ -245,16 +246,6 @@ def get_input(self, index, out=None): def get_input_index(self, name): """Get inputs index via input name. - - Parameters - ---------- - name : str - The input key name - - Returns - ------- - index: int - The input index. -1 will be returned if the given input name is not found. """ return self._get_input_index(name) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py new file mode 100644 index 000000000000..c1f7842b3e6d --- /dev/null +++ b/python/tvm/contrib/pipeline_executor.py @@ -0,0 +1,287 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Pipeline executor that executes pipeline containing TVM PackedFunc.""" +import json +import tvm._ffi +from tvm import relay +from tvm.contrib import graph_executor + + +def pipeline_executor_enabled(): + """check if pipeline executor enabled. 
+ Return + ------ + enable: bool + return pipeline executor get enabled or not + """ + pipeline_enabled = False + try: + pipelinecreate = tvm._ffi.get_global_func("tvm.pipeline_executor.create") + assert pipelinecreate + pipeline_enabled = True + except ValueError: + print("pipeline executor not enabled!") + + return pipeline_enabled + + +def write_file(file_name, data, mode): + """write data into file + + Parameters + ---------- + file_name: str + file name + data: str + data + mode: str + file open mode + """ + if file_name: + with open(file_name, mode) as file_handle: + file_handle.write(data) + + +def build_pipeline(mod_n_configs, export_path=None): + """build module list that can use for pipeline execution. + + Parameters + ---------- + mod_n_configs: Dict[IRModule, Dict[str, Any]] + build configuration informaton, structure like following. + {IRModule: {"target":target, + "target_host":target_host, + "params":params, + "mod_name"mod_name, + "build":build}} + export_path: str + export build result into file + + Returns + ------- + ret: List[IRModule] + list of IRModule + string_config: Dict[int, Dict[str, any]] + pipeline configuration + """ + mods = {} + config_len = len(mod_n_configs) + string_config = [{} for _ in range(config_len)] + for _, (ir_mod, mod_config) in enumerate(mod_n_configs.items()): + # init lib_name and json_name params with empty + lib_name = "" + json_name = "" + params_name = "" + # Get module configuration + assert "pipeline" in mod_config and "mod_indx" in mod_config["pipeline"] + # Get module index in pipeline configuration + mconf = mod_config["pipeline"].copy() + # Get mod device config + dev = mod_config["dev"] + mod_indx = mconf["mod_indx"] + assert mod_indx < config_len + build_func = relay.build + # if there is a self defined build function then use it. + if "build" in mod_config and mod_config["build"]: + build_func = mod_config["build"] + + # build IRModule + mod = build_func( + ir_mod, + mod_config["target"], + params=mod_config["params"], + target_host=mod_config["target_host"], + mod_name=mod_config["mod_name"], + ) + + if export_path: + graph, lib, params = mod + lib_name = "{}/lib{}.so".format(export_path, mod_indx) + json_name = "{}/json{}".format(export_path, mod_indx) + params_name = "{}/params{}".format(export_path, mod_indx) + lib.export_library(lib_name) + write_file(json_name, graph, "w") + write_file(params_name, relay.save_param_dict(params), "wb") + + mconf["lib_name"] = lib_name + mconf["json_name"] = json_name + mconf["params_name"] = params_name + mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id) + # Create pipeline configuration + string_config[mod_indx] = mconf + # associate mod with device + mods[mod] = {"dev": dev} + + if export_path: + write_file("{}/config".format(export_path), json.dumps(string_config), "w") + # with open("{}/config".format(export_path), "w") as config_file: + # config_file.write(json.dumps(string_config)) + + # return IRModule list and pipeline configuration + return mods, string_config + + +def create(pipeline_mods, mod_config): + """Create a pipeline runtime executor. + + Parameters + ---------- + pipeline_mods : List[IRModule] + list of IRModule + + mod_config : Dict[int, Dict[str, Any]] + modules and modules dependency configuration informaiton. + + Returns + ------- + submodule : PipelineModule + Runtime pipeline module. 
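+
+    Examples
+    --------
+    A minimal usage sketch; ``mod_n_configs`` and ``data`` are assumed to be
+    prepared by the caller, and the flow mirrors the pipeline executor tests:
+
+    >>> pipeline_mods, string_config = build_pipeline(mod_n_configs)
+    >>> module = create(pipeline_mods, string_config)
+    >>> module.set_input("data", data)
+    >>> module.run()
+    >>> outputs = module.get_output()
+    >>> module.stop()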
+ """ + + mods = [] + for pipeline_mod in pipeline_mods: + mod = graph_executor.GraphModule( + pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"]) + ) + mods.append(mod) + + submodule = PipelineModule(mods, json.dumps(mod_config)) + # submodule = PipelineModule(pipeline_mods, json.dumps(mod_config)) + return submodule + + +class PipelineModule(object): + """Wrapper runtime module. This is a thin wrapper of the underlying TVM module. + you can also directly call set_input, run, and get_output of underlying module functions. + + Parameters + ---------- + graph_module : List[GraphModule] + The internal tvm module that holds the actual graph functions. + + pipeline_config : Dict[IRModule, Dict[str, Any]] + modules and modules dependency configuration informaiton. + + """ + + def __init__(self, modules, pipeline_config): + mods = [] + for module in modules: + mods.append(module.module) + + pipelinecreate = tvm._ffi.get_global_func("tvm.pipeline_executor.create") + assert pipelinecreate + module = pipelinecreate(mods, pipeline_config) + + self.graph_modules_ = modules + + self._set_input = module["set_input"] + self._run = module["run"] + self._stop = module["stop"] + self._get_output = module["get_output"] + self._get_input = module["get_input"] + self._get_num_outputs = module["get_num_outputs"] + self._get_num_inputs = module["get_num_inputs"] + + def set_input(self, key, value, mod_idx=0, params=None): + """Set inputs to the module via kwargs + + Parameters + ---------- + key : array_like + The input key + + value : array_like. + The input key + + mod_idx : int + the submodule index + + params : dict of str to NDArray + Additional arguments + """ + assert mod_idx >= 0 + self._set_input(key, tvm.nd.array(value, tvm.cpu()), mod_idx) + + if params: + for param in params: + self.graph_modules_[mod_idx].set_input(**param) + + def run(self): + """Run forward execution of the graph""" + self._run() + + def stop(self): + """Stop pipeline run""" + self._stop() + + def get_num_outputs(self): + """Get the number of outputs from the graph + + Returns + ------- + count : int + The number of outputs. + """ + return self._get_num_outputs() + + def get_num_inputs(self): + """Get the number of inputs to the graph + + Returns + ------- + count : int + The number of inputs. + """ + return self._get_num_inputs() + + def get_input(self, input_indx, runtime_index=0, out=None): + """Get index-th input to out + + Parameters + ---------- + index : int + The input index + + out : NDArray + The output array container + """ + if out: + self._get_input(input_indx, runtime_index).copyto(out) + return out + + return self._get_input(input_indx, runtime_index) + + def get_output(self): + """Get index-th output to out + + Parameters + ---------- + index : int + The output index + """ + return self._get_output() + + def __getitem__(self, key): + """Get internal module function + + Parameters + ---------- + key : str + The key to the module. + """ + return self.module[key] diff --git a/src/runtime/pipeline/pipeline_data.h b/src/runtime/pipeline/pipeline_data.h new file mode 100644 index 000000000000..0f91a014cd35 --- /dev/null +++ b/src/runtime/pipeline/pipeline_data.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_DATA_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_DATA_H_ +#define EXPORT __attribute__((visibility("default"))) +#define IMPORT +#include +#include +#include +#include + +#include "pipeline_struct.h" +#ifdef __cplusplus + +#define read_barrier() std::atomic_thread_fence(std::memory_order_acquire) + +template +squeue* createQueue(squeue* q, size_t size) { + squeue* rq = new squeue(); + return rq; +} + +template +void deleteQueue(squeue* q) { + free(q); +} + +template +inline bool full(squeue* q) { + return ((q->tail + 1) % q->len) == q->head; +} + +template +inline bool empty(squeue* q) { + return q->head == q->tail; +} + +template +void q_push(squeue* q, const VARIABLE_TYPE& s) { + while (full(q)) { + } + q->q[q->tail] = s; + read_barrier(); + q->tail = (q->tail + 1) % q->len; +} + +template +bool q_poll(squeue* q, VARIABLE_TYPE* s) { + if (empty(q)) return false; + *s = q->q[q->head]; + read_barrier(); + q->head = (q->head + 1) % q->len; + return true; +} +// extern "C" +#endif + +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_DATA_H_ diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc new file mode 100644 index 000000000000..2c949bea058f --- /dev/null +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \file pipeline_executor.cc + */ +#include "pipeline_executor.h" + +namespace tvm { +namespace runtime { + +/*! \bief Stop pipeline run. */ +void SubGraphRuntime::Stop() { pipeline_stop(runtimes_); } +/*! + * \brief Run all the operations one by one. + */ +void SubGraphRuntime::Run() { pipeline_run(runtimes_, input_int_map_); } + +void SubGraphRuntime::Init(const Array& modules, + const std::string& pipeline_json) { + std::istringstream is(pipeline_json); + dmlc::JSONReader reader(&is); + this->Load(&reader); + outpuNumber_ = pipeline_init(modules, &runtimes_, pipeline_conf_, mod_conf_); + return; +} + +/*! + * \brief set index-th input to the modIndx-th graph. + * \param index The input index. + * \param data_in The input data. + * \param modIndx The runtime index. 
+ */ +void SubGraphRuntime::SetInput(int index, DLTensor* data_in, int mod_idx) { + if (0 == mod_idx) { + runtimes_[0]->runtimePtr->SetInput(index, data_in); + } else { + pipeline_setinput(input_int_map_, index, data_in, mod_idx); + } +} + +/*! + * \brief Get the number of outputs + * + * \return The number of outputs from last pipeline. + */ +int SubGraphRuntime::NumOutputs() const { return outpuNumber_; } + +/*! + * \brief Get the number of inputs + * + * \return The number of inputs to the first pipeline. + */ +int SubGraphRuntime::NumInputs() const { + int inputsNum = 0; + for (auto runtime : runtimes_) { + inputsNum += runtime->runtimePtr->NumInputs(); + } + return inputsNum; +} + +/*! + * \brief Return NDArray for given input index. + * \param index The input index. + * + * \return NDArray corresponding to given input node index. + */ +NDArray SubGraphRuntime::GetInput(int index, int mIndx) const { + auto gruntime = runtimes_[mIndx]; + return gruntime->runtimePtr->GetInput(index); +} + +/*! + * \brief Return input index for given input name. + * \param name The input name. + * + * \return int corresponding to given input node name. + */ +int SubGraphRuntime::GetInputIndex(const string& name, int mIndx) const { + auto gruntime = runtimes_[mIndx]; + return gruntime->runtimePtr->GetInputIndex(name); +} + +/*! + * \brief Return NDArray Array for all output. + * + * \return NDArray Array for all output. + */ +Array SubGraphRuntime::GetOutput(bool syncPoll) { + Array nd; + if (pipeline_poll(&output_entry_, runtimes_, syncPoll)) { + for (auto output : output_entry_) { + nd.push_back(output); + } + } + return nd; +} + +PackedFunc SubGraphRuntime::GetFunction(const std::string& name, + const ObjectPtr& sptr_to_self) { + if (name == "set_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + // Default use first runtime index value. 
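+      // args[0] may be either an input name (resolved through the target
+      // sub-module's "get_input_index" function) or a plain integer index;
+      // the optional third argument selects the destination sub-module and
+      // defaults to 0 when omitted.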
+ int modIndx = 0; + if (args.num_args == 3) { + modIndx = static_cast(args[2]); + } + if (String::CanConvertFrom(args[0])) { + int index = this->GetInputIndex(args[0].operator String(), modIndx); + this->SetInput(index, args[1], modIndx); + } else { + this->SetInput(static_cast(args[0]), args[1], modIndx); + } + }); + } else if (name == "get_output") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (args.num_args == 1) { + *rv = this->GetOutput(static_cast(args[0])); + } else { + *rv = this->GetOutput(); + } + }); + } else if (name == "get_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + int in_idx = 0, mod_idx = 0; + if (args.num_args == 2) { + mod_idx = args[1]; + } + + if (String::CanConvertFrom(args[0])) { + int index = this->GetInputIndex(args[0].operator String(), mod_idx); + *rv = this->GetInput(index, mod_idx); + } else { + in_idx = args[0]; + if (in_idx >= 0) { + *rv = this->GetInput(in_idx, mod_idx); + } + } + }); + } else if (name == "get_num_outputs") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumOutputs(); }); + } else if (name == "get_num_inputs") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumInputs(); }); + } else if (name == "run") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(); }); + } else if (name == "stop") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); }); + } else { + return PackedFunc(); + } +} + +Module PipelineRuntimeCreate(const Array& m, + const std::string& pipeline_json) { + auto exec = make_object(); + exec->Init(m, pipeline_json); + return Module(exec); +} + +TVM_REGISTER_GLOBAL("tvm.pipeline_executor.create").set_body([](TVMArgs args, TVMRetValue* rv) { + *rv = PipelineRuntimeCreate(args[0], args[1]); +}); +} // namespace runtime +} // namespace tvm diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h new file mode 100644 index 000000000000..8cd99df1ab56 --- /dev/null +++ b/src/runtime/pipeline/pipeline_executor.h @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \brief pipeline executor + * \file pipeline_executor.h + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ +#include +#include +#include +#include + +#include "../file_utils.h" +#include "pipeline_function.h" + +using namespace std; +namespace tvm { +namespace runtime { + +/*! + * \brief pipeline runtime. + * + * This runtime can be acccesibly in various language via + * TVM runtime PackedFunc API. 
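+ *
+ * The packed functions exposed through GetFunction are "set_input", "run",
+ * "stop", "get_output", "get_input", "get_num_outputs" and "get_num_inputs".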
+ */ +class TVM_DLL SubGraphRuntime : public ModuleNode { + public: + SubGraphRuntime() { input_int_map_ = make_shared(); } + ~SubGraphRuntime() { + /* stop pipeline threads and release data in deconstructor. + */ + Stop(); + } + /*! + * \brief Get member function to front-end + * \param name The name of the function. + * \param sptr_to_self The pointer to the module node. + * \return The corresponding member function. + */ + virtual PackedFunc GetFunction(const std::string& name, const ObjectPtr& sptr_to_self); + + /*! + * \return The type key of the executor. + */ + const char* type_key() const final { return "SubGraphRuntime"; } + void Run(); + void Stop(); + + /*! + * \brief Initialize the graph executor with graph and context. + * \param graph_json The execution graph. + * \param module The module containing the compiled functions for the host + * processor. + * \param ctxs The context of the host and devices where graph nodes will be + * executed on. + * \param lookup_linked_param_func If given, a PackedFunc invoked to lookup linked parameters + * by storage_id. If not given, linked parameters are looked-up using an internal implementation, + * which is not compatible with RPCModules. + */ + void Init(const Array& modules, const std::string& pipeline_json); + + /*! + * \brief set index-th input to the graph. + * \param index The input index. + * \param data_in The input data. + */ + void SetInput(int index, DLTensor* data_in, int mod_idx); + + /*! + * \brief get index-th input. + * \param index The input index. + * \return The input data. + */ + NDArray GetInput(int index, int mIndx) const; + + /*! + * \brief get input index-th by name. + * \param input name. + * \return The input index. + */ + int GetInputIndex(const string& name, int mIndx) const; + /*! + * \brief Get the number of outputs + * + * \return The number of outputs from graph. + */ + int NumOutputs() const; + /*! + * \brief Get the number of inputs + * + * \return The number of inputs to the graph. + */ + int NumInputs() const; + /*! + * \brief Return NDArray Array for all output. + * + * \param syncPoll Syncholization poll mode or ASyncholization. + * \return NDArray Array for all output. 
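+   * When syncPoll is true the call blocks until an output (or an exit
+   * notification) becomes available; otherwise a single poll is made and an
+   * empty Array is returned when no output is ready yet.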
+ */ + Array GetOutput(bool syncPoll = true); + + protected: + vector output_entry_; + PIPELINE_CONF pipeline_conf_; + MOD_CONF mod_conf_; + vector> runtimes_; + MOD_DLDATA_MAP_PTR input_int_map_; + size_t outpuNumber_ = 0; + + unordered_map LoadDependent(dmlc::JSONReader* reader) { + unordered_map ret; + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + string inputName; + int dep_mod_indx; + while (reader->NextObjectItem(&key)) { + if (key == "mod_indx") { + reader->Read(&dep_mod_indx); + } + if (key == "input_name") { + reader->Read(&inputName); + } + } + ret[dep_mod_indx] = inputName; + } + return ret; + } + + unordered_map> LoadOutput(dmlc::JSONReader* reader) { + reader->BeginArray(); + unordered_map> ret; + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + string inputName; + int output_indx; + unordered_map dep; + while (reader->NextObjectItem(&key)) { + if (key == "output_indx") { + reader->Read(&output_indx); + } + + if (key == "dependent") { + dep = LoadDependent(reader); + } + } + ret[output_indx] = dep; + } + return ret; + } + + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int mod_indx = 0; + std::string libName; + std::string jsonName; + std::string paramsName; + std::string dev; + unordered_map> output; + unordered_map> lib; + while (reader->NextObjectItem(&key)) { + if (key == "mod_indx") { + reader->Read(&mod_indx); + } + if (key == "lib_name") { + reader->Read(&libName); + } + + if (key == "json_name") { + reader->Read(&jsonName); + } + + if (key == "params_name") { + reader->Read(¶msName); + } + + if (key == "dev") { + reader->Read(&dev); + } + + if (key == "output") { + output = LoadOutput(reader); + } + } + pipeline_conf_[mod_indx] = output; + mod_conf_[mod_indx] = { + {"lib_name", libName}, {"json_name", jsonName}, {"params", paramsName}, {"dev", dev}}; + } + } +}; +} // namespace runtime +} // namespace tvm +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ diff --git a/src/runtime/pipeline/pipeline_function.cc b/src/runtime/pipeline/pipeline_function.cc new file mode 100644 index 000000000000..f0a845c90434 --- /dev/null +++ b/src/runtime/pipeline/pipeline_function.cc @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "pipeline_function.h" + +#include +using namespace tvm::runtime; +using namespace std; +void pipeline_pipeline_run(const int& num, const shared_ptr& curRunItem) { + QUEUE* curQueue = curRunItem->queue; + QUEUE* nextQueue = curRunItem->next->queue; + + /* + * Wait at beginning, then only do wait once last time data poll failed, + * the loop would break after an exit notification get received. + */ + bool suc = false; + while (curRunItem->waitPipeLineData(suc)) { + suc = pipeline_queue_poll(curQueue, &curRunItem->rData); + if (!suc) { + continue; + } + + curRunItem->Run(); + + vector> outputs; + curRunItem->GetOutput(&outputs); + pipeline_queue_push(nextQueue, &outputs); + curRunItem->notifyDataReadyToNext(); + } + curRunItem->notifyNextExit(); +} + +thread* pipeline_pipeline_init(SHARED_RUNTIME_VEC* runtimes) { + for (size_t i = 1; i < runtimes->size(); i++) { + (*runtimes)[i]->t = thread(pipeline_pipeline_run, i, (*runtimes)[i]); + } + return NULL; +} + +RUNTIME_PIPELINE_OUTPUT_CONF +pipeline_name_to_indx(const Array& graphRuntimes, + const RUNTIME_PIPELINE_OUTPUT_CONF_STR& pConfStr) { + RUNTIME_PIPELINE_OUTPUT_CONF confRet; + for (auto outConf : pConfStr) { + for (auto conf : outConf.second) { + int modIndx = conf.first; + // -1 is global module/pipelineexecutor + if (modIndx >= 0) { + auto mGetIndex = ((Module)graphRuntimes[modIndx]).GetFunction("get_input_index"); + confRet[outConf.first][modIndx] = (static_cast(mGetIndex(conf.second))); + } else { + confRet[outConf.first][modIndx] = stoi(conf.second); + } + } + } + return confRet; +} + +vector pipeline_graph_runtime(Array modules, const MOD_CONF& mod_conf) { + const PackedFunc* graphRuntimeCreate = Registry::Get("tvm.graph_executor.create"); + vector ret; + // if modules not empty just return in vector container + if (!modules.empty()) { + for (auto mod : modules) { + ret.push_back(mod); + } + + // if modules is empty, need to build the graph runtime from mod_conf + } else { + ret.resize(mod_conf.size()); + for (auto mconf : mod_conf) { + // load lib + auto lib = Module::LoadFromFile(mconf.second["lib_name"].c_str()); + + // read json + ifstream ifJson(mconf.second["json_name"].c_str()); + if (ifJson.fail()) { + throw std::runtime_error("json file not found!"); + } + const std::string json((istreambuf_iterator(ifJson)), istreambuf_iterator()); + + // create graph runtime + istringstream istr(mconf.second["dev"]); + string str; + int deviceType = 1, deviceId = 0; + while (getline(istr, str, ';')) { + istringstream istrDev(str); + string stemp; + if (getline(istrDev, stemp)) { + deviceType = stoi(stemp); + } + if (getline(istrDev, stemp)) { + deviceId = stoi(stemp); + } + } + Module graphModule = (*graphRuntimeCreate)(json, lib, deviceType, deviceId); + + // load parameter + TVMByteArray params_arr; + ifstream ifParam(mconf.second["params"].c_str()); + if (ifParam.fail()) { + throw std::runtime_error("params file not found!"); + } + const std::string params((istreambuf_iterator(ifParam)), istreambuf_iterator()); + params_arr.data = params.c_str(); + params_arr.size = params.length(); + auto load_params = graphModule.GetFunction("load_params"); + load_params(params_arr); + + // put into return vector + ret[mconf.first] = graphModule; + } + } + return ret; +} + +size_t pipeline_init(Array modules, SHARED_RUNTIME_VEC* runtimes, + const PIPELINE_CONF& pipeline_conf, const MOD_CONF& mod_conf) { + int outputNum = 0; + vector graphRuntimes = pipeline_graph_runtime(modules, mod_conf); + int len = graphRuntimes.size(); + for (int i 
= 0; i < len; i++) { + QUEUE* sub_queue = createQueue(NULL, SUB_Q_SIZE); + /* runtimeIndx start from 1. + */ + int runtimeIndx = i; + /* get dependency configuration information. + */ + auto pConf = pipeline_name_to_indx(graphRuntimes, pipeline_conf.at(runtimeIndx)); + + auto runItem = make_shared(graphRuntimes[i], sub_queue, &pConf, runtimeIndx); + runtimes->push_back(runItem); + /* set prev and next for RuntimeItem, runtime need these information to + * poll data from prev and do notification for next. + */ + if (i > 0) { + (*runtimes)[i - 1]->next = (*runtimes)[i]; + } + if (i == len - 1) { + (*runtimes)[i]->next = (*runtimes)[0]; + } + /* get output number. + */ + if (i < len - 1) { + for (auto depMap : pConf) { + /* output is final output when dependent number is 0. + */ + outputNum += depMap.second.find(-1) != depMap.second.end(); + } + } else { + outputNum += runItem->runtimePtr->NumOutputs(); + } + } + pipeline_pipeline_init(runtimes); + return outputNum; +} + +inline void pipeline_queue_push(QUEUE* queue, vector>* outputs) { + q_push>*>(queue, outputs); + return; +} + +bool pipeline_queue_poll(QUEUE* queue, RuntimeData* runtimeData) { + return q_poll(queue, runtimeData); +} + +void pipeline_run(const SHARED_RUNTIME_VEC& runtimes, const MOD_DLDATA_MAP_PTR indxInputs) { + shared_ptr runtime = runtimes.front(); + runtime->Run(); + /* Get runtime output + */ + vector> outputs; + runtime->GetOutput(&outputs); + + /* Storage input data for runtimes after first runtime + */ + for (auto modInputs : *indxInputs) { + int modIndx = modInputs.first; + for (auto inputs : modInputs.second) { + outputs.push_back(make_shared(modIndx, inputs.first, inputs.second->data)); + } + } + + pipeline_queue_push(runtime->next->queue, &outputs); + runtime->notifyDataReadyToNext(); + return; +} + +bool pipeline_poll(vector* output, const SHARED_RUNTIME_VEC& runtimes, const bool bSynch) { + shared_ptr firstRuntime = runtimes.front(); + QUEUE* queue = firstRuntime->queue; + bool suc = false; + pipelineOutputData<> outputData(output); + suc = q_poll>(queue, &outputData); + while (!suc && bSynch) { + /* + * If get exit notify then break. + */ + if (!firstRuntime->waitPipeLineData(!bSynch)) { + break; + } + suc = q_poll>(queue, &outputData); + } + return suc; +} + +void pipeline_stop(const SHARED_RUNTIME_VEC& runtimes) { + if (!runtimes.empty()) { + runtimes.front()->notifyNextExit(); + } +} + +void pipeline_setinput(MOD_DLDATA_MAP_PTR input_int_map, const int index, const DLTensor* data_in, + const int modIndx) { + if (input_int_map->find(modIndx) == input_int_map->end()) { + DLDATA_MAP dlmap; + dlmap[index] = nullptr; + input_int_map->insert({modIndx, dlmap}); + } else if (input_int_map->at(modIndx).find(index) == input_int_map->at(modIndx).end()) { + input_int_map->at(modIndx)[index] = nullptr; + } + + TENSOR_DATA tensor_data = input_int_map->at(modIndx)[index]; + if (tensor_data == nullptr) { + tensor_data = make_shared(); + input_int_map->at(modIndx)[index] = tensor_data; + } + tensor_data->CreateCopyFrom(data_in, kDLCPU, 0); +} diff --git a/src/runtime/pipeline/pipeline_function.h b/src/runtime/pipeline/pipeline_function.h new file mode 100644 index 000000000000..bb820147018d --- /dev/null +++ b/src/runtime/pipeline/pipeline_function.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_FUNCTION_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_FUNCTION_H_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "pipeline_data.h" + +using namespace std; +using namespace tvm::runtime; +typedef vector> SHARED_RUNTIME_VEC; +typedef unordered_map>> PIPELINE_CONF; +typedef unordered_map> MOD_CONF; +typedef shared_ptr TENSOR_DATA; +typedef unordered_map DLDATA_MAP; +typedef unordered_map MOD_DLDATA_MAP; +typedef shared_ptr MOD_DLDATA_MAP_PTR; + +vector pipeline_get_graphRuntime(Array modules, const MOD_CONF& mod_conf); +size_t pipeline_init(Array modules, SHARED_RUNTIME_VEC* runtimes, + const PIPELINE_CONF& pipeline_conf, const MOD_CONF& mod_conf); +void pipeline_run(const SHARED_RUNTIME_VEC& runtimes, const MOD_DLDATA_MAP_PTR indxInputs); +inline void pipeline_queue_push(QUEUE* queue, vector>* outputs); +bool pipeline_queue_poll(QUEUE* queue, RuntimeData* runtimeData); +bool pipeline_poll(vector* output, const SHARED_RUNTIME_VEC& runtimes, + const bool bSync = false); +void pipeline_stop(const SHARED_RUNTIME_VEC& runtimes); +void pipeline_setinput(MOD_DLDATA_MAP_PTR input_int_map, const int index, const DLTensor* data_in, + const int modIndx); + +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_FUNCTION_H_ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h new file mode 100644 index 000000000000..af541faa64b1 --- /dev/null +++ b/src/runtime/pipeline/pipeline_struct.h @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ +#include +#include +#include +#include +//#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#define SLOT slot_t<> +#define SUB_Q_SIZE 1024 + +using namespace tvm::runtime; +using namespace std; +typedef unordered_map> RUNTIME_PIPELINE_OUTPUT_CONF; +typedef unordered_map> RUNTIME_PIPELINE_OUTPUT_CONF_STR; +/* thread control struction, for single consumer single producer mode. 
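+ * wait() blocks the consumer until notify() is called by the producer (it
+ * returns immediately when the preceding queue poll already succeeded), and
+ * it returns false once exit_notify() has set bExit, which tells the worker
+ * thread to leave its loop.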
+ */ +class TControl { + private: + condition_variable cond; + volatile bool bWait = false; + mutex m; + + public: + volatile bool bExit = false; + bool wait(bool bPollSuc) { + if (bPollSuc) { + return true; + } + + unique_lock lock(m); + cond.wait(lock, [&] { return this->bWait; }); + bWait = false; + + return !bExit; + } + + void notify(void) { + bWait = true; + cond.notify_one(); + } + + void exit_notify(thread* t) { + /* set bExit first then notify + */ + bExit = true; + notify(); + if (t->joinable()) { + t->join(); + } + } +}; + +#define DEPENDENT_MAX 32 +#define TYP_MAX(type) (1 << size_of(type) - 1) +typedef uint8_t DEP_INDX_TYPE; +class Dependent { + private: + /* index 0 represent output is final output or not.*/ + uint8_t bFinal = false; + /* how many dependent*/ + uint8_t depNum = 0; + /* dependent input index number.*/ + union { + DEP_INDX_TYPE dependent[DEPENDENT_MAX] = {0}; + DEP_INDX_TYPE outputIndx; + }; + + public: + void SetDepModInputIndx(const int modIndx, const uint8_t inputIndx) { + assert(modIndx <= DEPENDENT_MAX); + assert(inputIndx <= TYP_MAX(DEP_INDX_TYPE)); + if (modIndx == -1) { + bFinal = true; + outputIndx = inputIndx; + } else { + dependent[modIndx] = inputIndx; + } + depNum++; + } + + int GetOutputIndx(void) { return outputIndx; } + + int GetDepModInputIndx(const int modIndx) { return dependent[modIndx]; } + + void RemoveDependentRef(const int modIndx) { + dependent[modIndx] = -1; + depNum--; + } + + /* + * check if the output need get forward to next runtime. + */ + bool NeedForward() { return (bFinal || depNum > 0); } +}; + +class TensorData { + public: + DLTensor* data = nullptr; + + DLTensor* CreateCopyFrom(const DLTensor* from, int device_type, int device_id) { + size_t fromLen = tvm::runtime::GetDataSize(*from); + size_t toLen = data ? tvm::runtime::GetDataSize(*data) : 0; + + if (fromLen != toLen) { + if (data) { + TVMArrayFree(data); + data = nullptr; + } + TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, + device_type, device_id, &data); + } + TVMArrayCopyFromTo(const_cast(from), data, nullptr); + return data; + } + ~TensorData() { + if (data) { + TVMArrayFree(data); + data = nullptr; + } + } +}; + +class InputData { + public: + Dependent dependent; + TensorData dlData; + + DLTensor* CreateCopyFrom(const DLTensor* from, int device_type, int device_id) { + dlData.CreateCopyFrom(from, device_type, device_id); + return dlData.data; + } +}; + +class OutputData { + public: + OutputData(const NDArray& data, const size_t Indx, + RUNTIME_PIPELINE_OUTPUT_CONF runtime_pipeline_output_conf) { + assert(runtime_pipeline_output_conf.size() < DEPENDENT_MAX); + /* use data_ to keep the NDArray data reference, to avoid memory + * used by DLTensor get freed. + */ + data_ = data; + dltensor = const_cast(data_.operator->()); + outputIndx = Indx; + for (auto conf : runtime_pipeline_output_conf[outputIndx]) { + dependent.SetDepModInputIndx(conf.first, conf.second); + } + } + + OutputData(const int modIndx, const int inputIndx, const DLTensor* data) { + dltensor = const_cast(data); + dependent.SetDepModInputIndx(modIndx, inputIndx); + } + + explicit OutputData(const InputData* pdata) { + dependent = pdata->dependent; + /* caller need make sure pdata->dlData.data is avaialble. + */ + dltensor = pdata->dlData.data; + } + + OutputData& operator=(const InputData* pdata) { + dependent = pdata->dependent; + /* caller need make sure pdata->dlData.data is avaialble. 
+ */ + dltensor = pdata->dlData.data; + return *this; + } + + int runtimeIdx; + /* reserved, for debug purpose + */ + int outputIndx; + /* index 0 represent output is final output or not. + * index offset is dependent mod index, + * value is dependent mode input index + */ + Dependent dependent; + DLTensor* dltensor; + + private: + NDArray data_; +}; + +class PipelineData { + private: + void FreeData() { + for (size_t i = 0; i < max_num; i++) { + delete inputList[i]; + } + + if (inputList) { + free(inputList); + } + } + + void ResetDataList(size_t num) { + if (max_num < num) { + FreeData(); + inputList = reinterpret_cast(calloc(num, sizeof(InputData))); + max_num = num; + } + return; + } + + InputData* CreateCopyFrom(const DLTensor* fromData, const Dependent& fromDep, InputData** to, + int device_type, int device_id) { + if (!*to) { + *to = new InputData; + } + + (*to)->CreateCopyFrom(fromData, device_type, device_id); + (*to)->dependent = fromDep; + return *to; + } + + public: + void ExportAppendData(vector>* outputs) { + for (size_t i = 0; i < num; i++) { + shared_ptr var = make_shared(inputList[i]); + outputs->push_back(var); + } + return; + } + + void Copy(const vector& dlInput, int device_type, int device_id) { + num = dlInput.size(); + ResetDataList(num); + + for (size_t i = 0; i < num; i++) { + CreateCopyFrom(dlInput[i]->dlData.data, dlInput[i]->dependent, &inputList[i], device_type, + device_id); + } + return; + } + + void Copy(const vector>* dlOutput, int device_type, int device_id) { + num = dlOutput->size(); + ResetDataList(num); + + for (size_t i = 0; i < num; i++) { + CreateCopyFrom(dlOutput->at(i)->dltensor, dlOutput->at(i)->dependent, &inputList[i], + device_type, device_id); + } + return; + } + + size_t num; + size_t max_num; + InputData** inputList; + + TControl controlData; + PipelineData(void) : num(0), max_num(0), inputList(nullptr) {} + ~PipelineData(void) { FreeData(); } +}; + +template +class slot_t { + public: + bool bExit = false; + PipelineData data; + slot_t(void) {} + + slot_t& operator=(const vector>* outputData) { + data.Copy(outputData, device_type, device_id); + return *this; + } +}; + +template +class pipelineOutputData { + public: + explicit pipelineOutputData(vector* datas) : datas_(datas) { ; } + pipelineOutputData& operator=(const slot_t& slot) { + assert(datas_->size() >= slot.data.num); + unordered_map dataMap; + /* output may not ordered by index in slot, use a map to index them. 
+ */ + for (size_t i = 0; i < slot.data.num; i++) { + auto dlTensor = slot.data.inputList[i]->dlData.data; + int outputIndx = slot.data.inputList[i]->dependent.GetOutputIndx(); + assert(outputIndx < slot.data.num); + dataMap[outputIndx] = dlTensor; + } + + for (size_t i = 0; i < dataMap.size(); i++) { + auto dlTensor = dataMap[i]; + /* alloc NDArray if there is no NDArray Allocated in vector + */ + if (datas_->size() < i + 1) { + /* allocated NDArray + */ + vector shape; + for (int i = 0; i < dlTensor->ndim; i++) { + shape.push_back(dlTensor->shape[i]); + } + auto ndarray = NDArray::Empty(shape, dlTensor->dtype, dlTensor->device); + ndarray.CreateView(shape, dlTensor->dtype); + + /* push into NDArray vector + */ + datas_->push_back(ndarray); + } + datas_->at(i).CopyFrom(dlTensor); + } + return *this; + } + + private: + vector* datas_; +}; + +template +class squeue { + public: + size_t len; + volatile size_t head; + volatile size_t tail; + SLOT_TYPE q[QLEN]; + squeue(void) : len(QLEN), head(0), tail(0) {} +}; +typedef squeue QUEUE; + +class RuntimeFunction { + public: + DLTensor* dlLocal = nullptr; + Module module_; + tvm::runtime::PackedFunc get_num_output; + tvm::runtime::PackedFunc get_num_inputs; + tvm::runtime::PackedFunc set_input; + tvm::runtime::PackedFunc get_output; + tvm::runtime::PackedFunc get_input; + tvm::runtime::PackedFunc get_input_index; + tvm::runtime::PackedFunc run; + explicit RuntimeFunction(const Module& m) { + module_ = m; + get_num_output = module_.GetFunction("get_num_outputs"); + get_num_inputs = module_.GetFunction("get_num_inputs"); + set_input = module_.GetFunction("set_input"); + get_output = module_.GetFunction("get_output"); + get_input = module_.GetFunction("get_input"); + get_input_index = module_.GetFunction("get_input_index"); + run = module_.GetFunction("run"); + } + ~RuntimeFunction() { + if (dlLocal) { + TVMArrayFree(dlLocal); + dlLocal = nullptr; + } + } + + DLTensor* CreateFromDLTensor(const DLTensor* from) { + DLTensor* ret = NULL; + TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, + kDLCPU, 0, &ret); + return ret; + } + + int NumOutputs() const { return get_num_output(); } + int NumInputs() const { return get_num_inputs(); } + + /* + when doing pipeline, the from data and to + data may comming from different device, for example + one from GPU another from VTA, here we need first + copy it into cpu type memory from GPU then copy the + cpu type memory into VTA, because current NDArray + copy not support cross device memory copy. + */ + void CopyFromTo(DLTensor* from, DLTensor* to) { + if (!(from->device.device_type == to->device.device_type || + from->device.device_type == kDLCPU || to->device.device_type == kDLCPU)) { + if (dlLocal == nullptr) { + dlLocal = CreateFromDLTensor(from); + } + TVMArrayCopyFromTo(from, dlLocal, nullptr); + from = dlLocal; + } + + TVMArrayCopyFromTo(from, to, nullptr); + } + + void SetInput(int index, DLTensor* data_in) { + /* + Here we can not use 'GetInput' of this class to replace + 'get_input' although it just be one more level wrap for + 'get_input', doing one more level wrap would + cause a NDArray copy and deconstruction after GetInput call, + when such NDArray comming from a RPC value, the deconstruction may + cause the remote data get free. then following operation for + such NDArray which linked a corrupt data would cause crash. 
+ */ + NDArray input = get_input(index); + DLTensor* dlInput = const_cast(input.operator->()); + CopyFromTo(data_in, dlInput); + } + + void SetInput(const std::string& name, DLTensor* data_in) { + NDArray input = get_input(name); + DLTensor* dlInput = const_cast(input.operator->()); + CopyFromTo(data_in, dlInput); + } + + NDArray GetInput(const std::string& name) { return get_input(name); } + + NDArray GetOutput(int index) const { return get_output(index); } + + NDArray GetInput(int index) const { return get_input(index); } + + int GetInputIndex(const std::string& name) { return get_input_index(name); } + + void Run() { run(); } +}; + +class RuntimeData { + private: + shared_ptr runtimePtr; + int runtimeIndx; + /* Storage these data that need to get forwarding to + * next runtime. + */ + PipelineData forwardData; + + void ImportData(vector dlTensors, size_t inputsLen) { + assert(runtimePtr->NumInputs() >= inputsLen); + for (size_t i = 0; i < inputsLen; i++) { + /* + * Use SetInput which have logic to handle + * cross device memory copy to set input data. + */ + runtimePtr->SetInput(i, dlTensors[i]); + } + return; + } + + void ImportPipelineData(InputData** data, size_t inputsLen) { + assert(runtimePtr->NumInputs() >= inputsLen); + vector forwardDatas; + for (size_t i = 0; i < inputsLen; i++) { + /* + * Use SetInput which have logic to handle + * cross device memory copy to set input data. + */ + int inputIndx = data[i]->dependent.GetDepModInputIndx(runtimeIndx); + if (inputIndx >= 0) { + runtimePtr->SetInput(inputIndx, data[i]->dlData.data); + /* data getused remove dependent reference for current runtime + */ + data[i]->dependent.RemoveDependentRef(runtimeIndx); + } + + /* save these data that need forwarding to next runtime. + */ + if (data[i]->dependent.NeedForward()) { + forwardDatas.push_back(data[i]); + } + } + + forwardData.Copy(forwardDatas, kDLCPU, 0); + return; + } + + public: + void ExportAppendData(vector>* outputs) { + forwardData.ExportAppendData(outputs); + return; + } + + void Init(shared_ptr runtime, int Indx) { + runtimeIndx = Indx; + runtimePtr = runtime; + } + + RuntimeData& operator=(const SLOT& slot) { + ImportPipelineData(slot.data.inputList, slot.data.num); + + return *this; + } +}; + +class RuntimeItem { + public: + shared_ptr prev = nullptr; + shared_ptr next = nullptr; + + RUNTIME_PIPELINE_OUTPUT_CONF runtime_pipeline_output_conf; + int runtimeIndx; + int inputsNum; + RuntimeData rData; + TControl control; + QUEUE* queue = nullptr; + thread t; + shared_ptr runtimePtr = nullptr; + RuntimeItem(Module mod, QUEUE* inputQueue, RUNTIME_PIPELINE_OUTPUT_CONF* pconfig, int indx) { + if (runtimePtr == nullptr) { + runtimePtr = make_shared(mod); + inputsNum = runtimePtr->NumOutputs(); + runtimeIndx = indx; + rData.Init(runtimePtr, runtimeIndx); + } + + if (!queue) { + queue = inputQueue; + } + runtime_pipeline_output_conf = *pconfig; + runtimeIndx = indx; + } + + RuntimeItem(void) {} + + void Run(void) { runtimePtr->Run(); } + + bool waitPipeLineData(bool bPollSuc) { + /* + wait input data ready. 
+ */ + return control.wait(bPollSuc); + } + + void notifyDataReadyToNext(void) { + if (next) { + next->control.notify(); + } + } + + void notifyNextExit(void) { + if (next) { + next->control.exit_notify(&next->t); + } + } + + void GetOutput(vector>* outputs) { + size_t outputsNum = runtimePtr->NumOutputs(); + for (size_t i = 0; i < outputsNum; i++) { + shared_ptr output = + make_shared(runtimePtr->GetOutput(i), i , runtime_pipeline_output_conf); + + outputs->push_back(output); + } + + rData.ExportAppendData(outputs); + return; + } +}; + +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py new file mode 100644 index 000000000000..c7dd211eea97 --- /dev/null +++ b/tests/python/relay/test_pipeline_executor.py @@ -0,0 +1,342 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np +import tvm +import tvm.testing +from tvm import relay +from tvm.relay import transform +from tvm.contrib import graph_executor, pipeline_executor + +""" +Split graph into a serial of sbgraph. +""" +def pipeline_graph(expr, indices): + """Split Graph Into A Group Of Subgraph + Parameters + ---------- + expr : tvm.relay.Expr + indices : Array[int] + Returns + ------- + ret : Array[tvm.relay.IRModule] + """ + + def run_opt_pass(expr, opt_pass): + """Exectue a relay pass""" + assert isinstance(opt_pass, tvm.transform.Pass) + mod = tvm.IRModule.from_expr(expr) + mod = tvm.relay.transform.InferType()(mod) + mod = opt_pass(mod) + entry = mod["main"] + return entry if isinstance(expr, tvm.relay.Function) else entry.body + + def _operator_idx_inc(expr, operator_current_idx): + """Increase operator index""" + if not isinstance(expr, tvm.relay.expr.Constant): + operator_current_idx = operator_current_idx + 1 + + return operator_current_idx + + def merge_constant_expr(constant_expr, expr): + # merge constant express with a express + # Parameters + # ---------- + # constant_expr: + # constant expression + # expr: + # expression to merge with constant expression + + # If body not let, then reached end of the express + if not isinstance(constant_expr.body, tvm.relay.expr.Let): + return tvm.relay.expr.Let(constant_expr.var, constant_expr.value, expr) + + return tvm.relay.expr.Let( + constant_expr.var, constant_expr.value, merge_constant_expr(constant_expr.body, expr) + ) + + def _recursion(anf, operator_indx, pipeline_mods, indices, constant_expr): + # Enumrate all operator of compute graph then split the compute graph + # into a group subgraph. 
+ # Parameters + # ---------- + # anf: + # ANF format expression + # operator_indx: + # current operator indice + # pipeline_mods: + # the subgraph list get storage in this variable + # indices: + # Array of indices use to define the subgraph scope + # constant_expr: + # constant defined before current operator + + # Do the split work + if isinstance(anf, tvm.relay.Function): + return tvm.relay.Function( + anf.params, + _recursion(anf.body, operator_indx, pipeline_mods, indices, constant_expr), + anf.ret_type, + anf.type_params, + anf.attrs, + ) + if isinstance(anf, tvm.relay.expr.Let): + value = anf.value + operator_indx = _operator_idx_inc(value, operator_indx) + + # record constan expr to make sure all sugraph can find correct + # constant. + if isinstance(value, tvm.relay.expr.Constant): + if not constant_expr: + constant_expr = tvm.relay.expr.Let(anf.var, value, anf.var) + else: + constant_expr = tvm.relay.expr.Let(anf.var, value, constant_expr) + + if isinstance(value, tvm.relay.expr.Call): + if isinstance(value.op, tvm.ir.Op): + + # if have expr a(b(c(d(e)))) and indexes are [1,2,3] + # then would get separate modules for a(b),c,d(e). + # the split area is a(b)[0,1] c[2,2] d(e)[2,3] + if indices and operator_indx == indices[0]: + indices.pop(0) + ann = _recursion( + anf.body, operator_indx, pipeline_mods, indices, constant_expr + ) + + # when current subgraph use previous subgraph constant, + # such constant may become free varaible due to the constant + # not exist, merge the previous constant with current subgraph + # to avoid such issue. + if constant_expr: + ann = merge_constant_expr(constant_expr, ann) + + ann = run_opt_pass(ann, transform.ToGraphNormalForm()) + mod = tvm.IRModule.from_expr(ann) + pipeline_mods.insert(0, mod) + return tvm.relay.expr.Let(anf.var, value, anf.var) + return tvm.relay.expr.Let( + anf.var, + value, + _recursion(anf.body, operator_indx, pipeline_mods, indices, constant_expr), + ) + else: + return anf + + pipeline_mods = [] + + # operator count start from 0, then initial value get set into -1 + operator_indx = -1 + constant_expr = None + subgraph_indices = indices.copy() + anf = run_opt_pass(expr, transform.ToANormalForm()) + anf = run_opt_pass(anf, transform.InferType()) + ann = _recursion(anf, operator_indx, pipeline_mods, subgraph_indices, constant_expr) + ann = run_opt_pass(ann.body, transform.ToGraphNormalForm()) + mod = tvm.IRModule.from_expr(ann) + pipeline_mods.insert(0, mod) + return pipeline_mods + +def run_modules(mod_configs, dev, target, dname, data, iMod, iName, iData): + mod_input = {} + final_output = {} + indx = 0 + for mod in mod_configs: + with tvm.transform.PassContext(opt_level=3): + lib = relay.build(mod, target) + + m = graph_executor.GraphModule(lib["default"](dev)) + # Get input information + mod_key = indx + if mod_key in mod_input: + for input in mod_input[mod_key]: + input = mod_input[mod_key][input] + m.set_input(input["index"], input["data"]) + else: + m.set_input(dname, data) + + # set input for specify module + if mod == iMod: + m.set_input(iName, iData) + + m.run() + n = m.get_num_outputs() + # parse mod_config and set current output as next mod input data + mconfig = mod_configs[mod] + for output in mconfig["pipeline"]["output"]: + output_data = m.get_output(output["output_indx"]).asnumpy() + for dep in output["dependent"]: + # currnet output use as dependent input, + # input_name indicate the input index number. 
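+                # For example, {"mod_indx": 2, "input_name": "x"} routes this
+                # output to input "x" of module 2, while mod_indx == -1 marks
+                # a final output of the whole pipeline.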
+ mod_indx = dep["mod_indx"] + input_name = dep["input_name"] + if mod_indx == -1: + final_output[input_name] = output_data + else: + if mod_indx in mod_input: + mod_input[mod_indx][input_name] = {"index": input_name, "data": output_data} + else: + mod_input[mod_indx] = { + input_name: {"index": input_name, "data": output_data} + } + indx = indx + 1 + + return final_output + +def get_network(): + dshape = (3, 3) + data = relay.var("data", relay.TensorType(dshape, "float32")) + data21 = relay.var("data_1", relay.TensorType(dshape, "float32")) + mvalue1 = np.full((1), 1).astype("float32") + mvalue2 = np.full((1), 2).astype("float32") + mvalue3 = np.full((1), 3).astype("float32") + mv1 = relay.Constant(tvm.nd.array(mvalue1)) + mv2 = relay.Constant(tvm.nd.array(mvalue2)) + mv3 = relay.Constant(tvm.nd.array(mvalue3)) + data = relay.var("data", relay.TensorType(dshape, "float32")) + net = relay.add(data, mv1) + net = relay.multiply(net, mv3) + + net = relay.add(net, mv2) + net = relay.add(net, data21) + net = relay.add(net, mv3) + + net = relay.multiply(net, mv3) + net_output2 = relay.subtract(net, mv2) + net = relay.add(net, mv3) + func = relay.Function([data, data21], net) + mod = tvm.IRModule.from_expr(func) + return mod, dshape + +def get_split_mod(): + mod, dshape = get_network() + """ + #split compute graph into 4 subgraph + """ + pl = [2, 5] + mods = pipeline_graph(mod["main"], pl) + return mods, dshape + +def run_pipeline(target): + """ + #Get 4 pipeline module. + """ + mods, dshape = get_split_mod() + """ + #Prepare batch data for pipeline feeding + """ + datas = [] + for i in range(len(mods) + 1): + datas.append(np.full(dshape, 3 + i).astype("float32")) + + # set configure + indx = 0 + mod_config = {} + mconfig = {"target_host": None, "mod_name": "default", "build": None, "params": None} + mconfig1 = mconfig.copy() + mconfig1["target"] = target[0] + mconfig1["dev"] = target[1] + # third output is final output, second output for mod3, first for mod2 + # input + mconfig1["pipeline"] = { + "mod_indx": 0, + "output": [ + {"output_indx": 0, "dependent": [{"mod_indx": 1, "input_name": "x"}]}, + ], + } + mod_config[mods[0]] = mconfig1 + + mconfig2 = mconfig.copy() + mconfig2["target"] = "llvm" + mconfig2["dev"] = tvm.cpu(0) + mconfig2["pipeline"] = { + "mod_indx": 1, + "output": [ + {"output_indx": 0, "dependent": [{"mod_indx": 2, "input_name": "x"}]}, + ], + } + mod_config[mods[1]] = mconfig2 + + mconfig3 = mconfig.copy() + mconfig3["target"] = "llvm" + mconfig3["dev"] = tvm.cpu(0) + + mconfig3["pipeline"] = { + "mod_indx": 2, + "output": [{"output_indx": 0, "dependent": [{"mod_indx": -1, "input_name": "0"}]}], + } + mod_config[mods[2]] = mconfig3 + + """ + #Run with graph executor for verification purpose + """ + outs = [ + run_modules(mod_config, tvm.cpu(), "llvm", "data", data, mods[1], "data_1", data) + for data in datas + ] + """ + + + #build and create pipeline module + """ + with relay.build_config(opt_level=3): + pipeline_mods, string_config = pipeline_executor.build_pipeline( + mod_config, "/scratch/hj/data/" + ) + + pipeline_module = pipeline_executor.create(pipeline_mods, string_config) + + """ + #Use pipeline executor to pipeline the said pipeline which use different backend + """ + d3 = np.full(dshape, 10).astype("float32") + for data in datas: + pipeline_module.set_input("data", data) + pipeline_module.set_input("data_1", data, mod_idx=1) + pipeline_module.run() + + """ + Get result + """ + pipeline_outputs = [] + for i in range(len(datas)): + curOutputs = [output.asnumpy() for 
output in pipeline_module.get_output()] + pipeline_outputs.append(curOutputs) + + """ + #Stop pipeline execution. + """ + pipeline_module.stop() + """ + + #Verify result + """ + for ref_out, out in zip(outs, pipeline_outputs): + for ref in ref_out: + tvm.testing.assert_allclose(ref_out[ref], out[int(ref) - 1]) + print(ref_out[ref]) + + +def test_pipeline(): + if pipeline_executor.pipeline_executor_enabled(): + target_list = tvm.testing.enabled_targets() + for target in target_list: + run_pipeline(target) + + +if __name__ == "__main__": + test_pipeline()
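+
+
+# An illustrative (hypothetical) entry of the string_config list produced by
+# pipeline_executor.build_pipeline and consumed by the pipeline runtime:
+#
+#   {
+#       "mod_indx": 0,
+#       "output": [
+#           {"output_indx": 0, "dependent": [{"mod_indx": 1, "input_name": "x"}]},
+#       ],
+#       "lib_name": "<export_path>/lib0.so",
+#       "json_name": "<export_path>/json0",
+#       "params_name": "<export_path>/params0",
+#       "dev": "1,0",
+#   }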