From 2d7d1afeeb49d6ddcbf1372fe7303719e98b307e Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Wed, 6 Sep 2023 09:05:28 +0000 Subject: [PATCH 1/9] replicate OP version0.1 --- deps/fmt | 2 +- lib/runtime/src/ops/replicate.cc | 533 +++++++++++++++++++------------ lib/runtime/src/ops/replicate.h | 4 +- 3 files changed, 328 insertions(+), 211 deletions(-) diff --git a/deps/fmt b/deps/fmt index f5e54359df..a33701196a 160000 --- a/deps/fmt +++ b/deps/fmt @@ -1 +1 @@ -Subproject commit f5e54359df4c26b6230fc61d38aa294581393084 +Subproject commit a33701196adfad74917046096bf5a2aa0ab0bb50 diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 9504754550..6712ce0d39 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -15,7 +15,9 @@ #include "parallel_ops/replicate.h" #include "kernels/replicate_kernels.h" +#include "utils/exception.decl.h" #include "utils/hash-utils.h" +#include namespace FlexFlow { // declare Legion names @@ -40,6 +42,8 @@ using Legion::TaskLauncher; using namespace FlexFlow::Kernels::Replicate; +enum Slots { INPUT, OUTPUT, ATTRS, PROFILING }; + /* Params */ bool operator==(ReplicateParams const &lhs, ReplicateParams const &rhs) { return lhs.replicate_legion_dim == rhs.replicate_legion_dim && @@ -57,233 +61,346 @@ ReplicateParams Replicate::get_params() const { return params; } -Replicate::Replicate(FFModel &model, - const ParallelTensor _input, - int _replicate_legion_dim, - int _replicate_degree, - char const *name) - : ParallelOp(model, OP_REPLICATE, name, _input), - replicate_dim(_replicate_legion_dim), - replicate_degree(_replicate_degree) { - int numdim = _input->num_dims; - ParallelDim dims[MAX_TENSOR_DIM]; - for (int i = 0; i < numdim; i++) { - dims[i] = _input->dims[i]; - } - dims[replicate_dim].size *= replicate_degree; - dims[replicate_dim].degree *= replicate_degree; - ParallelTensorBase::update_parallel_ids(numdim, dims); - outputs[0] = model.create_parallel_tensor_legion_ordering( - numdim, dims, DT_FLOAT, this); - // inputs[0]->print("Replicate::input"); - // outputs[0]->print("Replicate::output"); +OpTaskInvocation init(ReplicateAttrs const &attrs) { + OpTaskBinding binding; + + binding.bind(INPUT, input_tensor(0)); + binding.bind(OUTPUT, output_tensor(0)); + + return {REPLICATE_INIT_TASK_ID, binding}; } -Replicate::Replicate(FFModel &model, - ReplicateParams const ¶ms, - ParallelTensor const input, - char const *name) - : Replicate(model, - input, - params.replicate_legion_dim, - params.replicate_degree, - name) {} - -void Replicate::create_input_partition(FFModel &ff) { - assert(outputs[0]->part != LogicalPartition::NO_PART); - assert(inputs[0]->part != LogicalPartition::NO_PART); - // input_lp is an aliased partitioning along the replica dim - ff.create_aliased_partition(outputs[0]->num_dims, - outputs[0]->dims, - replicate_dim, - outputs[0]->parallel_is, - inputs[0]->region, - input_lp); - // output_grad_lp is a disjoint partition - ff.create_disjoint_partition(inputs[0]->num_dims, - inputs[0]->dims, - inputs[0]->parallel_is, - outputs[0]->region_grad, - output_grad_lp); +OpTaskInvocation forward(ReplicateAttrs const &attrs) { + OpTaskBinding binding; + + binding.bind_arg(PROFILING, profiling_settings()); + + binding.bind(INPUT, input_tensor(0)); + binding.bind(OUTPUT, output_tensor(0)); + + return {REPLICATE_FWD_TASK_ID, binding}; } +OpTaskInvocation backward(ReplicateAttrs const &attrs) { + OpTaskBinding binding = infer_bwd_binding(forward(attrs).binding); -void Replicate::init(FFModel const &ff) { - // Do nothing - ArgumentMap argmap; - Context ctx = ff.config.lg_ctx; - Runtime *runtime = ff.config.lg_hlr; - assert(numOutputs == 1); - assert(numInputs == 1); - IndexLauncher launcher(REPLICATE_FWD_TASK_ID, - outputs[0]->parallel_is, - TaskArgument(NULL, 0), - argmap, - Predicate::TRUE_PRED, - false /*must*/, - 0 /*mapper_id*/, - outputs[0]->machine_view.hash()); - launcher.add_region_requirement(RegionRequirement( - input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, inputs[0]->region)); - launcher.add_field(0, FID_DATA); - launcher.add_region_requirement(RegionRequirement(outputs[0]->part, - 0 /*projection id*/, - WRITE_ONLY, - EXCLUSIVE, - outputs[0]->region)); - launcher.add_field(1, FID_DATA); - runtime->execute_index_space(ctx, launcher); + return {REPLICATE_BWD_TASK_ID, binding}; } -void Replicate::forward(FFModel const &ff) { - ArgumentMap argmap; - Context ctx = ff.config.lg_ctx; - Runtime *runtime = ff.config.lg_hlr; - assert(numOutputs == 1); - assert(numInputs == 1); - IndexLauncher launcher(REPLICATE_FWD_TASK_ID, - outputs[0]->parallel_is, - TaskArgument(NULL, 0), - argmap, - Predicate::TRUE_PRED, - false /*must*/, - 0 /*mapper_id*/, - outputs[0]->machine_view.hash()); - launcher.add_region_requirement(RegionRequirement( - input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, inputs[0]->region)); - launcher.add_field(0, FID_DATA); - launcher.add_region_requirement(RegionRequirement(outputs[0]->part, - 0 /*projection id*/, - WRITE_ONLY, - EXCLUSIVE, - outputs[0]->region)); - launcher.add_field(1, FID_DATA); - runtime->execute_index_space(ctx, launcher); +static optional forward_task_impl(TaskArgumentAccessor const &acc) { + ProfilingSettings profiling = acc.get_argument(PROFILING); + + auto input = acc.get_tensor(INPUT); + auto output = acc.get_tensor(OUTPUT); + + return profile(forward_kernel, + profiling, + "[replicate] forward_time = %.2lfms\n", + input, + output); } -void Replicate::backward(FFModel const &ff) { - ArgumentMap argmap; - Context ctx = ff.config.lg_ctx; - Runtime *runtime = ff.config.lg_hlr; - assert(numOutputs == 1); - assert(numInputs == 1); - IndexLauncher launcher(REPLICATE_BWD_TASK_ID, - inputs[0]->parallel_is, - TaskArgument(NULL, 0), - argmap, - Predicate::TRUE_PRED, - false /*must*/, - 0 /*mapper_id*/, - inputs[0]->machine_view.hash()); - launcher.add_region_requirement(RegionRequirement(output_grad_lp, - 0 /*projection id*/, - READ_ONLY, - EXCLUSIVE, - outputs[0]->region_grad)); - launcher.add_field(0, FID_DATA); - launcher.add_region_requirement(RegionRequirement(inputs[0]->part_grad, - 0 /*projection id*/, - READ_WRITE, - EXCLUSIVE, - inputs[0]->region_grad)); - launcher.add_field(1, FID_DATA); - runtime->execute_index_space(ctx, launcher); +static void forward_task(Task const *task, + std::vector const ®ions, + Context ctx, + Runtime *runtime) { + TaskArgumentAccessor acc(task, regions, ctx, runtime); + forward_task_impl(acc); } -bool Replicate::measure_operator_cost(Simulator *sim, - MachineView const &pc, - CostMetrics &cost_metrics) const { - cost_metrics = CostMetrics(); - cost_metrics.forward_time = 0.0f; - cost_metrics.backward_time = 0.0f; - - cost_metrics.sync_time = 0; - cost_metrics.inputs_memory = 0; - cost_metrics.outputs_memory = 0; - cost_metrics.weights_memory = 0; - return true; +static optional backward_task_impl(TaskArgumentAccessor const &acc) { + ProfilingSettings profiling = acc.get_argument(PROFILING); + + auto input_grad = acc.get_tensor_grad(INPUT); + auto output_grad = acc.get_tensor_grad(OUTPUT); + + return profile(backward_kernel, + profiling, + "[replicate] backward_time = %.2lfms\n", + input_grad, + output_grad); } -bool Replicate::get_int_parameter(PMParameter para, int *value) const { - switch (para) { - case PM_REPLICATE_DIM: - *value = replicate_dim; - return true; - case PM_REPLICATE_DEGREE: - *value = replicate_degree; - return true; - default: - return Op::get_int_parameter(para, value); - } +static void backward_task(Task const *task, + std::vector const ®ions, + Context ctx, + Runtime *runtime) { + TaskArgumentAccessor acc(task, regions, ctx, runtime); + backward_task_impl(acc); } -bool Replicate::append_parallel_op_info( - std::vector ¶llel_ops) const { - ParallelOpInfo ret; - ret.op_type = op_type; - ret.parallel_dim = replicate_dim; - ret.parallel_degree = replicate_degree; - parallel_ops.push_back(ret); - return true; +CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, + ReplicateAttrs const &attrs, + InputParallelTensorDesc const &input, + ProfilingSettings const &settings, + MachineView const &machine_view) { + // Note(lambda): Does replicate has cost? currently I assume the replicate has + // no cost + auto env = sim.new_environment(); + + float forward_time = 0.0; + float backward_time = 0.0; + float sync_time = 0.0; + return make_metrics(forward_time, backward_time, sync_time, env); } -void Replicate::forward_task(Task const *task, - std::vector const ®ions, - Context ctx, - Runtime *runtime) { - assert(regions.size() == 2); - assert(task->regions.size() == 2); - Domain input_domain = runtime->get_index_space_domain( - ctx, task->regions[0].region.get_index_space()); - Domain output_domain = runtime->get_index_space_domain( - ctx, task->regions[1].region.get_index_space()); - // Currently only support the outter most dimension - for (int i = 0; i < output_domain.get_dim() - 1; i++) { - assert(output_domain.lo()[i] == input_domain.lo()[i]); - assert(output_domain.hi()[i] == input_domain.hi()[i]); - } - assert(input_domain.get_volume() == output_domain.get_volume()); - float const *input_ptr = helperGetTensorPointerRO( - regions[0], task->regions[0], FID_DATA, ctx, runtime); - float *output_ptr = helperGetTensorPointerRW( - regions[1], task->regions[1], FID_DATA, ctx, runtime); - - forward_kernel(input_ptr, output_ptr, input_domain.get_volume()); +template <> +void register_task() { + OpTaskSignature init(OpTaskType::INIT); + + init.add_input_slot(INPUT); + init.add_output_slot(OUTPUT); + + // TODO: should we implement the init_task? how to do it? + // register_task(REPLICATE_INIT_TASK_ID, "Replicate init", init , init_task); } -void Replicate::backward_task(Task const *task, - std::vector const ®ions, - Context ctx, - Runtime *runtime) { - assert(regions.size() == 2); - assert(task->regions.size() == 2); - Domain output_grad_domain = runtime->get_index_space_domain( - ctx, task->regions[0].region.get_index_space()); - Domain input_grad_domain = runtime->get_index_space_domain( - ctx, task->regions[1].region.get_index_space()); - // Currently only support the outter most dimension - for (int i = 0; i < output_grad_domain.get_dim() - 1; i++) { - assert(output_grad_domain.lo()[i] == input_grad_domain.lo()[i]); - assert(output_grad_domain.hi()[i] == input_grad_domain.hi()[i]); - } - size_t num_elements = input_grad_domain.get_volume(); - size_t num_replicas = output_grad_domain.get_volume() / num_elements; - float const *output_grad_ptr = helperGetTensorPointerRO( - regions[0], task->regions[0], FID_DATA, ctx, runtime); - float *input_grad_ptr = helperGetTensorPointerRW( - regions[1], task->regions[1], FID_DATA, ctx, runtime); - - backward_kernel( - output_grad_ptr, input_grad_ptr, num_elements, num_replicas); +template <> +void register_task() { + OpTaskSignature fwd(OpTaskType::FWD); + + fwd.add_arg_slot(PROFILING); + fwd.add_input_slot(INPUT); + fwd.add_output_slot(OUTPUT); + + register_task(REPLICATE_FWD_TASK_ID, "Replicate fwd", fwd, forward_task); } -}; // namespace FlexFlow +template <> +void register_task() { + OpTaskSignature bwd = infer_bwd_signature(get_op_signature(CAST_FWD_TASK_ID)); -namespace std { -size_t hash::operator()( - FlexFlow::ReplicateParams const ¶ms) const { - size_t key = 0; - hash_combine(key, params.replicate_legion_dim); - hash_combine(key, params.replicate_degree); - return key; + register_task(REPLICATE_BWD_TASK_ID, "Replicate bwd", bwd, backward_task); } -}; // namespace std + +// Replicate::Replicate(FFModel &model, +// const ParallelTensor _input, +// int _replicate_legion_dim, +// int _replicate_degree, +// char const *name) +// : ParallelOp(model, OP_REPLICATE, name, _input), +// replicate_dim(_replicate_legion_dim), +// replicate_degree(_replicate_degree) { +// int numdim = _input->num_dims; +// ParallelDim dims[MAX_TENSOR_DIM]; +// for (int i = 0; i < numdim; i++) { +// dims[i] = _input->dims[i]; +// } +// dims[replicate_dim].size *= replicate_degree; +// dims[replicate_dim].degree *= replicate_degree; +// ParallelTensorBase::update_parallel_ids(numdim, dims); +// outputs[0] = model.create_parallel_tensor_legion_ordering( +// numdim, dims, DT_FLOAT, this); +// // inputs[0]->print("Replicate::input"); +// // outputs[0]->print("Replicate::output"); +// } + +// Replicate::Replicate(FFModel &model, +// ReplicateParams const ¶ms, +// ParallelTensor const input, +// char const *name) +// : Replicate(model, +// input, +// params.replicate_legion_dim, +// params.replicate_degree, +// name) {} + +// void Replicate::create_input_partition(FFModel &ff) { +// assert(outputs[0]->part != LogicalPartition::NO_PART); +// assert(inputs[0]->part != LogicalPartition::NO_PART); +// // input_lp is an aliased partitioning along the replica dim +// ff.create_aliased_partition(outputs[0]->num_dims, +// outputs[0]->dims, +// replicate_dim, +// outputs[0]->parallel_is, +// inputs[0]->region, +// input_lp); +// // output_grad_lp is a disjoint partition +// ff.create_disjoint_partition(inputs[0]->num_dims, +// inputs[0]->dims, +// inputs[0]->parallel_is, +// outputs[0]->region_grad, +// output_grad_lp); +// } + +// void Replicate::init(FFModel const &ff) { +// // Do nothing +// ArgumentMap argmap; +// Context ctx = ff.config.lg_ctx; +// Runtime *runtime = ff.config.lg_hlr; +// assert(numOutputs == 1); +// assert(numInputs == 1); +// IndexLauncher launcher(REPLICATE_FWD_TASK_ID, +// outputs[0]->parallel_is, +// TaskArgument(NULL, 0), +// argmap, +// Predicate::TRUE_PRED, +// false /*must*/, +// 0 /*mapper_id*/, +// outputs[0]->machine_view.hash()); +// launcher.add_region_requirement(RegionRequirement( +// input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, +// inputs[0]->region)); +// launcher.add_field(0, FID_DATA); +// launcher.add_region_requirement(RegionRequirement(outputs[0]->part, +// 0 /*projection id*/, +// WRITE_ONLY, +// EXCLUSIVE, +// outputs[0]->region)); +// launcher.add_field(1, FID_DATA); +// runtime->execute_index_space(ctx, launcher); +// } + +// void Replicate::forward(FFModel const &ff) { +// ArgumentMap argmap; +// Context ctx = ff.config.lg_ctx; +// Runtime *runtime = ff.config.lg_hlr; +// assert(numOutputs == 1); +// assert(numInputs == 1); +// IndexLauncher launcher(REPLICATE_FWD_TASK_ID, +// outputs[0]->parallel_is, +// TaskArgument(NULL, 0), +// argmap, +// Predicate::TRUE_PRED, +// false /*must*/, +// 0 /*mapper_id*/, +// outputs[0]->machine_view.hash()); +// launcher.add_region_requirement(RegionRequirement( +// input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, +// inputs[0]->region)); +// launcher.add_field(0, FID_DATA); +// launcher.add_region_requirement(RegionRequirement(outputs[0]->part, +// 0 /*projection id*/, +// WRITE_ONLY, +// EXCLUSIVE, +// outputs[0]->region)); +// launcher.add_field(1, FID_DATA); +// runtime->execute_index_space(ctx, launcher); +// } + +// void Replicate::backward(FFModel const &ff) { +// ArgumentMap argmap; +// Context ctx = ff.config.lg_ctx; +// Runtime *runtime = ff.config.lg_hlr; +// assert(numOutputs == 1); +// assert(numInputs == 1); +// IndexLauncher launcher(REPLICATE_BWD_TASK_ID, +// inputs[0]->parallel_is, +// TaskArgument(NULL, 0), +// argmap, +// Predicate::TRUE_PRED, +// false /*must*/, +// 0 /*mapper_id*/, +// inputs[0]->machine_view.hash()); +// launcher.add_region_requirement(RegionRequirement(output_grad_lp, +// 0 /*projection id*/, +// READ_ONLY, +// EXCLUSIVE, +// outputs[0]->region_grad)); +// launcher.add_field(0, FID_DATA); +// launcher.add_region_requirement(RegionRequirement(inputs[0]->part_grad, +// 0 /*projection id*/, +// READ_WRITE, +// EXCLUSIVE, +// inputs[0]->region_grad)); +// launcher.add_field(1, FID_DATA); +// runtime->execute_index_space(ctx, launcher); +// } + +// bool Replicate::measure_operator_cost(Simulator *sim, +// MachineView const &pc, +// CostMetrics &cost_metrics) const { +// cost_metrics = CostMetrics(); +// cost_metrics.forward_time = 0.0f; +// cost_metrics.backward_time = 0.0f; + +// cost_metrics.sync_time = 0; +// cost_metrics.inputs_memory = 0; +// cost_metrics.outputs_memory = 0; +// cost_metrics.weights_memory = 0; +// return true; +// } + +// bool Replicate::get_int_parameter(PMParameter para, int *value) const { +// switch (para) { +// case PM_REPLICATE_DIM: +// *value = replicate_dim; +// return true; +// case PM_REPLICATE_DEGREE: +// *value = replicate_degree; +// return true; +// default: +// return Op::get_int_parameter(para, value); +// } +// } + +// bool Replicate::append_parallel_op_info( +// std::vector ¶llel_ops) const { +// ParallelOpInfo ret; +// ret.op_type = op_type; +// ret.parallel_dim = replicate_dim; +// ret.parallel_degree = replicate_degree; +// parallel_ops.push_back(ret); +// return true; +// } + +// void Replicate::forward_task(Task const *task, +// std::vector const ®ions, +// Context ctx, +// Runtime *runtime) { +// assert(regions.size() == 2); +// assert(task->regions.size() == 2); +// Domain input_domain = runtime->get_index_space_domain( +// ctx, task->regions[0].region.get_index_space()); +// Domain output_domain = runtime->get_index_space_domain( +// ctx, task->regions[1].region.get_index_space()); +// // Currently only support the outter most dimension +// for (int i = 0; i < output_domain.get_dim() - 1; i++) { +// assert(output_domain.lo()[i] == input_domain.lo()[i]); +// assert(output_domain.hi()[i] == input_domain.hi()[i]); +// } +// assert(input_domain.get_volume() == output_domain.get_volume()); +// float const *input_ptr = helperGetTensorPointerRO( +// regions[0], task->regions[0], FID_DATA, ctx, runtime); +// float *output_ptr = helperGetTensorPointerRW( +// regions[1], task->regions[1], FID_DATA, ctx, runtime); + +// forward_kernel(input_ptr, output_ptr, input_domain.get_volume()); +// } + +// void Replicate::backward_task(Task const *task, +// std::vector const ®ions, +// Context ctx, +// Runtime *runtime) { +// assert(regions.size() == 2); +// assert(task->regions.size() == 2); +// Domain output_grad_domain = runtime->get_index_space_domain( +// ctx, task->regions[0].region.get_index_space()); +// Domain input_grad_domain = runtime->get_index_space_domain( +// ctx, task->regions[1].region.get_index_space()); +// // Currently only support the outter most dimension +// for (int i = 0; i < output_grad_domain.get_dim() - 1; i++) { +// assert(output_grad_domain.lo()[i] == input_grad_domain.lo()[i]); +// assert(output_grad_domain.hi()[i] == input_grad_domain.hi()[i]); +// } +// size_t num_elements = input_grad_domain.get_volume(); +// size_t num_replicas = output_grad_domain.get_volume() / num_elements; +// float const *output_grad_ptr = helperGetTensorPointerRO( +// regions[0], task->regions[0], FID_DATA, ctx, runtime); +// float *input_grad_ptr = helperGetTensorPointerRW( +// regions[1], task->regions[1], FID_DATA, ctx, runtime); + +// backward_kernel( +// output_grad_ptr, input_grad_ptr, num_elements, num_replicas); +// } + +// }; // namespace FlexFlow + +// namespace std { +// size_t hash::operator()( +// FlexFlow::ReplicateParams const ¶ms) const { +// size_t key = 0; +// hash_combine(key, params.replicate_legion_dim); +// hash_combine(key, params.replicate_degree); +// return key; +// } +}; // namespace FlexFlow diff --git a/lib/runtime/src/ops/replicate.h b/lib/runtime/src/ops/replicate.h index fd5ffd9ef9..9880f0991b 100644 --- a/lib/runtime/src/ops/replicate.h +++ b/lib/runtime/src/ops/replicate.h @@ -2,8 +2,8 @@ #define _FLEXFLOW_REPLICATE_H #include "op-attrs/ops/replicate.h" -#include "op_task_invocation.h" #include "sim_environment.h" +#include "task_spec/op_task_invocation.h" namespace FlexFlow { @@ -20,7 +20,7 @@ OpTaskInvocation backward(ReplicateAttrs const &); CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, ReplicateAttrs const &attrs, - ParallelTensorShape const &input_shape, + InputParallelTensorDesc const &input, ProfilingSettings const &settings, MachineView const &machine_view); From a2cc5dd8ab6ecf59c876b6a9483c66d58c6fd72a Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Sun, 24 Sep 2023 19:38:48 +0000 Subject: [PATCH 2/9] update the replicate --- lib/runtime/src/ops/replicate.cc | 250 ++----------------------------- 1 file changed, 14 insertions(+), 236 deletions(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 6712ce0d39..c49523d8ec 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -64,7 +64,7 @@ ReplicateParams Replicate::get_params() const { OpTaskInvocation init(ReplicateAttrs const &attrs) { OpTaskBinding binding; - binding.bind(INPUT, input_tensor(0)); + binding.bind(INPUT, input_parallel_tensor_shape(0)); binding.bind(OUTPUT, output_tensor(0)); return {REPLICATE_INIT_TASK_ID, binding}; @@ -75,7 +75,7 @@ OpTaskInvocation forward(ReplicateAttrs const &attrs) { binding.bind_arg(PROFILING, profiling_settings()); - binding.bind(INPUT, input_tensor(0)); + binding.bind(INPUT, input_parallel_tensor_shape(0)); binding.bind(OUTPUT, output_tensor(0)); return {REPLICATE_FWD_TASK_ID, binding}; @@ -136,10 +136,19 @@ CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, // Note(lambda): Does replicate has cost? currently I assume the replicate has // no cost auto env = sim.new_environment(); + SimTaskBinding fwd_binding; + fwd_binding.bind_arg(PROFILING, settings); + fwd_binding.bind(INPUT, input_parallel_tensor_shape(0)); + fwd_binding.bind(OUTPUT, output_tensor(0)); - float forward_time = 0.0; - float backward_time = 0.0; - float sync_time = 0.0; + SimTaskBinding bwd_binding = infer_bwd_binding(fwd_binding); + auto fwd_accessor = env.get_fwd_accessor(TOPK_FWD_TASK_ID, fwd_binding); + auto bwd_accessor = env.get_bwd_accessor(TOPK_BWD_TASK_ID, bwd_binding); + + float forward_time = forward_task_impl(fwd_accessor).value(); + float backward_time = backward_task_impl(bwd_accessor).value(); + + float sync_time = default_estimate_sync_time(env); return make_metrics(forward_time, backward_time, sync_time, env); } @@ -172,235 +181,4 @@ void register_task() { register_task(REPLICATE_BWD_TASK_ID, "Replicate bwd", bwd, backward_task); } -// Replicate::Replicate(FFModel &model, -// const ParallelTensor _input, -// int _replicate_legion_dim, -// int _replicate_degree, -// char const *name) -// : ParallelOp(model, OP_REPLICATE, name, _input), -// replicate_dim(_replicate_legion_dim), -// replicate_degree(_replicate_degree) { -// int numdim = _input->num_dims; -// ParallelDim dims[MAX_TENSOR_DIM]; -// for (int i = 0; i < numdim; i++) { -// dims[i] = _input->dims[i]; -// } -// dims[replicate_dim].size *= replicate_degree; -// dims[replicate_dim].degree *= replicate_degree; -// ParallelTensorBase::update_parallel_ids(numdim, dims); -// outputs[0] = model.create_parallel_tensor_legion_ordering( -// numdim, dims, DT_FLOAT, this); -// // inputs[0]->print("Replicate::input"); -// // outputs[0]->print("Replicate::output"); -// } - -// Replicate::Replicate(FFModel &model, -// ReplicateParams const ¶ms, -// ParallelTensor const input, -// char const *name) -// : Replicate(model, -// input, -// params.replicate_legion_dim, -// params.replicate_degree, -// name) {} - -// void Replicate::create_input_partition(FFModel &ff) { -// assert(outputs[0]->part != LogicalPartition::NO_PART); -// assert(inputs[0]->part != LogicalPartition::NO_PART); -// // input_lp is an aliased partitioning along the replica dim -// ff.create_aliased_partition(outputs[0]->num_dims, -// outputs[0]->dims, -// replicate_dim, -// outputs[0]->parallel_is, -// inputs[0]->region, -// input_lp); -// // output_grad_lp is a disjoint partition -// ff.create_disjoint_partition(inputs[0]->num_dims, -// inputs[0]->dims, -// inputs[0]->parallel_is, -// outputs[0]->region_grad, -// output_grad_lp); -// } - -// void Replicate::init(FFModel const &ff) { -// // Do nothing -// ArgumentMap argmap; -// Context ctx = ff.config.lg_ctx; -// Runtime *runtime = ff.config.lg_hlr; -// assert(numOutputs == 1); -// assert(numInputs == 1); -// IndexLauncher launcher(REPLICATE_FWD_TASK_ID, -// outputs[0]->parallel_is, -// TaskArgument(NULL, 0), -// argmap, -// Predicate::TRUE_PRED, -// false /*must*/, -// 0 /*mapper_id*/, -// outputs[0]->machine_view.hash()); -// launcher.add_region_requirement(RegionRequirement( -// input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, -// inputs[0]->region)); -// launcher.add_field(0, FID_DATA); -// launcher.add_region_requirement(RegionRequirement(outputs[0]->part, -// 0 /*projection id*/, -// WRITE_ONLY, -// EXCLUSIVE, -// outputs[0]->region)); -// launcher.add_field(1, FID_DATA); -// runtime->execute_index_space(ctx, launcher); -// } - -// void Replicate::forward(FFModel const &ff) { -// ArgumentMap argmap; -// Context ctx = ff.config.lg_ctx; -// Runtime *runtime = ff.config.lg_hlr; -// assert(numOutputs == 1); -// assert(numInputs == 1); -// IndexLauncher launcher(REPLICATE_FWD_TASK_ID, -// outputs[0]->parallel_is, -// TaskArgument(NULL, 0), -// argmap, -// Predicate::TRUE_PRED, -// false /*must*/, -// 0 /*mapper_id*/, -// outputs[0]->machine_view.hash()); -// launcher.add_region_requirement(RegionRequirement( -// input_lp, 0 /*projection id*/, READ_ONLY, EXCLUSIVE, -// inputs[0]->region)); -// launcher.add_field(0, FID_DATA); -// launcher.add_region_requirement(RegionRequirement(outputs[0]->part, -// 0 /*projection id*/, -// WRITE_ONLY, -// EXCLUSIVE, -// outputs[0]->region)); -// launcher.add_field(1, FID_DATA); -// runtime->execute_index_space(ctx, launcher); -// } - -// void Replicate::backward(FFModel const &ff) { -// ArgumentMap argmap; -// Context ctx = ff.config.lg_ctx; -// Runtime *runtime = ff.config.lg_hlr; -// assert(numOutputs == 1); -// assert(numInputs == 1); -// IndexLauncher launcher(REPLICATE_BWD_TASK_ID, -// inputs[0]->parallel_is, -// TaskArgument(NULL, 0), -// argmap, -// Predicate::TRUE_PRED, -// false /*must*/, -// 0 /*mapper_id*/, -// inputs[0]->machine_view.hash()); -// launcher.add_region_requirement(RegionRequirement(output_grad_lp, -// 0 /*projection id*/, -// READ_ONLY, -// EXCLUSIVE, -// outputs[0]->region_grad)); -// launcher.add_field(0, FID_DATA); -// launcher.add_region_requirement(RegionRequirement(inputs[0]->part_grad, -// 0 /*projection id*/, -// READ_WRITE, -// EXCLUSIVE, -// inputs[0]->region_grad)); -// launcher.add_field(1, FID_DATA); -// runtime->execute_index_space(ctx, launcher); -// } - -// bool Replicate::measure_operator_cost(Simulator *sim, -// MachineView const &pc, -// CostMetrics &cost_metrics) const { -// cost_metrics = CostMetrics(); -// cost_metrics.forward_time = 0.0f; -// cost_metrics.backward_time = 0.0f; - -// cost_metrics.sync_time = 0; -// cost_metrics.inputs_memory = 0; -// cost_metrics.outputs_memory = 0; -// cost_metrics.weights_memory = 0; -// return true; -// } - -// bool Replicate::get_int_parameter(PMParameter para, int *value) const { -// switch (para) { -// case PM_REPLICATE_DIM: -// *value = replicate_dim; -// return true; -// case PM_REPLICATE_DEGREE: -// *value = replicate_degree; -// return true; -// default: -// return Op::get_int_parameter(para, value); -// } -// } - -// bool Replicate::append_parallel_op_info( -// std::vector ¶llel_ops) const { -// ParallelOpInfo ret; -// ret.op_type = op_type; -// ret.parallel_dim = replicate_dim; -// ret.parallel_degree = replicate_degree; -// parallel_ops.push_back(ret); -// return true; -// } - -// void Replicate::forward_task(Task const *task, -// std::vector const ®ions, -// Context ctx, -// Runtime *runtime) { -// assert(regions.size() == 2); -// assert(task->regions.size() == 2); -// Domain input_domain = runtime->get_index_space_domain( -// ctx, task->regions[0].region.get_index_space()); -// Domain output_domain = runtime->get_index_space_domain( -// ctx, task->regions[1].region.get_index_space()); -// // Currently only support the outter most dimension -// for (int i = 0; i < output_domain.get_dim() - 1; i++) { -// assert(output_domain.lo()[i] == input_domain.lo()[i]); -// assert(output_domain.hi()[i] == input_domain.hi()[i]); -// } -// assert(input_domain.get_volume() == output_domain.get_volume()); -// float const *input_ptr = helperGetTensorPointerRO( -// regions[0], task->regions[0], FID_DATA, ctx, runtime); -// float *output_ptr = helperGetTensorPointerRW( -// regions[1], task->regions[1], FID_DATA, ctx, runtime); - -// forward_kernel(input_ptr, output_ptr, input_domain.get_volume()); -// } - -// void Replicate::backward_task(Task const *task, -// std::vector const ®ions, -// Context ctx, -// Runtime *runtime) { -// assert(regions.size() == 2); -// assert(task->regions.size() == 2); -// Domain output_grad_domain = runtime->get_index_space_domain( -// ctx, task->regions[0].region.get_index_space()); -// Domain input_grad_domain = runtime->get_index_space_domain( -// ctx, task->regions[1].region.get_index_space()); -// // Currently only support the outter most dimension -// for (int i = 0; i < output_grad_domain.get_dim() - 1; i++) { -// assert(output_grad_domain.lo()[i] == input_grad_domain.lo()[i]); -// assert(output_grad_domain.hi()[i] == input_grad_domain.hi()[i]); -// } -// size_t num_elements = input_grad_domain.get_volume(); -// size_t num_replicas = output_grad_domain.get_volume() / num_elements; -// float const *output_grad_ptr = helperGetTensorPointerRO( -// regions[0], task->regions[0], FID_DATA, ctx, runtime); -// float *input_grad_ptr = helperGetTensorPointerRW( -// regions[1], task->regions[1], FID_DATA, ctx, runtime); - -// backward_kernel( -// output_grad_ptr, input_grad_ptr, num_elements, num_replicas); -// } - -// }; // namespace FlexFlow - -// namespace std { -// size_t hash::operator()( -// FlexFlow::ReplicateParams const ¶ms) const { -// size_t key = 0; -// hash_combine(key, params.replicate_legion_dim); -// hash_combine(key, params.replicate_degree); -// return key; -// } }; // namespace FlexFlow From fc35175cafff7a5e24120c818d34efc044cabb84 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Wed, 27 Sep 2023 20:31:23 +0000 Subject: [PATCH 3/9] use exceptions --- lib/runtime/src/ops/replicate.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index c49523d8ec..1a17bd0682 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -15,7 +15,7 @@ #include "parallel_ops/replicate.h" #include "kernels/replicate_kernels.h" -#include "utils/exception.decl.h" +#include "utils/exceptions.h" #include "utils/hash-utils.h" #include From 6cfd43135de966c41c8ccda7481545381d4460f0 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Wed, 27 Sep 2023 21:43:36 +0000 Subject: [PATCH 4/9] remove the init related because the replicate doesn't need a perdervicestate --- lib/runtime/src/ops/replicate.cc | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 1a17bd0682..69113b8a24 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -42,7 +42,7 @@ using Legion::TaskLauncher; using namespace FlexFlow::Kernels::Replicate; -enum Slots { INPUT, OUTPUT, ATTRS, PROFILING }; +enum Slots {INPUT, OUTPUT, PROFILING }; /* Params */ bool operator==(ReplicateParams const &lhs, ReplicateParams const &rhs) { @@ -61,15 +61,6 @@ ReplicateParams Replicate::get_params() const { return params; } -OpTaskInvocation init(ReplicateAttrs const &attrs) { - OpTaskBinding binding; - - binding.bind(INPUT, input_parallel_tensor_shape(0)); - binding.bind(OUTPUT, output_tensor(0)); - - return {REPLICATE_INIT_TASK_ID, binding}; -} - OpTaskInvocation forward(ReplicateAttrs const &attrs) { OpTaskBinding binding; @@ -152,17 +143,6 @@ CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, return make_metrics(forward_time, backward_time, sync_time, env); } -template <> -void register_task() { - OpTaskSignature init(OpTaskType::INIT); - - init.add_input_slot(INPUT); - init.add_output_slot(OUTPUT); - - // TODO: should we implement the init_task? how to do it? - // register_task(REPLICATE_INIT_TASK_ID, "Replicate init", init , init_task); -} - template <> void register_task() { OpTaskSignature fwd(OpTaskType::FWD); From 9b8966d4cfc6038fd9c3b7feea36cf3d068486c5 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Tue, 3 Oct 2023 15:13:35 +0000 Subject: [PATCH 5/9] remove the old code --- lib/runtime/src/ops/replicate.cc | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 69113b8a24..2f02a28fed 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -44,23 +44,6 @@ using namespace FlexFlow::Kernels::Replicate; enum Slots {INPUT, OUTPUT, PROFILING }; -/* Params */ -bool operator==(ReplicateParams const &lhs, ReplicateParams const &rhs) { - return lhs.replicate_legion_dim == rhs.replicate_legion_dim && - lhs.replicate_degree == rhs.replicate_degree; -} - -bool ReplicateParams::is_valid(ParallelTensorShape const &input) const { - return input.is_valid(); -} - -ReplicateParams Replicate::get_params() const { - ReplicateParams params; - params.replicate_legion_dim = this->replicate_dim; - params.replicate_degree = this->replicate_degree; - return params; -} - OpTaskInvocation forward(ReplicateAttrs const &attrs) { OpTaskBinding binding; From 63aa45bd1b8d795373f8387daa6b7affa3cdddd4 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Tue, 3 Oct 2023 15:16:54 +0000 Subject: [PATCH 6/9] remove the comment --- lib/runtime/src/ops/replicate.h | 36 --------------------------------- 1 file changed, 36 deletions(-) diff --git a/lib/runtime/src/ops/replicate.h b/lib/runtime/src/ops/replicate.h index 9880f0991b..da2b71f098 100644 --- a/lib/runtime/src/ops/replicate.h +++ b/lib/runtime/src/ops/replicate.h @@ -23,42 +23,6 @@ CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, InputParallelTensorDesc const &input, ProfilingSettings const &settings, MachineView const &machine_view); - -/* class Replicate : public ParallelOp { */ -/* public: */ -/* Replicate(FFModel &model, */ -/* ParallelTensor const &input, */ -/* int replicate_legion_dim, */ -/* int replicate_degree, */ -/* char const *name = NULL); */ -/* Replicate(FFModel &model, */ -/* ReplicateAttrs const &attrs, */ -/* std::vector const &inputs, */ -/* char const *name = nullptr); */ -/* void create_input_partition(FFModel &model) override; */ -/* void init(FFModel const &) override; */ -/* void forward(FFModel const &) override; */ -/* void backward(FFModel const &) override; */ -/* bool append_parallel_op_info( */ -/* std::vector ¶llel_ops) const override; */ -/* static void forward_task(Legion::Task const *task, */ -/* std::vector const - * ®ions, */ -/* Legion::Context ctx, */ -/* Legion::Runtime *runtime); */ -/* static void backward_task(Legion::Task const *task, */ -/* std::vector const - * ®ions, */ -/* Legion::Context ctx, */ -/* Legion::Runtime *runtime); */ -/* bool measure_operator_cost(Simulator *sim, */ -/* MachineView const &pc, */ -/* CostMetrics &cost_metrics) const override; */ - -/* public: */ -/* int replicate_dim, replicate_degree; */ -/* }; */ - } // namespace FlexFlow #endif From 9a07d9ffde4a22d61c9f616bf81584cf6fdaf0e2 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Tue, 10 Oct 2023 20:11:00 +0000 Subject: [PATCH 7/9] fix the replicate.cc --- lib/runtime/src/ops/replicate.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 2f02a28fed..3e7c099bb9 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -15,7 +15,10 @@ #include "parallel_ops/replicate.h" #include "kernels/replicate_kernels.h" +#include "op-attrs/get_output_shapes.h" +#include "op-attrs/parallel_tensor_shape.h" #include "utils/exceptions.h" +#include "utils/graph/serialparallel.h" #include "utils/hash-utils.h" #include @@ -49,7 +52,7 @@ OpTaskInvocation forward(ReplicateAttrs const &attrs) { binding.bind_arg(PROFILING, profiling_settings()); - binding.bind(INPUT, input_parallel_tensor_shape(0)); + binding.bind(INPUT, input_tensor(0)); binding.bind(OUTPUT, output_tensor(0)); return {REPLICATE_FWD_TASK_ID, binding}; @@ -112,8 +115,9 @@ CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, auto env = sim.new_environment(); SimTaskBinding fwd_binding; fwd_binding.bind_arg(PROFILING, settings); - fwd_binding.bind(INPUT, input_parallel_tensor_shape(0)); - fwd_binding.bind(OUTPUT, output_tensor(0)); + ParallelTensorShape output = get_output_shape(input, attrs); + fwd_binding.bind(INPUT, input.shape); + fwd_binding.bind(OUTPUT, output); SimTaskBinding bwd_binding = infer_bwd_binding(fwd_binding); auto fwd_accessor = env.get_fwd_accessor(TOPK_FWD_TASK_ID, fwd_binding); From c429ebf9fad073f3ce391ca56ccd29415316e4a5 Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Tue, 10 Oct 2023 20:11:36 +0000 Subject: [PATCH 8/9] refine the code --- lib/runtime/src/ops/replicate.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 3e7c099bb9..4b30d5b423 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -45,7 +45,7 @@ using Legion::TaskLauncher; using namespace FlexFlow::Kernels::Replicate; -enum Slots {INPUT, OUTPUT, PROFILING }; +enum Slots { INPUT, OUTPUT, PROFILING }; OpTaskInvocation forward(ReplicateAttrs const &attrs) { OpTaskBinding binding; From 7b3e3ec4516c183bb16aea06fbf11f8fb762cdda Mon Sep 17 00:00:00 2001 From: lambda7xx Date: Tue, 17 Oct 2023 21:10:31 +0000 Subject: [PATCH 9/9] fix the typo --- lib/runtime/src/ops/replicate.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/runtime/src/ops/replicate.cc b/lib/runtime/src/ops/replicate.cc index 4b30d5b423..1675a62c5f 100644 --- a/lib/runtime/src/ops/replicate.cc +++ b/lib/runtime/src/ops/replicate.cc @@ -110,18 +110,16 @@ CostMetrics measure_operator_cost(SimEnvFactory const &sim_factory, InputParallelTensorDesc const &input, ProfilingSettings const &settings, MachineView const &machine_view) { - // Note(lambda): Does replicate has cost? currently I assume the replicate has - // no cost auto env = sim.new_environment(); SimTaskBinding fwd_binding; fwd_binding.bind_arg(PROFILING, settings); - ParallelTensorShape output = get_output_shape(input, attrs); + ParallelTensorShape output = get_output_shape(attrs, input.shape); fwd_binding.bind(INPUT, input.shape); fwd_binding.bind(OUTPUT, output); SimTaskBinding bwd_binding = infer_bwd_binding(fwd_binding); - auto fwd_accessor = env.get_fwd_accessor(TOPK_FWD_TASK_ID, fwd_binding); - auto bwd_accessor = env.get_bwd_accessor(TOPK_BWD_TASK_ID, bwd_binding); + auto fwd_accessor = env.get_fwd_accessor(REPLICATE_FWD_TASK_ID, fwd_binding); + auto bwd_accessor = env.get_bwd_accessor(REPLICATE_BWD_TASK_ID, bwd_binding); float forward_time = forward_task_impl(fwd_accessor).value(); float backward_time = backward_task_impl(bwd_accessor).value();