diff --git a/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h b/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h index ebfdefa478..b15b91e0e3 100644 --- a/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h +++ b/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h @@ -2,6 +2,7 @@ #define _FLEXFLOW_LIB_PCG_INCLUDE_PCG_MAPPED_PARALLEL_COMPUTATION_GRAPH_MAPPED_OPERATOR_TASK_GROUP_H #include "op-attrs/computation_graph_op_attrs.dtg.h" +#include "op-attrs/tensor_slot_name.dtg.h" #include "pcg/machine_space_coordinate.dtg.h" #include "pcg/mapped_parallel_computation_graph/operator_atomic_task_shard_binding.dtg.h" #include "utils/bidict/bidict.h" @@ -32,6 +33,10 @@ struct MappedOperatorTaskGroup { friend struct ::std::hash; }; +bidict + get_tensor_bindings_for_slot_name(MappedOperatorTaskGroup const &, + TensorSlotName const &); + std::string format_as(::FlexFlow::MappedOperatorTaskGroup const &); std::ostream &operator<<(std::ostream &, ::FlexFlow::MappedOperatorTaskGroup const &); diff --git a/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc b/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc index 4436efd727..8f9db7eac7 100644 --- a/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc +++ b/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc @@ -3,6 +3,7 @@ #include "op-attrs/operator_task_space.h" #include "op-attrs/parallel_tensor_space_coordinate.h" #include "pcg/mapped_parallel_computation_graph/operator_atomic_task_shard_binding.h" +#include "utils/bidict/algorithms/transform_values.h" #include "utils/bidict/generate_bidict.h" #include "utils/containers/are_all_distinct.h" #include "utils/containers/require_all_same.h" @@ -70,6 +71,17 @@ bidict const & return this->shard_bindings; } +bidict + 
get_tensor_bindings_for_slot_name(MappedOperatorTaskGroup const &task_group, + TensorSlotName const &slot_name) { + return transform_values(task_group.get_shard_bindings(), + [&](OperatorAtomicTaskShardBinding const &b) { + return ptensor_space_coord_for_slot_name(b, + slot_name); + }) + .reversed(); +} + std::string format_as(::FlexFlow::MappedOperatorTaskGroup const &m) { return fmt::format("", m.get_shard_bindings()); diff --git a/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml b/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml index 89feb11905..92e9de8145 100644 --- a/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml +++ b/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml @@ -5,7 +5,7 @@ features = [] docstring = ''' \brief Maps each shard-expanded DynamicNodeInvocation to its corresponding PerDeviceOpState. -PerDeviceOpStateBacking is to PerDeviceOpState as DistributedDeviceHandle is to \ref device_handle_t (i.e., FFHandle). +\ref PerDeviceOpStateBacking is to \ref PerDeviceOpState as \ref DistributedFfHandle is to \ref device_handle_t (i.e., FFHandle). 
''' diff --git a/lib/realm-execution/include/realm-execution/realm_context.h b/lib/realm-execution/include/realm-execution/realm_context.h index e1180147fd..ab89e916c0 100644 --- a/lib/realm-execution/include/realm-execution/realm_context.h +++ b/lib/realm-execution/include/realm-execution/realm_context.h @@ -4,6 +4,7 @@ #include "kernels/allocation.h" #include "kernels/device_handle_t.dtg.h" #include "kernels/managed_per_device_ff_handle.h" +#include "op-attrs/parallel_tensor_shape.dtg.h" #include "op-attrs/tensor_shape.dtg.h" #include "pcg/device_id_t.dtg.h" #include "pcg/machine_space_coordinate.dtg.h" @@ -62,6 +63,17 @@ struct RealmContext { int priority = 0); ///\} + /** \name Data movement */ + ///\{ + Realm::Event issue_copy(ParallelTensorShape const &src_shape, + Realm::RegionInstance src_inst, + ParallelTensorShape const &dst_shape, + Realm::RegionInstance dst_inst, + Realm::ProfilingRequestSet const &requests, + Realm::Event wait_on = Realm::Event::NO_EVENT, + int priority = 0); + ///\} + /** \name Instance management */ ///\{ std::pair diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index 8263a49b0a..0ecd02143e 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -7,6 +7,7 @@ #include "realm-execution/realm_context.h" #include "realm-execution/tasks/impl/op_task.h" #include "realm-execution/tensor_instance_backing.h" +#include "task-spec/dynamic_graph/copy_insertion.h" #include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" #include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" @@ -18,6 +19,7 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" #include "task-spec/dynamic_graph/update_insertion.h" +#include "utils/containers/get_only.h" #include 
"utils/containers/map_values.h" #include "utils/containers/transform.h" #include "utils/containers/try_at.h" @@ -104,6 +106,7 @@ PCGInstance create_pcg_instance( } dg = perform_update_insertion(dg, optimizer_attrs); + dg = perform_copy_insertion(dg); dg = perform_shard_expansion(dg); TensorInstanceBacking tensor_instance_backing = perform_instance_allocation(dg, inputs, ctx); @@ -157,6 +160,76 @@ PCGInstance create_pcg_instance( /*logit_grad_tensor=*/logit_grad_tensor}; } +/** + * \brief Spawn the Realm operations (tasks, copies, etc.) for a given \ref + * DynamicNodeInvocation, given the specified dependencies, instances, etc. Note + * that one \ref DynamicNodeInvocation may become multiple Realm operations + * (e.g., a parallel operator may turn into multiple copies). + */ +static Realm::Event spawn_dynamic_node_invocation( + RealmContext &ctx, + DynamicNodeInvocation const &invocation, + std::vector const &input_dependencies, + std::vector const &output_dependencies, + TensorInstanceBacking const &tensor_instance_backing, + PerDeviceOpStateBacking const &device_state_backing, + OptimizerAttrs const &optimizer_attrs, + ProfilingSettings const &profiling_settings, + DistributedFfHandle const &device_handle, + FFIterationConfig iteration_config) { + Realm::Event precondition = Realm::Event::merge_events( + Realm::Event::merge_events(input_dependencies), + Realm::Event::merge_events(output_dependencies)); + + TensorInstanceBacking tensor_backing = + subset_tensor_instance_backing_for_invocation(tensor_instance_backing, + invocation); + + auto spawn_task = [&]() { + Realm::Processor target_proc = ctx.map_device_coord_to_processor( + assert_unwrap(invocation.node_attrs.device_coord)); + return spawn_op_task(ctx, + target_proc, + invocation, + tensor_backing, + try_at(device_state_backing.backing, invocation), + profiling_settings, + device_handle.at(target_proc), + iteration_config, + optimizer_attrs, + precondition); + }; + + auto issue_copy = [&]() { + 
DynamicValueAttrs const &input = get_only(invocation.inputs).second; + DynamicValueAttrs const &output = get_only(invocation.outputs).second; + Realm::RegionInstance src_inst = + tensor_instance_backing.backing.at(input).first; + Realm::RegionInstance dst_inst = + tensor_instance_backing.backing.at(output).first; + return ctx.issue_copy(assert_unwrap(input.parallel_tensor_shape), + src_inst, + assert_unwrap(output.parallel_tensor_shape), + dst_inst, + Realm::ProfilingRequestSet{}, + precondition); + }; + + TrainingOperationAttrs op_attrs = + assert_unwrap(invocation.node_attrs.op_attrs); + return op_attrs.visit(overload{ + [&](PCGOperatorAttrs const &pcg_op_attrs) { + return pcg_op_attrs.visit(overload{ + [&](InputAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](WeightAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](auto const &) { return spawn_task(); }, + }); + }, + [&](LossAttrs const &) { return spawn_task(); }, + [&](CopyAttrs const &) { return issue_copy(); }, + }); +} + static std::unordered_map execute_distributed_dynamic_node_invocation_set( RealmContext &ctx, @@ -172,14 +245,6 @@ static std::unordered_map DependencySet dependency_set{ctx.get_outstanding_events()}; return unordered_map_from_pairs( transform(invocations, [&](DynamicNodeInvocation const &invocation) { - TrainingOperationAttrs op_attrs = - assert_unwrap(invocation.node_attrs.op_attrs); - if (op_attrs.is_pcg_op() && (op_attrs.require_pcg_op().is_input() || - op_attrs.require_pcg_op().is_weight())) { - return std::pair{invocation.node_attrs.layer_guid, - Realm::Event::NO_EVENT}; - } - std::vector input_dependencies = transform(vector_of(values(invocation.inputs)), [&](DynamicValueAttrs const &value) { @@ -190,27 +255,19 @@ static std::unordered_map [&](DynamicValueAttrs const &value) { return dependency_set.get_dependency_for_writer(value); }); - Realm::Event dependencies = Realm::Event::merge_events( - Realm::Event::merge_events(input_dependencies), - 
Realm::Event::merge_events(output_dependencies)); - Realm::Processor target_proc = ctx.map_device_coord_to_processor( - assert_unwrap(invocation.node_attrs.device_coord)); - - TensorInstanceBacking tensor_backing = - subset_tensor_instance_backing_for_invocation( - tensor_instance_backing, invocation); Realm::Event result = - spawn_op_task(ctx, - target_proc, - invocation, - tensor_backing, - try_at(device_state_backing.backing, invocation), - profiling_settings, - device_handle.at(target_proc), - iteration_config, - optimizer_attrs, - dependencies); + spawn_dynamic_node_invocation(ctx, + invocation, + input_dependencies, + output_dependencies, + tensor_instance_backing, + device_state_backing, + optimizer_attrs, + profiling_settings, + device_handle, + iteration_config); + for (DynamicValueAttrs const &value : values(invocation.inputs)) { dependency_set.add_reader(value, result); } diff --git a/lib/realm-execution/src/realm-execution/realm_context.cc b/lib/realm-execution/src/realm-execution/realm_context.cc index 96beb63953..790c1bd613 100644 --- a/lib/realm-execution/src/realm-execution/realm_context.cc +++ b/lib/realm-execution/src/realm-execution/realm_context.cc @@ -2,6 +2,7 @@ #include "kernels/device_handle_t.dtg.h" #include "kernels/device_handle_t.h" #include "op-attrs/datatype.h" +#include "op-attrs/parallel_tensor_shape.h" #include "op-attrs/tensor_dims.dtg.h" #include "pcg/device_id_t.h" #include "pcg/device_type.dtg.h" @@ -10,6 +11,7 @@ #include "realm-execution/tasks/task_id_t.h" #include "utils/containers/contains_key.h" #include "utils/containers/transform.h" +#include "utils/exception.h" #include "utils/nonnegative_int/nonnegative_int.h" #include "utils/one_to_many/one_to_many.h" #include "utils/positive_int/positive_int.h" @@ -146,13 +148,89 @@ static Realm::Rect rect_from_dims(TensorDims const &dims) { Realm::Point::ONES()}; } +template +static Realm::IndexSpace ispace_from_dims(TensorDims const &dims) { + Realm::Rect rect = 
rect_from_dims(dims); + return Realm::IndexSpace{rect}; +} + +Realm::Event + RealmContext::issue_copy(ParallelTensorShape const &src_shape, + Realm::RegionInstance src_inst, + ParallelTensorShape const &dst_shape, + Realm::RegionInstance dst_inst, + Realm::ProfilingRequestSet const &requests, + Realm::Event wait_on, + int priority) { + TensorShape src_piece_shape = get_piece_shape(src_shape); + TensorShape dst_piece_shape = get_piece_shape(dst_shape); + ASSERT(src_piece_shape == dst_piece_shape); // For now, assume they match + + Realm::CopySrcDstField src_field; + src_field.set_field( + /*inst=*/src_inst, + /*field_id=*/0, + /*size=*/ + static_cast( + size_of_datatype(src_piece_shape.data_type).int_from_positive_int()), + /*subfield_offset=*/0); + Realm::CopySrcDstField dst_field; + dst_field.set_field( + /*inst=*/dst_inst, + /*field_id=*/0, + /*size=*/ + static_cast( + size_of_datatype(src_piece_shape.data_type).int_from_positive_int()), + /*subfield_offset=*/0); + + Realm::Event result; + switch (src_piece_shape.dims.ff_ordered.num_dims()) { +#if REALM_MAX_DIM >= 1 + case 1: + result = ispace_from_dims<1>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 2 + case 2: + result = ispace_from_dims<2>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 3 + case 3: + result = ispace_from_dims<3>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 4 + case 4: + result = ispace_from_dims<4>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 5 + case 5: + result = ispace_from_dims<5>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif + default: + PANIC("TensorShape dims greater than REALM_MAX_DIM: {}", + 
src_piece_shape.dims.ff_ordered.num_dims()); + break; + } + this->outstanding_events.push_back(result); + return result; +} + std::pair RealmContext::create_instance(Realm::Memory memory, TensorShape const &shape, Realm::ProfilingRequestSet const &prs, Realm::Event wait_on) { - std::vector field_sizes{ - static_cast(int{size_of_datatype(shape.data_type)})}; + std::vector field_sizes{static_cast( + size_of_datatype(shape.data_type).int_from_positive_int())}; Realm::RegionInstance inst; Realm::Event ready; switch (shape.dims.ff_ordered.num_dims()) { diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml new file mode 100644 index 0000000000..cef44f5f08 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml @@ -0,0 +1,13 @@ +namespace = "FlexFlow" +name = "CopyAttrs" +type = "struct" +features = [ + "eq", + "ord", + "hash", + "json", + "fmt", + "rapidcheck", +] + +fields = [] diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h new file mode 100644 index 0000000000..a1726c2ae1 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -0,0 +1,26 @@ +#ifndef _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_COPY_INSERTION_H +#define _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_COPY_INSERTION_H + +#include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" +#include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.dtg.h" + +namespace FlexFlow { + +bool node_is_copy(DynamicNodeAttrs const &n); +bool value_is_mapped(DynamicValueAttrs const &); + +bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &); +bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &); + +std::unordered_set perform_copy_insertion_for_invocation( + 
DynamicNodeInvocation const &i, + std::unordered_map const + &unmapped_value_to_mapped_source_value); + +DynamicOpenDataflowGraph + perform_copy_insertion(DynamicOpenDataflowGraph const &); + +} // namespace FlexFlow + +#endif diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml new file mode 100644 index 0000000000..de13cdd9a8 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml @@ -0,0 +1,13 @@ +namespace = "FlexFlow" +name = "dynamic_copy_layer_guid_t" +type = "struct" +features = [ + "eq", + "ord", + "hash", + "json", + "fmt", + "rapidcheck", +] + +fields = [] diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml index bd64f52567..8def0ec5fb 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml @@ -12,6 +12,7 @@ includes = [ "pcg/layer_guid_t.dtg.h", "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h", "task-spec/dynamic_graph/dynamic_loss_layer_guid_t.dtg.h", + "task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.h", ] [[values]] @@ -25,3 +26,7 @@ key = "pcg_layer_guid" [[values]] type = "::FlexFlow::dynamic_loss_layer_guid_t" key = "loss_layer_guid" + +[[values]] +type = "::FlexFlow::dynamic_copy_layer_guid_t" +key = "copy_layer_guid" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml index 128e305dc6..73c023fd40 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml @@ -28,6 +28,16 @@ type = 
"std::optional<::FlexFlow::DynamicTaskType>" [[fields]] name = "device_coord" type = "std::optional<::FlexFlow::MachineSpaceCoordinate>" +docstring = ''' +\note Right now the \c device_coord for a copy node is sort of meaningless +because we have one controller issuing all copies for the entire graph, no +matter where they are. However the intention is this to be the "owner" or +"issuer" of the copy, which matters a lot more down the road once we write the +control replicated version of the Realm backend. At that point, you will have +to pick one specific node to issue the copy, and the program should be correct +no matter what you pick, but there may be performance implications to those +choices. +''' [[fields]] name = "mapping" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h new file mode 100644 index 0000000000..7b13b480c8 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h @@ -0,0 +1,13 @@ +#ifndef _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_DYNAMIC_TASK_TYPE_H +#define _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_DYNAMIC_TASK_TYPE_H + +#include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" +#include "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h" + +namespace FlexFlow { + +DynamicTaskType dynamic_task_type_from_tensor_role_for_copy(DynamicTensorRole); + +} // namespace FlexFlow + +#endif diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml index 89b94b1017..490a51f88d 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml @@ -12,6 +12,8 @@ includes = [ "task-spec/dynamic_graph/dynamic_tensor_guid_t.dtg.h", "op-attrs/parallel_tensor_shape.dtg.h", 
"op-attrs/parallel_tensor_space_coordinate.dtg.h", + "pcg/machine_space_coordinate.dtg.h", + "utils/bidict/bidict.h", "task-spec/dynamic_graph/dynamic_tensor_accessor.dtg.h", "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h", ] @@ -32,6 +34,10 @@ type = "std::optional<::FlexFlow::ParallelTensorShape>" name = "shard_coord" type = "std::optional<::FlexFlow::ParallelTensorSpaceCoordinate>" +[[fields]] +name = "mapping" +type = "std::optional<::FlexFlow::bidict<::FlexFlow::ParallelTensorSpaceCoordinate, ::FlexFlow::MachineSpaceCoordinate>>" + [[fields]] name = "accessor" type = "std::optional<::FlexFlow::DynamicTensorAccessor>" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/index.dox b/lib/task-spec/include/task-spec/dynamic_graph/index.dox index 04ceaf4935..c48e67f4b3 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/index.dox +++ b/lib/task-spec/include/task-spec/dynamic_graph/index.dox @@ -11,6 +11,7 @@ namespace FlexFlow { - \ref shard_expansion.h - \ref update_insertion.h - \ref loss_insertion.h +- \ref copy_insertion.h Inserts copies into the dynamic graph by inspecting the mappings of operators and inferring where tensors are used on devices other than where they were created. 
- \ref machine_slicing.h */ diff --git a/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml index 6209bfa247..454f1b7e8c 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml @@ -13,6 +13,8 @@ includes = [ "task-spec/dynamic_graph/dynamic_tensor_guid_t.dtg.h", "op-attrs/parallel_tensor_shape.dtg.h", "op-attrs/parallel_tensor_space_coordinate.dtg.h", + "pcg/machine_space_coordinate.dtg.h", + "utils/bidict/bidict.h", "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h", ] @@ -33,6 +35,10 @@ type = "std::optional<::FlexFlow::ParallelTensorShape>" name = "shard_coord" type = "std::optional<::FlexFlow::ParallelTensorSpaceCoordinate>" +[[fields]] +name = "mapping" +type = "std::optional<::FlexFlow::bidict<::FlexFlow::ParallelTensorSpaceCoordinate, ::FlexFlow::MachineSpaceCoordinate>>" + [[fields]] name = "role" type = "std::optional<::FlexFlow::DynamicTensorRole>" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml index 1051d8ac13..8f8f6467c8 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml @@ -11,6 +11,7 @@ features = [ includes = [ "op-attrs/ops/loss_functions/loss_attrs.dtg.h", "op-attrs/pcg_operator_attrs.dtg.h", + "task-spec/dynamic_graph/copy_attrs.dtg.h", ] [[values]] @@ -20,3 +21,7 @@ key = "pcg_op" [[values]] type = "::FlexFlow::LossAttrs" key = "loss" + +[[values]] +type = "::FlexFlow::CopyAttrs" +key = "copy" diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc new 
file mode 100644 index 0000000000..4c1b9d4609 --- /dev/null +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -0,0 +1,182 @@ +#include "task-spec/dynamic_graph/copy_insertion.h" +#include "op-attrs/parallel_tensor_space_coordinate.dtg.h" +#include "op-attrs/tensor_slot_name.dtg.h" +#include "pcg/machine_space_coordinate.dtg.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" +#include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "task-spec/dynamic_graph/dynamic_task_type.h" +#include "task-spec/dynamic_graph/dynamic_tensor_slot.dtg.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" +#include "utils/bidict/algorithms/bidict_from_pairs.h" +#include "utils/bidict/algorithms/unordered_set_of.h" +#include "utils/containers/contains_key.h" +#include "utils/containers/flatmap.h" +#include "utils/containers/intersection.h" +#include "utils/containers/map_values2.h" +#include "utils/containers/set_difference.h" +#include "utils/containers/transform.h" +#include "utils/optional.h" + +namespace FlexFlow { + +bool node_is_copy(DynamicNodeAttrs const &n) { + return n.op_attrs.has_value() && n.op_attrs.value().is_copy(); +} + +bool value_is_mapped(DynamicValueAttrs const &n) { + return n.mapping.has_value(); +} + +bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &g) { + auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return false; }; + + return no_part_of_dynamic_graph_satisfies( + g, node_is_copy, value_is_mapped, slot_is_mapped); +} + +bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &g) { + auto node_is_any = [](DynamicNodeAttrs const &) -> bool { return true; }; + auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return true; }; + + return full_dynamic_graph_satisfies( + g, node_is_any, value_is_mapped, 
slot_is_mapped); +} + +static DynamicValueAttrs map_dynamic_value_attrs_for_task_group( + DynamicTensorSlot const &slot, + DynamicValueAttrs const &value, + MappedOperatorTaskGroup const &mapping) { + DynamicValueAttrs result = value; + result.mapping = get_tensor_bindings_for_slot_name(mapping, slot.slot_name); + return result; +} + +static std::pair + filter_mapping_to_avoid_degenerate_copies(DynamicValueAttrs const &input, + DynamicValueAttrs const &output) { + std::unordered_set< + std::pair> + input_mapping = unordered_set_of(assert_unwrap(input.mapping)); + std::unordered_set< + std::pair> + output_mapping = unordered_set_of(assert_unwrap(output.mapping)); + + // Exclude the point shared between the input and output mappings, because + // those will not result in actual copies once shard expansion is performed + std::unordered_set< + std::pair> + remove = intersection(input_mapping, output_mapping); + + DynamicValueAttrs filtered_input = input; + filtered_input.mapping = + bidict_from_pairs(set_difference(input_mapping, remove)); + + DynamicValueAttrs filtered_output = output; + filtered_output.mapping = + bidict_from_pairs(set_difference(output_mapping, remove)); + + return std::pair{filtered_input, filtered_output}; +} + +std::unordered_set perform_copy_insertion_for_invocation( + DynamicNodeInvocation const &i, + std::unordered_map const + &unmapped_value_to_mapped_source_value) { + + MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); + + auto map_tensor = [&](DynamicTensorSlot const &slot, + DynamicValueAttrs const &value) { + return map_dynamic_value_attrs_for_task_group(slot, value, mapping); + }; + + std::unordered_map mapped_inputs = + map_values2(i.inputs, map_tensor); + std::unordered_map mapped_outputs = + map_values2(i.outputs, map_tensor); + + std::unordered_set result{DynamicNodeInvocation{ + /*inputs=*/mapped_inputs, + /*node_attrs=*/i.node_attrs, + /*outputs=*/mapped_outputs, + }}; + + for (auto const &[slot, input] : 
i.inputs) { + if (!contains_key(unmapped_value_to_mapped_source_value, input)) { + continue; + } + + DynamicValueAttrs source_value = + unmapped_value_to_mapped_source_value.at(input); + DynamicValueAttrs use_value = mapped_inputs.at(slot); + if (source_value != use_value) { + auto const &[filtered_source, filtered_use] = + filter_mapping_to_avoid_degenerate_copies(source_value, use_value); + DynamicNodeInvocation copy{ + /*inputs=*/{ + { + DynamicTensorSlot{TensorSlotName::INPUT, + slot.slot_tensor_role}, + filtered_source, + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/transform( + slot.slot_tensor_role, + dynamic_task_type_from_tensor_role_for_copy), + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs=*/ TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + DynamicTensorSlot{TensorSlotName::OUTPUT, + slot.slot_tensor_role}, + filtered_use, + }, + }, + }; + result.insert(copy); + } + } + + return result; +} + +DynamicOpenDataflowGraph + perform_copy_insertion(DynamicOpenDataflowGraph const &g) { + + ASSERT(no_part_of_graph_is_copy_inserted(g)); + + std::unordered_map + unmapped_value_to_mapped_source_value; + for (DynamicNodeInvocation const &i : g.invocations) { + for (auto const &[slot, value] : i.outputs) { + unmapped_value_to_mapped_source_value.insert( + std::pair{value, + map_dynamic_value_attrs_for_task_group( + slot, value, assert_unwrap(i.node_attrs.mapping))}); + } + } + + // Use regular flatmap here to remove duplicates (we don't want to copy the + // same tensor to the same place multiple times) + DynamicOpenDataflowGraph result = + dynamic_open_dataflow_graph_from_invocation_set( + flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { + return perform_copy_insertion_for_invocation( + i, unmapped_value_to_mapped_source_value); + })); + + ASSERT(graph_is_fully_copy_inserted(result)); + + 
return result; +} + +} // namespace FlexFlow diff --git a/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc new file mode 100644 index 0000000000..216fa1b0ff --- /dev/null +++ b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc @@ -0,0 +1,25 @@ +#include "task-spec/dynamic_graph/dynamic_task_type.h" +#include "utils/overload.h" + +namespace FlexFlow { + +DynamicTaskType + dynamic_task_type_from_tensor_role_for_copy(DynamicTensorRole role) { + return role.visit(overload{ + [](FwbTensorType const &fwb_tensor) { + switch (fwb_tensor) { + case FwbTensorType::FORWARD: + return DynamicTaskType::FWD; + case FwbTensorType::GRADIENT: + return DynamicTaskType::BWD; + default: + PANIC("Unexpected FwbTensorType", fwb_tensor); + break; + } + }, + [](DynamicOptimizerTensorRole const &) { return DynamicTaskType::UPD; }, + [](DynamicLossTensorRole const &) { return DynamicTaskType::LOSS; }, + }); +} + +} // namespace FlexFlow diff --git a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc index 857fed1a84..8066926262 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc @@ -7,6 +7,7 @@ #include "task-spec/dynamic_graph/dynamic_tensor_role.h" #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" +#include "utils/containers/transform.h" #include "utils/optional.h" #include @@ -24,6 +25,7 @@ LossInsertionResult perform_loss_insertion( /*tensor_guid=*/mk_dynamic_tensor_guid_for_loss(), /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_loss(), }; @@ -31,6 +33,7 @@ LossInsertionResult perform_loss_insertion( 
/*tensor_guid=*/logit_value.tensor_guid, /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_bwd(), }; diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc index 204597386e..6bfc477e3a 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc @@ -44,6 +44,7 @@ DynamicOpenDataflowGraph /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/lift_to_parallel(attrs.shape), /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, @@ -63,6 +64,7 @@ DynamicOpenDataflowGraph /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/lift_to_parallel(attrs.shape), /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc index e06e7d5a32..246f9a3242 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc @@ -43,6 +43,7 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, @@ -63,6 +64,7 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( 
/*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc b/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc index 2dc0b509ab..b4d398c3f0 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc @@ -9,6 +9,7 @@ SerializableDynamicValueAttrs /*tensor_guid=*/attrs.tensor_guid, /*parallel_tensor_shape=*/attrs.parallel_tensor_shape, /*shard_coord=*/attrs.shard_coord, + /*mapping=*/attrs.mapping, /*role=*/attrs.role, }; } @@ -19,6 +20,7 @@ DynamicValueAttrs dynamic_value_attrs_from_serializable( /*tensor_guid=*/attrs.tensor_guid, /*parallel_tensor_shape=*/attrs.parallel_tensor_shape, /*shard_coord=*/attrs.shard_coord, + /*mapping=*/attrs.mapping, /*accessor=*/std::nullopt, /*role=*/attrs.role, }; diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc index 402e0ef055..fb6efb96d0 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,6 +1,11 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" +#include "utils/bidict/algorithms/filter_keys.h" +#include "utils/containers/get_only.h" #include "utils/containers/map_values2.h" +#include "utils/containers/require_same.h" +#include "utils/containers/transform.h" #include "utils/optional.h" namespace FlexFlow { @@ -35,6 +40,16 @@ bool graph_is_fully_shard_expanded(DynamicOpenDataflowGraph const &g) { slot_is_shard_expanded); } +static bidict + 
restrict_tensor_mapping_keys_to_coord( + bidict const + &mapping, + ParallelTensorSpaceCoordinate const ¶llel_tensor_coord) { + return filter_keys(mapping, [&](ParallelTensorSpaceCoordinate const &p) { + return p == parallel_tensor_coord; + }); +} + static DynamicNodeInvocation shard_invocation_for_binding( DynamicNodeInvocation const &i, MachineSpaceCoordinate const &machine_coord, @@ -47,6 +62,13 @@ static DynamicNodeInvocation shard_invocation_for_binding( DynamicValueAttrs result = v; result.shard_coord = parallel_tensor_coord; + result.mapping = transform( + v.mapping, + [&](bidict const + &mapping) { + return restrict_tensor_mapping_keys_to_coord(mapping, + parallel_tensor_coord); + }); return result; }; @@ -63,8 +85,41 @@ static DynamicNodeInvocation shard_invocation_for_binding( }; } +static std::unordered_set + perform_shard_expansion_for_copy(DynamicNodeInvocation const &i) { + auto [input_slot, input] = get_only(i.inputs); + auto [output_slot, output] = get_only(i.outputs); + bidict input_mapping = + assert_unwrap(input.mapping); + require_same(input_mapping.left_values(), + assert_unwrap(output.mapping).left_values()); + + return transform( + input_mapping.left_values(), [&](ParallelTensorSpaceCoordinate const &p) { + // The machine coord for a copy is inherently nebulous because it + // doesn't strictly run in any single location. Further, Realm has the + // flexibility to issue a copy operation from anywhere in the machine, + // including remotely. Here we choose machine_coord based on the input + // because we expect this to align with the most efficient way to issue + // copies in Realm, although the current Realm backend uses a + // centralized controller and thus issues copies all from a single node. 
+ MachineSpaceCoordinate machine_coord = input_mapping.at_l(p); + + return shard_invocation_for_binding(i, + machine_coord, + OperatorAtomicTaskShardBinding{{ + {input_slot.slot_name, p}, + {output_slot.slot_name, p}, + }}); + }); +} + std::unordered_set perform_shard_expansion_for_invocation(DynamicNodeInvocation const &i) { + if (i.node_attrs.op_attrs.has_value() && + i.node_attrs.op_attrs.value().is_copy()) { + return perform_shard_expansion_for_copy(i); + } MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc new file mode 100644 index 0000000000..2160f6bf82 --- /dev/null +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -0,0 +1,433 @@ +#include "task-spec/dynamic_graph/copy_insertion.h" +#include "op-attrs/tensor_slot_name.dtg.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" +#include "task-spec/dynamic_graph/dynamic_tensor_role.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" +#include "test/utils/doctest/fmt/unordered_set.h" +#include + +using namespace ::FlexFlow; + +TEST_SUITE(FF_TEST_SUITE) { + TEST_CASE("perform_copy_insertion_for_invocation") { + auto mk_machine_coord = + [](nonnegative_int node_idx, + nonnegative_int device_idx) -> MachineSpaceCoordinate { + return MachineSpaceCoordinate{ + /*node_idx=*/node_idx, + /*device_idx=*/device_idx, + /*device_type=*/DeviceType::GPU, + }; + }; + + auto mk_pt_coord = + [](nonnegative_int idx1, + nonnegative_int idx2, + nonnegative_int idx3, + nonnegative_int idx4) -> ParallelTensorSpaceCoordinate { + return ParallelTensorSpaceCoordinate{ + /*sum_component=*/idx1, + /*discard_copy_component=*/idx2, + /*shard_components=*/ + FFOrdered{ + idx3, + idx4, + }, + }; + }; + + auto mk_input_shard_binding = 
[&](ParallelTensorSpaceCoordinate const &c) + -> OperatorAtomicTaskShardBinding { + return OperatorAtomicTaskShardBinding{ + /*tensor_coords=*/{ + { + TensorSlotName::OUTPUT, + c, + }, + }, + }; + }; + + auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, + ParallelTensorSpaceCoordinate const &c2, + ParallelTensorSpaceCoordinate const &c3, + ParallelTensorSpaceCoordinate const &c4) + -> OperatorAtomicTaskShardBinding { + return OperatorAtomicTaskShardBinding{ + /*tensor_coords=*/{ + { + TensorSlotName::INPUT, + c1, + }, + { + TensorSlotName::WEIGHT, + c2, + }, + { + TensorSlotName::OUTPUT_1, + c3, + }, + { + TensorSlotName::OUTPUT_2, + c4, + }, + }, + }; + }; + + MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); + MachineSpaceCoordinate mc2 = mk_machine_coord(1_n, 0_n); + MachineSpaceCoordinate mc3 = mk_machine_coord(2_n, 0_n); + MachineSpaceCoordinate mc4 = mk_machine_coord(3_n, 0_n); + + ParallelTensorSpaceCoordinate mc1_input_coord = + mk_pt_coord(0_n, 0_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate mc1_weight_coord = + mk_pt_coord(0_n, 1_n, 2_n, 0_n); + ParallelTensorSpaceCoordinate mc1_output_1_coord = + mk_pt_coord(1_n, 0_n, 0_n, 1_n); + ParallelTensorSpaceCoordinate mc1_output_2_coord = + mk_pt_coord(3_n, 0_n, 0_n, 0_n); + + ParallelTensorSpaceCoordinate mc2_input_coord = + mk_pt_coord(0_n, 1_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate mc2_weight_coord = + mk_pt_coord(0_n, 4_n, 2_n, 0_n); + ParallelTensorSpaceCoordinate mc2_output_1_coord = + mk_pt_coord(1_n, 2_n, 0_n, 1_n); + ParallelTensorSpaceCoordinate mc2_output_2_coord = + mk_pt_coord(0_n, 0_n, 0_n, 0_n); + + MappedOperatorTaskGroup input_mapping_same = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc2, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + + MappedOperatorTaskGroup weight_mapping_same = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_weight_coord), + }, + { + mc2, + 
mk_input_shard_binding(mc2_weight_coord), + }, + }, + }; + + MappedOperatorTaskGroup invocation_mapping = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_shard_binding(mc1_input_coord, + mc1_weight_coord, + mc1_output_1_coord, + mc1_output_2_coord), + }, + { + mc2, + mk_shard_binding(mc2_input_coord, + mc2_weight_coord, + mc2_output_1_coord, + mc2_output_2_coord), + }, + }, + }; + + MappedOperatorTaskGroup invocation_mapping_diff_vs_copy1 = + MappedOperatorTaskGroup{ + bidict{ + { + mc2, + mk_shard_binding(mc2_input_coord, + mc2_weight_coord, + mc2_output_1_coord, + mc2_output_2_coord), + }, + }, + }; + auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { + return DynamicTensorSlot{ + /*slot_name=*/slot_name, + /*slot_tensor_role=*/mk_dynamic_tensor_role_fwd(), + }; + }; + + auto mk_value = [&](size_t src_node_id, + TensorSlotName src_slot_name, + MappedOperatorTaskGroup const &mapping, + std::optional const &use_slot_name) + -> DynamicValueAttrs { + return DynamicValueAttrs{ + /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ + KwargDataflowOutput{ + Node{src_node_id}, + src_slot_name, + }, + }}, + /*parallel_tensor_shape=*/std::nullopt, + /*shard_coord=*/std::nullopt, + /*mapping=*/ + transform(use_slot_name, + [&](TensorSlotName s) { + return get_tensor_bindings_for_slot_name(mapping, s); + }), + /*accessor=*/std::nullopt, + /*role=*/std::nullopt, + }; + }; + + size_t invocation1_id = 20; + + DynamicValueAttrs graph_input1 = + mk_value(0, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); + DynamicValueAttrs graph_input1_use = mk_value( + 0, TensorSlotName::OUTPUT, invocation_mapping, TensorSlotName::INPUT); + DynamicValueAttrs graph_input1_use_diff_vs_copy1 = + mk_value(0, + TensorSlotName::OUTPUT, + invocation_mapping_diff_vs_copy1, + TensorSlotName::INPUT); + DynamicValueAttrs graph_input2 = + mk_value(1, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); + DynamicValueAttrs graph_input2_use = mk_value( + 1, 
TensorSlotName::OUTPUT, invocation_mapping, TensorSlotName::WEIGHT); + DynamicValueAttrs invocation1_output1 = mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + invocation_mapping, + std::nullopt); + DynamicValueAttrs invocation1_output1_src = + mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + invocation_mapping, + TensorSlotName::OUTPUT_1); + DynamicValueAttrs invocation1_output2 = mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + invocation_mapping, + std::nullopt); + DynamicValueAttrs invocation1_output2_src = + mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + invocation_mapping, + TensorSlotName::OUTPUT_2); + + DynamicValueAttrs graph_input1_src_same = mk_value( + 0, TensorSlotName::OUTPUT, input_mapping_same, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input2_src_same = mk_value( + 1, TensorSlotName::OUTPUT, weight_mapping_same, TensorSlotName::OUTPUT); + + DynamicNodeInvocation input = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + graph_input1, + }, + { + mk_slot(TensorSlotName::WEIGHT), + graph_input2, + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/DynamicTaskType::FWD, + /*device_coord=*/std::nullopt, + /*mapping=*/invocation_mapping, + /*op_attrs=*/std::nullopt, + /*layer_guid=*/ + dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT_1), + invocation1_output1, + }, + { + mk_slot(TensorSlotName::OUTPUT_2), + invocation1_output2, + }, + }, + }; + + DynamicNodeInvocation mapped = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + graph_input1_use, + }, + { + mk_slot(TensorSlotName::WEIGHT), + graph_input2_use, + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/DynamicTaskType::FWD, + /*device_coord=*/std::nullopt, + /*mapping=*/invocation_mapping, + /*op_attrs=*/std::nullopt, + /*layer_guid=*/ + 
dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT_1), + invocation1_output1_src, + }, + { + mk_slot(TensorSlotName::OUTPUT_2), + invocation1_output2_src, + }, + }, + }; + + auto mk_copy = [&](DynamicValueAttrs const &src, + DynamicValueAttrs const &dst) { + return DynamicNodeInvocation{ + /*inputs=*/{{mk_slot(TensorSlotName::INPUT), src}}, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/DynamicTaskType::FWD, + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/{{mk_slot(TensorSlotName::OUTPUT), dst}}, + }; + }; + + SUBCASE("same mapping, no copies") { + std::unordered_map sources_same{ + {graph_input1, graph_input1_src_same}, + {graph_input2, graph_input2_src_same}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_same); + + std::unordered_set correct = {mapped}; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } + + SUBCASE("copy one tensor, one point") { + MappedOperatorTaskGroup input_mapping_copy1 = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup input_mapping_copy1_diff_vs_use = + MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + + DynamicValueAttrs graph_input1_src_copy1 = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy1, + TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input1_src_copy1_diff_vs_use = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy1_diff_vs_use, + TensorSlotName::OUTPUT); + + std::unordered_map sources_copy1{ + {graph_input1, graph_input1_src_copy1}, + 
{graph_input2, graph_input2_src_same}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_copy1); + + std::unordered_set correct = { + mapped, + mk_copy(graph_input1_src_copy1_diff_vs_use, + graph_input1_use_diff_vs_copy1), + }; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } + + SUBCASE("copy two tensors, two points") { + MappedOperatorTaskGroup input_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc4, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup weight_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc4, + mk_input_shard_binding(mc1_weight_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_weight_coord), + }, + }, + }; + + DynamicValueAttrs graph_input1_src_copy2 = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy2, + TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input2_src_copy2 = + mk_value(1, + TensorSlotName::OUTPUT, + weight_mapping_copy2, + TensorSlotName::OUTPUT); + + std::unordered_map sources_copy2{ + {graph_input1, graph_input1_src_copy2}, + {graph_input2, graph_input2_src_copy2}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_copy2); + + std::unordered_set correct = { + mapped, + mk_copy(graph_input1_src_copy2, graph_input1_use), + mk_copy(graph_input2_src_copy2, graph_input2_use), + }; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } + } +} diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc index fc9110b6e4..49b8d4a77a 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc @@ -15,6 +15,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, 
/*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; @@ -28,6 +29,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; @@ -41,6 +43,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc index 40d37f50df..40b3460ee5 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc @@ -75,6 +75,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc index e8fcf2e40b..fb087f5295 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc @@ -19,6 +19,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/tensor_role, }; @@ -112,6 +113,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/tensor_role, }; @@ -228,6 +230,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, 
/*accessor=*/std::nullopt, /*role=*/tensor_type, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc index 23fbb6e514..efe21146db 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,37 +1,75 @@ #include "task-spec/dynamic_graph/shard_expansion.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/copy_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.h" +#include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" #include "test/utils/doctest/fmt/unordered_set.h" +#include "utils/bidict/algorithms/filter_keys.h" #include using namespace ::FlexFlow; -TEST_SUITE(FF_TEST_SUITE) { - TEST_CASE("perform_shard_expansion_for_invocation") { - auto mk_machine_coord = - [](nonnegative_int node_idx, - nonnegative_int device_idx) -> MachineSpaceCoordinate { - return MachineSpaceCoordinate{ - /*node_idx=*/node_idx, - /*device_idx=*/device_idx, - /*device_type=*/DeviceType::GPU, - }; - }; +static MachineSpaceCoordinate mk_machine_coord(nonnegative_int node_idx, + nonnegative_int device_idx) { + return MachineSpaceCoordinate{ + /*node_idx=*/node_idx, + /*device_idx=*/device_idx, + /*device_type=*/DeviceType::GPU, + }; +}; - auto mk_pt_coord = - [](nonnegative_int idx1, - nonnegative_int idx2, - nonnegative_int idx3, - nonnegative_int idx4) -> ParallelTensorSpaceCoordinate { - return ParallelTensorSpaceCoordinate{ - /*sum_component=*/idx1, - /*discard_copy_component=*/idx2, - /*shard_components=*/ - FFOrdered{ - idx3, - idx4, +static ParallelTensorSpaceCoordinate mk_pt_coord(nonnegative_int idx1, + nonnegative_int idx2, + nonnegative_int idx3, + nonnegative_int idx4) { + return ParallelTensorSpaceCoordinate{ + /*sum_component=*/idx1, + /*discard_copy_component=*/idx2, + 
/*shard_components=*/ + FFOrdered{ + idx3, + idx4, + }, + }; +}; + +DynamicTensorSlot mk_slot(TensorSlotName const &slot_name) { + return DynamicTensorSlot{ + /*slot_name=*/slot_name, + /*slot_tensor_role=*/std::nullopt, + }; +}; + +DynamicValueAttrs + mk_value(size_t src_node_id, + TensorSlotName src_slot_name, + bidict + tensor_binding, + std::optional const &shard_coord) { + if (shard_coord.has_value()) { + tensor_binding = filter_keys(tensor_binding, + [&](ParallelTensorSpaceCoordinate const &p) { + return p == shard_coord.value(); + }); + } + return DynamicValueAttrs{ + /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ + KwargDataflowOutput{ + Node{src_node_id}, + src_slot_name, }, - }; - }; + }}, + /*parallel_tensor_shape=*/std::nullopt, + /*shard_coord=*/shard_coord, + /*mapping=*/ + tensor_binding, + /*accessor=*/std::nullopt, + /*role=*/std::nullopt, + }; +}; +TEST_SUITE(FF_TEST_SUITE) { + TEST_CASE("perform_shard_expansion_for_invocation") { auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, ParallelTensorSpaceCoordinate const &c2, ParallelTensorSpaceCoordinate const &c3, @@ -99,41 +137,33 @@ TEST_SUITE(FF_TEST_SUITE) { }, }; - auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { - return DynamicTensorSlot{ - /*slot_name=*/slot_name, - /*slot_tensor_role=*/std::nullopt, - }; - }; - - auto mk_value = - [](size_t src_node_id, - TensorSlotName src_slot_name, - std::optional const &shard_coord) + auto mk_op_value = + [&](size_t src_node_id, + TensorSlotName src_slot_name, + TensorSlotName use_slot_name, + std::optional const &shard_coord) -> DynamicValueAttrs { - return DynamicValueAttrs{ - /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ - KwargDataflowOutput{ - Node{src_node_id}, - src_slot_name, - }, - }}, - /*parallel_tensor_shape=*/std::nullopt, - /*shard_coord=*/shard_coord, - /*accessor=*/std::nullopt, - /*role=*/std::nullopt, - }; + bidict + tensor_binding = 
get_tensor_bindings_for_slot_name(mapped_task_group, + use_slot_name); + return mk_value(src_node_id, src_slot_name, tensor_binding, shard_coord); }; DynamicNodeInvocation input = DynamicNodeInvocation{ /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, TensorSlotName::OUTPUT, std::nullopt), + mk_op_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + std::nullopt), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, TensorSlotName::OUTPUT, std::nullopt), + mk_op_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + std::nullopt), }, }, /*node_attrs=*/ @@ -150,11 +180,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, TensorSlotName::OUTPUT_1, std::nullopt), + mk_op_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + std::nullopt), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, TensorSlotName::OUTPUT_2, std::nullopt), + mk_op_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + std::nullopt), }, }, }; @@ -173,11 +209,17 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, TensorSlotName::OUTPUT, input_shard_coord), + mk_op_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + input_shard_coord), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, TensorSlotName::OUTPUT, weight_shard_coord), + mk_op_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + weight_shard_coord), }, }, /*node_attrs=*/ @@ -194,11 +236,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, TensorSlotName::OUTPUT_1, output_1_shard_coord), + mk_op_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + output_1_shard_coord), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, TensorSlotName::OUTPUT_2, output_2_shard_coord), + mk_op_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + output_2_shard_coord), }, }, }; @@ -220,4 +268,83 @@ TEST_SUITE(FF_TEST_SUITE) { 
CHECK(result.size() == correct.size()); CHECK(result == correct); } + + TEST_CASE("perform_shard_expansion_for_invocation (copy)") { + MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); + MachineSpaceCoordinate mc2 = mk_machine_coord(1_n, 0_n); + MachineSpaceCoordinate mc3 = mk_machine_coord(2_n, 0_n); + MachineSpaceCoordinate mc4 = mk_machine_coord(3_n, 0_n); + + ParallelTensorSpaceCoordinate pt1 = mk_pt_coord(0_n, 0_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate pt2 = mk_pt_coord(0_n, 1_n, 0_n, 0_n); + + bidict src_binding{ + {pt1, mc1}, + {pt2, mc2}, + }; + bidict dst_binding{ + {pt1, mc3}, + {pt2, mc4}, + }; + + DynamicNodeInvocation input = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + mk_value(0, TensorSlotName::OUTPUT, src_binding, std::nullopt), + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs=*/TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT), + mk_value(20, TensorSlotName::OUTPUT, dst_binding, std::nullopt), + }, + }, + }; + + std::unordered_set result = + perform_shard_expansion_for_invocation(input); + + auto mk_invocation_shard = + [&](MachineSpaceCoordinate const &device_coord, + ParallelTensorSpaceCoordinate const &tensor_shard_coord) + -> DynamicNodeInvocation { + DynamicNodeInvocation result = input; + result.inputs = { + { + mk_slot(TensorSlotName::INPUT), + mk_value( + 0, TensorSlotName::OUTPUT, src_binding, tensor_shard_coord), + }, + }; + // See perform_shard_expansion_for_copy in shard_expansion.cc for explanation of the choice of device placement. 
+ result.node_attrs.device_coord = device_coord; + result.outputs = { + { + mk_slot(TensorSlotName::OUTPUT), + mk_value( + 20, TensorSlotName::OUTPUT, dst_binding, tensor_shard_coord), + }, + }; + return result; + }; + + std::unordered_set correct = { + mk_invocation_shard(mc1, pt1), + mk_invocation_shard(mc2, pt2), + }; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } }