From 9d68a636da79f4c1365488b0b7d19a2d07d2959b Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Wed, 25 Feb 2026 17:03:38 -0800 Subject: [PATCH 01/20] Initial work on exposing copies to dynamic graph. --- .../mapped_operator_task_group.h | 5 +++++ .../mapped_operator_task_group.cc | 12 ++++++++++++ .../task-spec/dynamic_graph/copy_attrs.dtg.toml | 13 +++++++++++++ .../dynamic_graph/dynamic_value_attrs.dtg.toml | 6 ++++++ .../serializable_dynamic_value_attrs.dtg.toml | 6 ++++++ .../dynamic_graph/training_operation_attrs.dtg.toml | 5 +++++ .../src/task-spec/dynamic_graph/loss_insertion.cc | 13 +++++++++++++ .../make_dynamic_open_dataflow_graph_from_cg.cc | 2 ++ ...e_dynamic_open_dataflow_graph_from_mapped_pcg.cc | 10 +++++++++- .../serializable_dynamic_value_attrs.cc | 2 ++ 10 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml diff --git a/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h b/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h index ebfdefa478..b15b91e0e3 100644 --- a/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h +++ b/lib/pcg/include/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h @@ -2,6 +2,7 @@ #define _FLEXFLOW_LIB_PCG_INCLUDE_PCG_MAPPED_PARALLEL_COMPUTATION_GRAPH_MAPPED_OPERATOR_TASK_GROUP_H #include "op-attrs/computation_graph_op_attrs.dtg.h" +#include "op-attrs/tensor_slot_name.dtg.h" #include "pcg/machine_space_coordinate.dtg.h" #include "pcg/mapped_parallel_computation_graph/operator_atomic_task_shard_binding.dtg.h" #include "utils/bidict/bidict.h" @@ -32,6 +33,10 @@ struct MappedOperatorTaskGroup { friend struct ::std::hash; }; +bidict + get_tensor_bindings_for_slot_name(MappedOperatorTaskGroup const &, + TensorSlotName const &); + std::string format_as(::FlexFlow::MappedOperatorTaskGroup const &); std::ostream &operator<<(std::ostream &, ::FlexFlow::MappedOperatorTaskGroup const &); diff --git a/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc b/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc index 4436efd727..8f9db7eac7 100644 --- a/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc +++ b/lib/pcg/src/pcg/mapped_parallel_computation_graph/mapped_operator_task_group.cc @@ -3,6 +3,7 @@ #include "op-attrs/operator_task_space.h" #include "op-attrs/parallel_tensor_space_coordinate.h" #include "pcg/mapped_parallel_computation_graph/operator_atomic_task_shard_binding.h" +#include "utils/bidict/algorithms/transform_values.h" #include "utils/bidict/generate_bidict.h" #include "utils/containers/are_all_distinct.h" #include "utils/containers/require_all_same.h" @@ -70,6 +71,17 @@ bidict const & return this->shard_bindings; } +bidict + get_tensor_bindings_for_slot_name(MappedOperatorTaskGroup const &task_group, + TensorSlotName const &slot_name) { + return transform_values(task_group.get_shard_bindings(), + [&](OperatorAtomicTaskShardBinding const &b) { + return ptensor_space_coord_for_slot_name(b, + slot_name); + }) + .reversed(); +} + std::string format_as(::FlexFlow::MappedOperatorTaskGroup const &m) { return fmt::format("", m.get_shard_bindings()); diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml new file mode 100644 index 0000000000..cef44f5f08 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_attrs.dtg.toml @@ -0,0 +1,13 @@ +namespace = "FlexFlow" +name = "CopyAttrs" +type = "struct" +features = [ + "eq", + "ord", + "hash", + "json", + "fmt", + "rapidcheck", +] + +fields = [] diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml index 89b94b1017..490a51f88d 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_value_attrs.dtg.toml @@ -12,6 +12,8 @@ includes = [ "task-spec/dynamic_graph/dynamic_tensor_guid_t.dtg.h", "op-attrs/parallel_tensor_shape.dtg.h", "op-attrs/parallel_tensor_space_coordinate.dtg.h", + "pcg/machine_space_coordinate.dtg.h", + "utils/bidict/bidict.h", "task-spec/dynamic_graph/dynamic_tensor_accessor.dtg.h", "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h", ] @@ -32,6 +34,10 @@ type = "std::optional<::FlexFlow::ParallelTensorShape>" name = "shard_coord" type = "std::optional<::FlexFlow::ParallelTensorSpaceCoordinate>" +[[fields]] +name = "mapping" +type = "std::optional<::FlexFlow::bidict<::FlexFlow::ParallelTensorSpaceCoordinate, ::FlexFlow::MachineSpaceCoordinate>>" + [[fields]] name = "accessor" type = "std::optional<::FlexFlow::DynamicTensorAccessor>" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml index 6209bfa247..454f1b7e8c 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/serializable_dynamic_value_attrs.dtg.toml @@ -13,6 +13,8 @@ includes = [ "task-spec/dynamic_graph/dynamic_tensor_guid_t.dtg.h", "op-attrs/parallel_tensor_shape.dtg.h", "op-attrs/parallel_tensor_space_coordinate.dtg.h", + "pcg/machine_space_coordinate.dtg.h", + "utils/bidict/bidict.h", "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h", ] @@ -33,6 +35,10 @@ type = "std::optional<::FlexFlow::ParallelTensorShape>" name = "shard_coord" type = "std::optional<::FlexFlow::ParallelTensorSpaceCoordinate>" +[[fields]] +name = "mapping" +type = "std::optional<::FlexFlow::bidict<::FlexFlow::ParallelTensorSpaceCoordinate, ::FlexFlow::MachineSpaceCoordinate>>" + [[fields]] name = "role" type = "std::optional<::FlexFlow::DynamicTensorRole>" diff --git a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml index 1051d8ac13..8f8f6467c8 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml @@ -11,6 +11,7 @@ features = [ includes = [ "op-attrs/ops/loss_functions/loss_attrs.dtg.h", "op-attrs/pcg_operator_attrs.dtg.h", + "task-spec/dynamic_graph/copy_attrs.dtg.h", ] [[values]] @@ -20,3 +21,7 @@ key = "pcg_op" [[values]] type = "::FlexFlow::LossAttrs" key = "loss" + +[[values]] +type = "::FlexFlow::CopyAttrs" +key = "copy" diff --git a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc index 857fed1a84..0f33dec041 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc @@ -7,6 +7,7 @@ #include "task-spec/dynamic_graph/dynamic_tensor_role.h" #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" +#include "utils/containers/transform.h" #include "utils/optional.h" #include @@ -24,6 +25,12 @@ LossInsertionResult perform_loss_insertion( /*tensor_guid=*/mk_dynamic_tensor_guid_for_loss(), /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, + /*mapping=*/ + transform(loss_mapping, + [](MappedOperatorTaskGroup const &node_mapping) { + return get_tensor_bindings_for_slot_name( + node_mapping, TensorSlotName::INPUT); + }), /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_loss(), }; @@ -31,6 +38,12 @@ LossInsertionResult perform_loss_insertion( /*tensor_guid=*/logit_value.tensor_guid, /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, + /*mapping=*/ + transform(loss_mapping, + [](MappedOperatorTaskGroup const &node_mapping) { + return get_tensor_bindings_for_slot_name( + node_mapping, TensorSlotName::LOGIT); + }), /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_bwd(), }; diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc index 204597386e..6bfc477e3a 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_cg.cc @@ -44,6 +44,7 @@ DynamicOpenDataflowGraph /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/lift_to_parallel(attrs.shape), /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, @@ -63,6 +64,7 @@ DynamicOpenDataflowGraph /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/lift_to_parallel(attrs.shape), /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc index e06e7d5a32..4f19780849 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc @@ -1,6 +1,7 @@ #include "task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.h" #include "op-attrs/parallel_tensor_shape.h" #include "op-attrs/pcg_operator_attrs.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" #include "pcg/parallel_computation_graph/parallel_computation_graph.h" #include "pcg/parallel_computation_graph/parallel_tensor_attrs.dtg.h" #include "task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.h" @@ -19,10 +20,11 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( for (auto const &[layer, attrs] : get_parallel_layer_attrs_mapping(mpcg.pcg)) { + MappedOperatorTaskGroup node_mapping = mpcg.mapped_tasks.at(layer); DynamicNodeAttrs result_attrs{ /*task_type=*/std::nullopt, /*device_coord=*/std::nullopt, - /*mapping=*/mpcg.mapped_tasks.at(layer), + /*mapping=*/node_mapping, /*op_attrs=*/TrainingOperationAttrs{attrs.op_attrs}, /*pcg_layer_guid=*/dynamic_layer_guid_t{layer}, /*per_device_op_state=*/std::nullopt, @@ -43,6 +45,9 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, + /*mapping=*/ + get_tensor_bindings_for_slot_name(node_mapping, + slot_name), /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, @@ -63,6 +68,9 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, + /*mapping=*/ + get_tensor_bindings_for_slot_name(node_mapping, + slot_name), /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc b/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc index 2dc0b509ab..b4d398c3f0 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/serializable_dynamic_value_attrs.cc @@ -9,6 +9,7 @@ SerializableDynamicValueAttrs /*tensor_guid=*/attrs.tensor_guid, /*parallel_tensor_shape=*/attrs.parallel_tensor_shape, /*shard_coord=*/attrs.shard_coord, + /*mapping=*/attrs.mapping, /*role=*/attrs.role, }; } @@ -19,6 +20,7 @@ DynamicValueAttrs dynamic_value_attrs_from_serializable( /*tensor_guid=*/attrs.tensor_guid, /*parallel_tensor_shape=*/attrs.parallel_tensor_shape, /*shard_coord=*/attrs.shard_coord, + /*mapping=*/attrs.mapping, /*accessor=*/std::nullopt, /*role=*/attrs.role, }; From b8af3abecbd358d2703976263f2f69929cb4ce14 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Wed, 25 Feb 2026 17:17:14 -0800 Subject: [PATCH 02/20] And now with actually fixed tests. --- .../dynamic_open_dataflow_graph.cc | 3 ++ .../dynamic_graph/machine_slicing.cc | 54 +++++++++++++++++-- .../task-spec/dynamic_graph/pass_expansion.cc | 3 ++ .../dynamic_graph/shard_expansion.cc | 9 ++-- 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc index fc9110b6e4..49b8d4a77a 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/dynamic_open_dataflow_graph.cc @@ -15,6 +15,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; @@ -28,6 +29,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; @@ -41,6 +43,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*tensor_type=*/std::nullopt, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc index 40d37f50df..61d55408dc 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc @@ -32,6 +32,33 @@ TEST_SUITE(FF_TEST_SUITE) { }; }; + auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, + ParallelTensorSpaceCoordinate const &c2, + ParallelTensorSpaceCoordinate const &c3, + ParallelTensorSpaceCoordinate const &c4) + -> OperatorAtomicTaskShardBinding { + return OperatorAtomicTaskShardBinding{ + /*tensor_coords=*/{ + { + TensorSlotName::INPUT, + c1, + }, + { + TensorSlotName::WEIGHT, + c2, + }, + { + TensorSlotName::OUTPUT_1, + c3, + }, + { + TensorSlotName::OUTPUT_2, + c4, + }, + }, + }; + }; + MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); MachineSpaceCoordinate mc2 = mk_machine_coord(2_n, 0_n); MachineSpaceCoordinate mc3 = mk_machine_coord(4_n, 0_n); @@ -54,6 +81,25 @@ TEST_SUITE(FF_TEST_SUITE) { ParallelTensorSpaceCoordinate mc2_output_2_coord = mk_pt_coord(0_n, 0_n, 0_n, 0_n); + MappedOperatorTaskGroup mapped_task_group = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_shard_binding(mc1_input_coord, + mc1_weight_coord, + mc1_output_1_coord, + mc1_output_2_coord), + }, + { + mc2, + mk_shard_binding(mc2_input_coord, + mc2_weight_coord, + mc2_output_1_coord, + mc2_output_2_coord), + }, + }, + }; + auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { return DynamicTensorSlot{ /*slot_name=*/slot_name, @@ -62,9 +108,9 @@ TEST_SUITE(FF_TEST_SUITE) { }; auto mk_value = - [](size_t src_node_id, - TensorSlotName src_slot_name, - std::optional const &shard_coord) + [&](size_t src_node_id, + TensorSlotName src_slot_name, + std::optional const &shard_coord) -> DynamicValueAttrs { return DynamicValueAttrs{ /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ @@ -75,6 +121,8 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, + /*mapping=*/ + get_tensor_bindings_for_slot_name(mapped_task_group, src_slot_name), /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc index e8fcf2e40b..fb087f5295 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/pass_expansion.cc @@ -19,6 +19,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/tensor_role, }; @@ -112,6 +113,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/tensor_role, }; @@ -228,6 +230,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/std::nullopt, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/tensor_type, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc index 23fbb6e514..340ee1c8fa 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,4 +1,5 @@ #include "task-spec/dynamic_graph/shard_expansion.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" #include "test/utils/doctest/fmt/unordered_set.h" #include @@ -107,9 +108,9 @@ TEST_SUITE(FF_TEST_SUITE) { }; auto mk_value = - [](size_t src_node_id, - TensorSlotName src_slot_name, - std::optional const &shard_coord) + [&](size_t src_node_id, + TensorSlotName src_slot_name, + std::optional const &shard_coord) -> DynamicValueAttrs { return DynamicValueAttrs{ /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ @@ -120,6 +121,8 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, + /*mapping=*/ + get_tensor_bindings_for_slot_name(mapped_task_group, src_slot_name), /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; From 4b8411e7338104a7be0bc7ebc387f9035d8b914c Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 10:38:30 -0800 Subject: [PATCH 03/20] More work on copy insertion. --- .../task-spec/dynamic_graph/copy_insertion.h | 23 ++++ .../dynamic_copy_layer_guid_t.dtg.toml | 13 ++ .../dynamic_layer_guid_t.dtg.toml | 5 + .../task-spec/dynamic_graph/copy_insertion.cc | 116 ++++++++++++++++++ ...mic_open_dataflow_graph_from_mapped_pcg.cc | 12 +- .../dynamic_graph/shard_expansion.cc | 19 +++ .../dynamic_graph/machine_slicing.cc | 79 ++++++++---- .../dynamic_graph/shard_expansion.cc | 53 ++++++-- 8 files changed, 278 insertions(+), 42 deletions(-) create mode 100644 lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h create mode 100644 lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml create mode 100644 lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h new file mode 100644 index 0000000000..112730fcba --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -0,0 +1,23 @@ +#ifndef _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_COPY_INSERTION_H +#define _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_COPY_INSERTION_H + +#include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" +#include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.dtg.h" + +namespace FlexFlow { + +bool value_is_mapped(DynamicValueAttrs const &); + +bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &); +bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &); + +std::unordered_set + perform_copy_insertion_for_invocation(DynamicNodeInvocation const &); + +DynamicOpenDataflowGraph + perform_copy_insertion(DynamicOpenDataflowGraph const &); + +} // namespace FlexFlow + +#endif diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml new file mode 100644 index 0000000000..de13cdd9a8 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.toml @@ -0,0 +1,13 @@ +namespace = "FlexFlow" +name = "dynamic_copy_layer_guid_t" +type = "struct" +features = [ + "eq", + "ord", + "hash", + "json", + "fmt", + "rapidcheck", +] + +fields = [] diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml index bd64f52567..8def0ec5fb 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.toml @@ -12,6 +12,7 @@ includes = [ "pcg/layer_guid_t.dtg.h", "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h", "task-spec/dynamic_graph/dynamic_loss_layer_guid_t.dtg.h", + "task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.h", ] [[values]] @@ -25,3 +26,7 @@ key = "pcg_layer_guid" [[values]] type = "::FlexFlow::dynamic_loss_layer_guid_t" key = "loss_layer_guid" + +[[values]] +type = "::FlexFlow::dynamic_copy_layer_guid_t" +key = "copy_layer_guid" diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc new file mode 100644 index 0000000000..63eb3afdfb --- /dev/null +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -0,0 +1,116 @@ +#include "task-spec/dynamic_graph/copy_insertion.h" +#include "op-attrs/parallel_tensor_space_coordinate.dtg.h" +#include "pcg/machine_space_coordinate.dtg.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" +#include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "task-spec/dynamic_graph/dynamic_tensor_slot.dtg.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" +#include "utils/containers/map_values2.h" +#include "utils/containers/transform.h" +#include "utils/optional.h" +#include + +namespace FlexFlow { + +bool value_is_mapped(DynamicValueAttrs const &n) { + return n.mapping.has_value(); +} + +bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &g) { + auto node_is_mapped = [](DynamicNodeAttrs const &) -> bool { return false; }; + auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return false; }; + + return no_part_of_dynamic_graph_satisfies( + g, node_is_mapped, value_is_mapped, slot_is_mapped); +} + +bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &g) { + auto node_is_mapped = [](DynamicNodeAttrs const &) -> bool { return true; }; + auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return true; }; + + return full_dynamic_graph_satisfies( + g, node_is_mapped, value_is_mapped, slot_is_mapped); +} + +static DynamicValueAttrs map_dynamic_value_attrs_for_task_group( + DynamicTensorSlot const &slot, + DynamicValueAttrs const &value, + MappedOperatorTaskGroup const &mapping) { + DynamicValueAttrs result = value; + result.mapping = get_tensor_bindings_for_slot_name(mapping, slot.slot_name); + return result; +} + +std::unordered_set perform_copy_insertion_for_invocation( + DynamicNodeInvocation const &i, + std::unordered_map const &sources) { + + MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); + + auto map_tensor = [&](DynamicTensorSlot const &slot, + DynamicValueAttrs const &value) { + return map_dynamic_value_attrs_for_task_group(slot, value, mapping); + }; + + std::unordered_map mapped_inputs = + map_values2(i.inputs, map_tensor); + std::unordered_map mapped_outputs = + map_values2(i.outputs, map_tensor); + + std::unordered_set result{DynamicNodeInvocation{ + /*inputs=*/mapped_inputs, + /*node_attrs=*/i.node_attrs, + /*outputs=*/mapped_outputs, + }}; + + for (auto const &[slot, input] : i.inputs) { + DynamicValueAttrs source_value = sources.at(input); + DynamicValueAttrs use_value = mapped_inputs.at(slot); + if (source_value != use_value) { + result.insert(DynamicNodeInvocation{ + /*inputs=*/{{slot, source_value}}, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/{{slot, use_value}}, + }); + } + } + + return result; +} + +DynamicOpenDataflowGraph + perform_copy_insertion(DynamicOpenDataflowGraph const &g) { + + ASSERT(no_part_of_graph_is_copy_inserted(g)); + + std::unordered_map sources; + for (DynamicNodeInvocation const &i : g.invocations) { + for (auto const &[slot, value] : i.outputs) { + sources.insert( + std::pair{value, + map_dynamic_value_attrs_for_task_group( + slot, value, assert_unwrap(i.node_attrs.mapping))}); + } + } + + DynamicOpenDataflowGraph result = + flatmap_dynamic_invocation_set(g, [&](DynamicNodeInvocation const &i) { + return perform_copy_insertion_for_invocation(i, sources); + }); + + ASSERT(graph_is_fully_copy_inserted(result)); + + return result; +} + +} // namespace FlexFlow diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc index 4f19780849..246f9a3242 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc @@ -1,7 +1,6 @@ #include "task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.h" #include "op-attrs/parallel_tensor_shape.h" #include "op-attrs/pcg_operator_attrs.h" -#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" #include "pcg/parallel_computation_graph/parallel_computation_graph.h" #include "pcg/parallel_computation_graph/parallel_tensor_attrs.dtg.h" #include "task-spec/dynamic_graph/dynamic_layer_guid_t.dtg.h" @@ -20,11 +19,10 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( for (auto const &[layer, attrs] : get_parallel_layer_attrs_mapping(mpcg.pcg)) { - MappedOperatorTaskGroup node_mapping = mpcg.mapped_tasks.at(layer); DynamicNodeAttrs result_attrs{ /*task_type=*/std::nullopt, /*device_coord=*/std::nullopt, - /*mapping=*/node_mapping, + /*mapping=*/mpcg.mapped_tasks.at(layer), /*op_attrs=*/TrainingOperationAttrs{attrs.op_attrs}, /*pcg_layer_guid=*/dynamic_layer_guid_t{layer}, /*per_device_op_state=*/std::nullopt, @@ -45,9 +43,7 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, - /*mapping=*/ - get_tensor_bindings_for_slot_name(node_mapping, - slot_name), + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, @@ -68,9 +64,7 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg( /*tensor_guid=*/dynamic_tensor_guid_t{tensor}, /*parallel_tensor_shape=*/attrs.shape, /*shard_coord=*/std::nullopt, - /*mapping=*/ - get_tensor_bindings_for_slot_name(node_mapping, - slot_name), + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc index 402e0ef055..999bee3d92 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,6 +1,8 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "utils/bidict/algorithms/filter_keys.h" #include "utils/containers/map_values2.h" +#include "utils/containers/transform.h" #include "utils/optional.h" namespace FlexFlow { @@ -35,6 +37,16 @@ bool graph_is_fully_shard_expanded(DynamicOpenDataflowGraph const &g) { slot_is_shard_expanded); } +static bidict + subset_tensor_mapping_for_coord( + bidict const + &mapping, + ParallelTensorSpaceCoordinate const ¶llel_tensor_coord) { + return filter_keys(mapping, [&](ParallelTensorSpaceCoordinate const &p) { + return p == parallel_tensor_coord; + }); +} + static DynamicNodeInvocation shard_invocation_for_binding( DynamicNodeInvocation const &i, MachineSpaceCoordinate const &machine_coord, @@ -47,6 +59,13 @@ static DynamicNodeInvocation shard_invocation_for_binding( DynamicValueAttrs result = v; result.shard_coord = parallel_tensor_coord; + result.mapping = transform( + v.mapping, + [&](bidict const + &mapping) { + return subset_tensor_mapping_for_coord(mapping, + parallel_tensor_coord); + }); return result; }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc index 61d55408dc..a30c342278 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc @@ -1,5 +1,6 @@ #include "task-spec/dynamic_graph/machine_slicing.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "utils/bidict/algorithms/filter_keys.h" #include using namespace ::FlexFlow; @@ -110,8 +111,18 @@ TEST_SUITE(FF_TEST_SUITE) { auto mk_value = [&](size_t src_node_id, TensorSlotName src_slot_name, + TensorSlotName use_slot_name, std::optional const &shard_coord) -> DynamicValueAttrs { + bidict + tensor_binding = get_tensor_bindings_for_slot_name(mapped_task_group, + use_slot_name); + if (shard_coord.has_value()) { + tensor_binding = filter_keys( + tensor_binding, [&](ParallelTensorSpaceCoordinate const &p) { + return p == shard_coord.value(); + }); + } return DynamicValueAttrs{ /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ KwargDataflowOutput{ @@ -122,7 +133,7 @@ TEST_SUITE(FF_TEST_SUITE) { /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, /*mapping=*/ - get_tensor_bindings_for_slot_name(mapped_task_group, src_slot_name), + tensor_binding, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; @@ -132,28 +143,21 @@ TEST_SUITE(FF_TEST_SUITE) { size_t invocation2_id = 21; size_t invocation3_id = 22; - DynamicValueAttrs graph_input1 = - mk_value(0, TensorSlotName::OUTPUT, std::nullopt); - DynamicValueAttrs graph_input2 = - mk_value(1, TensorSlotName::OUTPUT, std::nullopt); - DynamicValueAttrs invocation1_output1 = - mk_value(invocation1_id, TensorSlotName::OUTPUT_1, std::nullopt); - DynamicValueAttrs invocation1_output2 = - mk_value(invocation1_id, TensorSlotName::OUTPUT_2, std::nullopt); - DynamicValueAttrs invocation2_output1 = - mk_value(invocation2_id, TensorSlotName::OUTPUT_4, std::nullopt); - DynamicValueAttrs invocation3_output1 = - mk_value(invocation3_id, TensorSlotName::OUTPUT_1, std::nullopt); - DynamicNodeInvocation invocation1 = DynamicNodeInvocation{ /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - graph_input1, + mk_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + std::nullopt), }, { mk_slot(TensorSlotName::WEIGHT), - graph_input2, + mk_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + std::nullopt), }, }, /*node_attrs=*/ @@ -170,18 +174,30 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - invocation1_output1, + mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + std::nullopt), }, { mk_slot(TensorSlotName::OUTPUT_2), - invocation1_output2, + mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + std::nullopt), }, }, }; DynamicNodeInvocation invocation2 = DynamicNodeInvocation{ /*inputs=*/{ - {mk_slot(TensorSlotName::INPUT), invocation1_output2}, + { + mk_slot(TensorSlotName::INPUT), + mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + TensorSlotName::INPUT, + std::nullopt), + }, }, /*node_attrs=*/ DynamicNodeAttrs{ @@ -197,7 +213,10 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_4), - invocation2_output1, + mk_value(invocation2_id, + TensorSlotName::OUTPUT_4, + TensorSlotName::OUTPUT_4, + std::nullopt), }, }, }; @@ -206,15 +225,24 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::KEY), - invocation2_output1, + mk_value(invocation2_id, + TensorSlotName::OUTPUT_4, + TensorSlotName::KEY, + std::nullopt), }, { mk_slot(TensorSlotName::QUERY), - graph_input2, + mk_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::QUERY, + std::nullopt), }, { mk_slot(TensorSlotName::VALUE), - invocation1_output1, + mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + TensorSlotName::VALUE, + std::nullopt), }, }, /*node_attrs=*/ @@ -231,7 +259,10 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - invocation3_output1, + mk_value(invocation3_id, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + std::nullopt), }, }, }; diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc index 340ee1c8fa..36a73cf1a2 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,6 +1,7 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" #include "test/utils/doctest/fmt/unordered_set.h" +#include "utils/bidict/algorithms/filter_keys.h" #include using namespace ::FlexFlow; @@ -110,8 +111,18 @@ TEST_SUITE(FF_TEST_SUITE) { auto mk_value = [&](size_t src_node_id, TensorSlotName src_slot_name, + TensorSlotName use_slot_name, std::optional const &shard_coord) -> DynamicValueAttrs { + bidict + tensor_binding = get_tensor_bindings_for_slot_name(mapped_task_group, + use_slot_name); + if (shard_coord.has_value()) { + tensor_binding = filter_keys( + tensor_binding, [&](ParallelTensorSpaceCoordinate const &p) { + return p == shard_coord.value(); + }); + } return DynamicValueAttrs{ /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ KwargDataflowOutput{ @@ -122,7 +133,7 @@ TEST_SUITE(FF_TEST_SUITE) { /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, /*mapping=*/ - get_tensor_bindings_for_slot_name(mapped_task_group, src_slot_name), + tensor_binding, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; @@ -132,11 +143,17 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, TensorSlotName::OUTPUT, std::nullopt), + mk_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + std::nullopt), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, TensorSlotName::OUTPUT, std::nullopt), + mk_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + std::nullopt), }, }, /*node_attrs=*/ @@ -153,11 +170,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, TensorSlotName::OUTPUT_1, std::nullopt), + mk_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + std::nullopt), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, TensorSlotName::OUTPUT_2, std::nullopt), + mk_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + std::nullopt), }, }, }; @@ -176,11 +199,17 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, TensorSlotName::OUTPUT, input_shard_coord), + mk_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + input_shard_coord), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, TensorSlotName::OUTPUT, weight_shard_coord), + mk_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + weight_shard_coord), }, }, /*node_attrs=*/ @@ -197,11 +226,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, TensorSlotName::OUTPUT_1, output_1_shard_coord), + mk_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + output_1_shard_coord), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, TensorSlotName::OUTPUT_2, output_2_shard_coord), + mk_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + output_2_shard_coord), }, }, }; From 76afe660ced7deab6e3cd3d0056121180a195b20 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 10:41:43 -0800 Subject: [PATCH 04/20] Don't test mapping in machine_slicing. --- .../dynamic_graph/machine_slicing.cc | 132 ++++-------------- 1 file changed, 27 insertions(+), 105 deletions(-) diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc index a30c342278..40b3460ee5 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/machine_slicing.cc @@ -1,6 +1,5 @@ #include "task-spec/dynamic_graph/machine_slicing.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" -#include "utils/bidict/algorithms/filter_keys.h" #include using namespace ::FlexFlow; @@ -33,33 +32,6 @@ TEST_SUITE(FF_TEST_SUITE) { }; }; - auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, - ParallelTensorSpaceCoordinate const &c2, - ParallelTensorSpaceCoordinate const &c3, - ParallelTensorSpaceCoordinate const &c4) - -> OperatorAtomicTaskShardBinding { - return OperatorAtomicTaskShardBinding{ - /*tensor_coords=*/{ - { - TensorSlotName::INPUT, - c1, - }, - { - TensorSlotName::WEIGHT, - c2, - }, - { - TensorSlotName::OUTPUT_1, - c3, - }, - { - TensorSlotName::OUTPUT_2, - c4, - }, - }, - }; - }; - MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); MachineSpaceCoordinate mc2 = mk_machine_coord(2_n, 0_n); MachineSpaceCoordinate mc3 = mk_machine_coord(4_n, 0_n); @@ -82,25 +54,6 @@ TEST_SUITE(FF_TEST_SUITE) { ParallelTensorSpaceCoordinate mc2_output_2_coord = mk_pt_coord(0_n, 0_n, 0_n, 0_n); - MappedOperatorTaskGroup mapped_task_group = MappedOperatorTaskGroup{ - bidict{ - { - mc1, - mk_shard_binding(mc1_input_coord, - mc1_weight_coord, - mc1_output_1_coord, - mc1_output_2_coord), - }, - { - mc2, - mk_shard_binding(mc2_input_coord, - mc2_weight_coord, - mc2_output_1_coord, - mc2_output_2_coord), - }, - }, - }; - auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { return DynamicTensorSlot{ /*slot_name=*/slot_name, @@ -109,20 +62,10 @@ TEST_SUITE(FF_TEST_SUITE) { }; auto mk_value = - [&](size_t src_node_id, - TensorSlotName src_slot_name, - TensorSlotName use_slot_name, - std::optional const &shard_coord) + [](size_t src_node_id, + TensorSlotName src_slot_name, + std::optional const &shard_coord) -> DynamicValueAttrs { - bidict - tensor_binding = get_tensor_bindings_for_slot_name(mapped_task_group, - use_slot_name); - if (shard_coord.has_value()) { - tensor_binding = filter_keys( - tensor_binding, [&](ParallelTensorSpaceCoordinate const &p) { - return p == shard_coord.value(); - }); - } return DynamicValueAttrs{ /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ KwargDataflowOutput{ @@ -132,8 +75,7 @@ TEST_SUITE(FF_TEST_SUITE) { }}, /*parallel_tensor_shape=*/std::nullopt, /*shard_coord=*/shard_coord, - /*mapping=*/ - tensor_binding, + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/std::nullopt, }; @@ -143,21 +85,28 @@ TEST_SUITE(FF_TEST_SUITE) { size_t invocation2_id = 21; size_t invocation3_id = 22; + DynamicValueAttrs graph_input1 = + mk_value(0, TensorSlotName::OUTPUT, std::nullopt); + DynamicValueAttrs graph_input2 = + mk_value(1, TensorSlotName::OUTPUT, std::nullopt); + DynamicValueAttrs invocation1_output1 = + mk_value(invocation1_id, TensorSlotName::OUTPUT_1, std::nullopt); + DynamicValueAttrs invocation1_output2 = + mk_value(invocation1_id, TensorSlotName::OUTPUT_2, std::nullopt); + DynamicValueAttrs invocation2_output1 = + mk_value(invocation2_id, TensorSlotName::OUTPUT_4, std::nullopt); + DynamicValueAttrs invocation3_output1 = + mk_value(invocation3_id, TensorSlotName::OUTPUT_1, std::nullopt); + DynamicNodeInvocation invocation1 = DynamicNodeInvocation{ /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, - TensorSlotName::OUTPUT, - TensorSlotName::INPUT, - std::nullopt), + graph_input1, }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, - TensorSlotName::OUTPUT, - TensorSlotName::WEIGHT, - std::nullopt), + graph_input2, }, }, /*node_attrs=*/ @@ -174,30 +123,18 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(invocation1_id, - TensorSlotName::OUTPUT_1, - TensorSlotName::OUTPUT_1, - std::nullopt), + invocation1_output1, }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(invocation1_id, - TensorSlotName::OUTPUT_2, - TensorSlotName::OUTPUT_2, - std::nullopt), + invocation1_output2, }, }, }; DynamicNodeInvocation invocation2 = DynamicNodeInvocation{ /*inputs=*/{ - { - mk_slot(TensorSlotName::INPUT), - mk_value(invocation1_id, - TensorSlotName::OUTPUT_2, - TensorSlotName::INPUT, - std::nullopt), - }, + {mk_slot(TensorSlotName::INPUT), invocation1_output2}, }, /*node_attrs=*/ DynamicNodeAttrs{ @@ -213,10 +150,7 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_4), - mk_value(invocation2_id, - TensorSlotName::OUTPUT_4, - TensorSlotName::OUTPUT_4, - std::nullopt), + invocation2_output1, }, }, }; @@ -225,24 +159,15 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::KEY), - mk_value(invocation2_id, - TensorSlotName::OUTPUT_4, - TensorSlotName::KEY, - std::nullopt), + invocation2_output1, }, { mk_slot(TensorSlotName::QUERY), - mk_value(1, - TensorSlotName::OUTPUT, - TensorSlotName::QUERY, - std::nullopt), + graph_input2, }, { mk_slot(TensorSlotName::VALUE), - mk_value(invocation1_id, - TensorSlotName::OUTPUT_1, - TensorSlotName::VALUE, - std::nullopt), + invocation1_output1, }, }, /*node_attrs=*/ @@ -259,10 +184,7 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(invocation3_id, - TensorSlotName::OUTPUT_1, - TensorSlotName::OUTPUT_1, - std::nullopt), + invocation3_output1, }, }, }; From efc8d5645c5089925dd377d8160057d34da5e81f Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 11:12:31 -0800 Subject: [PATCH 05/20] Basic test for no copies. --- .../task-spec/dynamic_graph/copy_insertion.h | 5 +- .../task-spec/dynamic_graph/copy_insertion.cc | 236 ++++++++++++++++++ 2 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h index 112730fcba..33c41d9c7e 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -12,8 +12,9 @@ bool value_is_mapped(DynamicValueAttrs const &); bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &); bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &); -std::unordered_set - perform_copy_insertion_for_invocation(DynamicNodeInvocation const &); +std::unordered_set perform_copy_insertion_for_invocation( + DynamicNodeInvocation const &i, + std::unordered_map const &sources); DynamicOpenDataflowGraph perform_copy_insertion(DynamicOpenDataflowGraph const &); diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc new file mode 100644 index 0000000000..c3bbb29111 --- /dev/null +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -0,0 +1,236 @@ +#include "task-spec/dynamic_graph/copy_insertion.h" +#include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "test/utils/doctest/fmt/unordered_set.h" +#include + +using namespace ::FlexFlow; + +TEST_SUITE(FF_TEST_SUITE) { + TEST_CASE("perform_copy_insertion_for_invocation") { + auto mk_machine_coord = + [](nonnegative_int node_idx, + nonnegative_int device_idx) -> MachineSpaceCoordinate { + return MachineSpaceCoordinate{ + /*node_idx=*/node_idx, + /*device_idx=*/device_idx, + /*device_type=*/DeviceType::GPU, + }; + }; + + auto mk_pt_coord = + [](nonnegative_int idx1, + nonnegative_int idx2, + nonnegative_int idx3, + nonnegative_int idx4) -> ParallelTensorSpaceCoordinate { + return ParallelTensorSpaceCoordinate{ + /*sum_component=*/idx1, + /*discard_copy_component=*/idx2, + /*shard_components=*/ + FFOrdered{ + idx3, + idx4, + }, + }; + }; + + auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, + ParallelTensorSpaceCoordinate const &c2, + ParallelTensorSpaceCoordinate const &c3, + ParallelTensorSpaceCoordinate const &c4) + -> OperatorAtomicTaskShardBinding { + return OperatorAtomicTaskShardBinding{ + /*tensor_coords=*/{ + { + TensorSlotName::INPUT, + c1, + }, + { + TensorSlotName::WEIGHT, + c2, + }, + { + TensorSlotName::OUTPUT_1, + c3, + }, + { + TensorSlotName::OUTPUT_2, + c4, + }, + }, + }; + }; + + MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); + MachineSpaceCoordinate mc2 = mk_machine_coord(2_n, 0_n); + + ParallelTensorSpaceCoordinate mc1_input_coord = + mk_pt_coord(0_n, 0_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate mc1_weight_coord = + mk_pt_coord(0_n, 1_n, 2_n, 0_n); + ParallelTensorSpaceCoordinate mc1_output_1_coord = + mk_pt_coord(1_n, 0_n, 0_n, 1_n); + ParallelTensorSpaceCoordinate mc1_output_2_coord = + mk_pt_coord(3_n, 0_n, 0_n, 0_n); + + ParallelTensorSpaceCoordinate mc2_input_coord = + mk_pt_coord(0_n, 1_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate mc2_weight_coord = + mk_pt_coord(0_n, 4_n, 2_n, 0_n); + ParallelTensorSpaceCoordinate mc2_output_1_coord = + mk_pt_coord(1_n, 2_n, 0_n, 1_n); + ParallelTensorSpaceCoordinate mc2_output_2_coord = + mk_pt_coord(0_n, 0_n, 0_n, 0_n); + + MappedOperatorTaskGroup mapped_task_group = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_shard_binding(mc1_input_coord, + mc1_weight_coord, + mc1_output_1_coord, + mc1_output_2_coord), + }, + { + mc2, + mk_shard_binding(mc2_input_coord, + mc2_weight_coord, + mc2_output_1_coord, + mc2_output_2_coord), + }, + }, + }; + + auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { + return DynamicTensorSlot{ + /*slot_name=*/slot_name, + /*slot_tensor_role=*/std::nullopt, + }; + }; + + auto mk_value = [&](size_t src_node_id, + TensorSlotName src_slot_name, + std::optional const &use_slot_name) + -> DynamicValueAttrs { + return DynamicValueAttrs{ + /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ + KwargDataflowOutput{ + Node{src_node_id}, + src_slot_name, + }, + }}, + /*parallel_tensor_shape=*/std::nullopt, + /*shard_coord=*/std::nullopt, + /*mapping=*/ + transform(use_slot_name, + [&](TensorSlotName s) { + return get_tensor_bindings_for_slot_name( + mapped_task_group, s); + }), + /*accessor=*/std::nullopt, + /*role=*/std::nullopt, + }; + }; + + size_t invocation1_id = 20; + + DynamicValueAttrs graph_input1 = + mk_value(0, TensorSlotName::OUTPUT, std::nullopt); + DynamicValueAttrs graph_input1_src = + mk_value(0, TensorSlotName::OUTPUT, TensorSlotName::INPUT); + DynamicValueAttrs graph_input1_use = + mk_value(0, TensorSlotName::OUTPUT, TensorSlotName::INPUT); + DynamicValueAttrs graph_input2 = + mk_value(1, TensorSlotName::OUTPUT, std::nullopt); + DynamicValueAttrs graph_input2_src = + mk_value(1, TensorSlotName::OUTPUT, TensorSlotName::WEIGHT); + DynamicValueAttrs graph_input2_use = + mk_value(1, TensorSlotName::OUTPUT, TensorSlotName::WEIGHT); + DynamicValueAttrs invocation1_output1 = + mk_value(invocation1_id, TensorSlotName::OUTPUT_1, std::nullopt); + DynamicValueAttrs invocation1_output1_src = mk_value( + invocation1_id, TensorSlotName::OUTPUT_1, TensorSlotName::OUTPUT_1); + DynamicValueAttrs invocation1_output2 = + mk_value(invocation1_id, TensorSlotName::OUTPUT_2, std::nullopt); + DynamicValueAttrs invocation1_output2_src = mk_value( + invocation1_id, TensorSlotName::OUTPUT_2, TensorSlotName::OUTPUT_2); + + DynamicNodeInvocation input = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + graph_input1, + }, + { + mk_slot(TensorSlotName::WEIGHT), + graph_input2, + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/mapped_task_group, + /*op_attrs=*/std::nullopt, + /*layer_guid=*/ + dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT_1), + invocation1_output1, + }, + { + mk_slot(TensorSlotName::OUTPUT_2), + invocation1_output2, + }, + }, + }; + + std::unordered_map sources{ + {graph_input1, graph_input1_src}, {graph_input2, graph_input2_src}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources); + + DynamicNodeInvocation mapped = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + graph_input1_use, + }, + { + mk_slot(TensorSlotName::WEIGHT), + graph_input2_use, + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/mapped_task_group, + /*op_attrs=*/std::nullopt, + /*layer_guid=*/ + dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT_1), + invocation1_output1_src, + }, + { + mk_slot(TensorSlotName::OUTPUT_2), + invocation1_output2_src, + }, + }, + }; + + std::unordered_set correct = {mapped}; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } +} From f43019f3855be7a149f86f237ca423e981547d08 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 11:54:27 -0800 Subject: [PATCH 06/20] Test copy case. --- .../task-spec/dynamic_graph/copy_insertion.cc | 18 +- .../task-spec/dynamic_graph/copy_insertion.cc | 218 +++++++++++++++--- 2 files changed, 201 insertions(+), 35 deletions(-) diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index 63eb3afdfb..b222db4491 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -1,5 +1,6 @@ #include "task-spec/dynamic_graph/copy_insertion.h" #include "op-attrs/parallel_tensor_space_coordinate.dtg.h" +#include "op-attrs/tensor_slot_name.dtg.h" #include "pcg/machine_space_coordinate.dtg.h" #include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" #include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" @@ -70,7 +71,13 @@ std::unordered_set perform_copy_insertion_for_invocation( DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { result.insert(DynamicNodeInvocation{ - /*inputs=*/{{slot, source_value}}, + /*inputs=*/{ + { + DynamicTensorSlot{TensorSlotName::INPUT, + slot.slot_tensor_role}, + source_value, + }, + }, /*node_attrs=*/ DynamicNodeAttrs{ /*task_type=*/std::nullopt, @@ -80,7 +87,14 @@ std::unordered_set perform_copy_insertion_for_invocation( /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, /*per_device_op_state=*/std::nullopt, }, - /*outputs=*/{{slot, use_value}}, + /*outputs=*/ + { + { + DynamicTensorSlot{TensorSlotName::OUTPUT, + slot.slot_tensor_role}, + use_value, + }, + }, }); } } diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc index c3bbb29111..1916afc02e 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -1,5 +1,7 @@ #include "task-spec/dynamic_graph/copy_insertion.h" +#include "op-attrs/tensor_slot_name.dtg.h" #include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "test/utils/doctest/fmt/unordered_set.h" #include @@ -33,6 +35,18 @@ TEST_SUITE(FF_TEST_SUITE) { }; }; + auto mk_input_shard_binding = [&](ParallelTensorSpaceCoordinate const &c) + -> OperatorAtomicTaskShardBinding { + return OperatorAtomicTaskShardBinding{ + /*tensor_coords=*/{ + { + TensorSlotName::OUTPUT, + c, + }, + }, + }; + }; + auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, ParallelTensorSpaceCoordinate const &c2, ParallelTensorSpaceCoordinate const &c3, @@ -61,7 +75,9 @@ TEST_SUITE(FF_TEST_SUITE) { }; MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); - MachineSpaceCoordinate mc2 = mk_machine_coord(2_n, 0_n); + MachineSpaceCoordinate mc2 = mk_machine_coord(1_n, 0_n); + MachineSpaceCoordinate mc3 = mk_machine_coord(2_n, 0_n); + MachineSpaceCoordinate mc4 = mk_machine_coord(3_n, 0_n); ParallelTensorSpaceCoordinate mc1_input_coord = mk_pt_coord(0_n, 0_n, 0_n, 0_n); @@ -81,7 +97,69 @@ TEST_SUITE(FF_TEST_SUITE) { ParallelTensorSpaceCoordinate mc2_output_2_coord = mk_pt_coord(0_n, 0_n, 0_n, 0_n); - MappedOperatorTaskGroup mapped_task_group = MappedOperatorTaskGroup{ + MappedOperatorTaskGroup input_mapping_same = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc2, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup input_mapping_copy1 = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup input_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc4, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + + MappedOperatorTaskGroup weight_mapping_same = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_weight_coord), + }, + { + mc2, + mk_input_shard_binding(mc2_weight_coord), + }, + }, + }; + MappedOperatorTaskGroup weight_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc4, + mk_input_shard_binding(mc1_weight_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_weight_coord), + }, + }, + }; + + MappedOperatorTaskGroup invocation_mapping = MappedOperatorTaskGroup{ bidict{ { mc1, @@ -109,6 +187,7 @@ TEST_SUITE(FF_TEST_SUITE) { auto mk_value = [&](size_t src_node_id, TensorSlotName src_slot_name, + MappedOperatorTaskGroup const &mapping, std::optional const &use_slot_name) -> DynamicValueAttrs { return DynamicValueAttrs{ @@ -123,8 +202,7 @@ TEST_SUITE(FF_TEST_SUITE) { /*mapping=*/ transform(use_slot_name, [&](TensorSlotName s) { - return get_tensor_bindings_for_slot_name( - mapped_task_group, s); + return get_tensor_bindings_for_slot_name(mapping, s); }), /*accessor=*/std::nullopt, /*role=*/std::nullopt, @@ -134,25 +212,44 @@ TEST_SUITE(FF_TEST_SUITE) { size_t invocation1_id = 20; DynamicValueAttrs graph_input1 = - mk_value(0, TensorSlotName::OUTPUT, std::nullopt); - DynamicValueAttrs graph_input1_src = - mk_value(0, TensorSlotName::OUTPUT, TensorSlotName::INPUT); - DynamicValueAttrs graph_input1_use = - mk_value(0, TensorSlotName::OUTPUT, TensorSlotName::INPUT); + mk_value(0, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); + DynamicValueAttrs graph_input1_use = mk_value( + 0, TensorSlotName::OUTPUT, invocation_mapping, TensorSlotName::INPUT); DynamicValueAttrs graph_input2 = - mk_value(1, TensorSlotName::OUTPUT, std::nullopt); - DynamicValueAttrs graph_input2_src = - mk_value(1, TensorSlotName::OUTPUT, TensorSlotName::WEIGHT); - DynamicValueAttrs graph_input2_use = - mk_value(1, TensorSlotName::OUTPUT, TensorSlotName::WEIGHT); - DynamicValueAttrs invocation1_output1 = - mk_value(invocation1_id, TensorSlotName::OUTPUT_1, std::nullopt); - DynamicValueAttrs invocation1_output1_src = mk_value( - invocation1_id, TensorSlotName::OUTPUT_1, TensorSlotName::OUTPUT_1); - DynamicValueAttrs invocation1_output2 = - mk_value(invocation1_id, TensorSlotName::OUTPUT_2, std::nullopt); - DynamicValueAttrs invocation1_output2_src = mk_value( - invocation1_id, TensorSlotName::OUTPUT_2, TensorSlotName::OUTPUT_2); + mk_value(1, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); + DynamicValueAttrs graph_input2_use = mk_value( + 1, TensorSlotName::OUTPUT, invocation_mapping, TensorSlotName::WEIGHT); + DynamicValueAttrs invocation1_output1 = mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + invocation_mapping, + std::nullopt); + DynamicValueAttrs invocation1_output1_src = + mk_value(invocation1_id, + TensorSlotName::OUTPUT_1, + invocation_mapping, + TensorSlotName::OUTPUT_1); + DynamicValueAttrs invocation1_output2 = mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + invocation_mapping, + std::nullopt); + DynamicValueAttrs invocation1_output2_src = + mk_value(invocation1_id, + TensorSlotName::OUTPUT_2, + invocation_mapping, + TensorSlotName::OUTPUT_2); + + DynamicValueAttrs graph_input1_src_same = mk_value( + 0, TensorSlotName::OUTPUT, input_mapping_same, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input1_src_copy1 = mk_value( + 0, TensorSlotName::OUTPUT, input_mapping_copy1, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input1_src_copy2 = mk_value( + 0, TensorSlotName::OUTPUT, input_mapping_copy2, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input2_src_same = mk_value( + 1, TensorSlotName::OUTPUT, weight_mapping_same, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input2_src_copy2 = mk_value(1, + TensorSlotName::OUTPUT, + weight_mapping_copy2, + TensorSlotName::OUTPUT); DynamicNodeInvocation input = DynamicNodeInvocation{ /*inputs=*/{ @@ -169,7 +266,7 @@ TEST_SUITE(FF_TEST_SUITE) { DynamicNodeAttrs{ /*task_type=*/std::nullopt, /*device_coord=*/std::nullopt, - /*mapping=*/mapped_task_group, + /*mapping=*/invocation_mapping, /*op_attrs=*/std::nullopt, /*layer_guid=*/ dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, @@ -188,12 +285,6 @@ TEST_SUITE(FF_TEST_SUITE) { }, }; - std::unordered_map sources{ - {graph_input1, graph_input1_src}, {graph_input2, graph_input2_src}}; - - std::unordered_set result = - perform_copy_insertion_for_invocation(input, sources); - DynamicNodeInvocation mapped = DynamicNodeInvocation{ /*inputs=*/{ { @@ -209,7 +300,7 @@ TEST_SUITE(FF_TEST_SUITE) { DynamicNodeAttrs{ /*task_type=*/std::nullopt, /*device_coord=*/std::nullopt, - /*mapping=*/mapped_task_group, + /*mapping=*/invocation_mapping, /*op_attrs=*/std::nullopt, /*layer_guid=*/ dynamic_layer_guid_t{parallel_layer_guid_t{Node{20}}}, @@ -228,9 +319,70 @@ TEST_SUITE(FF_TEST_SUITE) { }, }; - std::unordered_set correct = {mapped}; + auto mk_copy = [&](DynamicValueAttrs const &src, + DynamicValueAttrs const &dst) { + return DynamicNodeInvocation{ + /*inputs=*/{{mk_slot(TensorSlotName::INPUT), src}}, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/{{mk_slot(TensorSlotName::OUTPUT), dst}}, + }; + }; + + SUBCASE("same mapping, no copies") { + std::unordered_map sources_same{ + {graph_input1, graph_input1_src_same}, + {graph_input2, graph_input2_src_same}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_same); + + std::unordered_set correct = {mapped}; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } + + SUBCASE("copy one tensor, one point") { + std::unordered_map sources_copy1{ + {graph_input1, graph_input1_src_copy1}, + {graph_input2, graph_input2_src_same}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_copy1); + + std::unordered_set correct = { + mapped, + mk_copy(graph_input1_src_copy1, graph_input1_use), + }; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } + + SUBCASE("copy two tensors, two points") { + std::unordered_map sources_copy2{ + {graph_input1, graph_input1_src_copy2}, + {graph_input2, graph_input2_src_copy2}}; + + std::unordered_set result = + perform_copy_insertion_for_invocation(input, sources_copy2); + + std::unordered_set correct = { + mapped, + mk_copy(graph_input1_src_copy2, graph_input1_use), + mk_copy(graph_input2_src_copy2, graph_input2_use), + }; - CHECK(result.size() == correct.size()); - CHECK(result == correct); + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } } } From 890bf542c3596f6068149bd0d0a51d4e2516209b Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 13:01:28 -0800 Subject: [PATCH 07/20] Check no copies pre-exist copy insertion. --- .../include/task-spec/dynamic_graph/copy_insertion.h | 1 + .../src/task-spec/dynamic_graph/copy_insertion.cc | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h index 33c41d9c7e..8578ee49ac 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -7,6 +7,7 @@ namespace FlexFlow { +bool node_is_copy(DynamicNodeAttrs const &n); bool value_is_mapped(DynamicValueAttrs const &); bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &); diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index b222db4491..60f6ced8c9 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -15,24 +15,26 @@ namespace FlexFlow { +bool node_is_copy(DynamicNodeAttrs const &n) { + return n.op_attrs.has_value() && n.op_attrs.value().is_copy(); +} + bool value_is_mapped(DynamicValueAttrs const &n) { return n.mapping.has_value(); } bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &g) { - auto node_is_mapped = [](DynamicNodeAttrs const &) -> bool { return false; }; auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return false; }; return no_part_of_dynamic_graph_satisfies( - g, node_is_mapped, value_is_mapped, slot_is_mapped); + g, node_is_copy, value_is_mapped, slot_is_mapped); } bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &g) { - auto node_is_mapped = [](DynamicNodeAttrs const &) -> bool { return true; }; auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return true; }; return full_dynamic_graph_satisfies( - g, node_is_mapped, value_is_mapped, slot_is_mapped); + g, node_is_copy, value_is_mapped, slot_is_mapped); } static DynamicValueAttrs map_dynamic_value_attrs_for_task_group( From 9a1479c9081059ef3563bf3c52465ffa0320e94f Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 13:41:28 -0800 Subject: [PATCH 08/20] Filter to avoid degenerate copies. --- .../task-spec/dynamic_graph/copy_insertion.cc | 43 ++++++++++++++++--- .../task-spec/dynamic_graph/copy_insertion.cc | 34 ++++++++++++++- 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index 60f6ced8c9..abdaaccfa9 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -8,10 +8,13 @@ #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" #include "task-spec/dynamic_graph/dynamic_tensor_slot.dtg.h" #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" +#include "utils/bidict/algorithms/bidict_from_pairs.h" +#include "utils/bidict/algorithms/unordered_set_of.h" +#include "utils/containers/intersection.h" #include "utils/containers/map_values2.h" +#include "utils/containers/set_difference.h" #include "utils/containers/transform.h" #include "utils/optional.h" -#include namespace FlexFlow { @@ -46,6 +49,33 @@ static DynamicValueAttrs map_dynamic_value_attrs_for_task_group( return result; } +static std::pair + filter_mapping_to_avoid_degenerate_copies(DynamicValueAttrs const &input, + DynamicValueAttrs const &output) { + std::unordered_set< + std::pair> + input_mapping = unordered_set_of(assert_unwrap(input.mapping)); + std::unordered_set< + std::pair> + output_mapping = unordered_set_of(assert_unwrap(output.mapping)); + + // Exclude the point shared between the input and output mappings, because + // those will not result in actual copies once shard expansion is performed + std::unordered_set< + std::pair> + remove = intersection(input_mapping, output_mapping); + + DynamicValueAttrs filtered_input = input; + filtered_input.mapping = + bidict_from_pairs(set_difference(input_mapping, remove)); + + DynamicValueAttrs filtered_output = output; + filtered_output.mapping = + bidict_from_pairs(set_difference(output_mapping, remove)); + + return std::pair{filtered_input, filtered_output}; +} + std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, std::unordered_map const &sources) { @@ -72,12 +102,14 @@ std::unordered_set perform_copy_insertion_for_invocation( DynamicValueAttrs source_value = sources.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { - result.insert(DynamicNodeInvocation{ + auto const &[filtered_source, filtered_use] = + filter_mapping_to_avoid_degenerate_copies(source_value, use_value); + DynamicNodeInvocation copy{ /*inputs=*/{ { DynamicTensorSlot{TensorSlotName::INPUT, slot.slot_tensor_role}, - source_value, + filtered_source, }, }, /*node_attrs=*/ @@ -94,10 +126,11 @@ std::unordered_set perform_copy_insertion_for_invocation( { DynamicTensorSlot{TensorSlotName::OUTPUT, slot.slot_tensor_role}, - use_value, + filtered_use, }, }, - }); + }; + result.insert(copy); } } diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc index 1916afc02e..49a23c1f7f 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -121,6 +121,15 @@ TEST_SUITE(FF_TEST_SUITE) { }, }, }; + MappedOperatorTaskGroup input_mapping_copy1_diff_vs_use = + MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; MappedOperatorTaskGroup input_mapping_copy2 = MappedOperatorTaskGroup{ bidict{ { @@ -178,6 +187,18 @@ TEST_SUITE(FF_TEST_SUITE) { }, }; + MappedOperatorTaskGroup invocation_mapping_diff_vs_copy1 = + MappedOperatorTaskGroup{ + bidict{ + { + mc2, + mk_shard_binding(mc2_input_coord, + mc2_weight_coord, + mc2_output_1_coord, + mc2_output_2_coord), + }, + }, + }; auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { return DynamicTensorSlot{ /*slot_name=*/slot_name, @@ -215,6 +236,11 @@ TEST_SUITE(FF_TEST_SUITE) { mk_value(0, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); DynamicValueAttrs graph_input1_use = mk_value( 0, TensorSlotName::OUTPUT, invocation_mapping, TensorSlotName::INPUT); + DynamicValueAttrs graph_input1_use_diff_vs_copy1 = + mk_value(0, + TensorSlotName::OUTPUT, + invocation_mapping_diff_vs_copy1, + TensorSlotName::INPUT); DynamicValueAttrs graph_input2 = mk_value(1, TensorSlotName::OUTPUT, invocation_mapping, std::nullopt); DynamicValueAttrs graph_input2_use = mk_value( @@ -242,6 +268,11 @@ TEST_SUITE(FF_TEST_SUITE) { 0, TensorSlotName::OUTPUT, input_mapping_same, TensorSlotName::OUTPUT); DynamicValueAttrs graph_input1_src_copy1 = mk_value( 0, TensorSlotName::OUTPUT, input_mapping_copy1, TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input1_src_copy1_diff_vs_use = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy1_diff_vs_use, + TensorSlotName::OUTPUT); DynamicValueAttrs graph_input1_src_copy2 = mk_value( 0, TensorSlotName::OUTPUT, input_mapping_copy2, TensorSlotName::OUTPUT); DynamicValueAttrs graph_input2_src_same = mk_value( @@ -360,7 +391,8 @@ TEST_SUITE(FF_TEST_SUITE) { std::unordered_set correct = { mapped, - mk_copy(graph_input1_src_copy1, graph_input1_use), + mk_copy(graph_input1_src_copy1_diff_vs_use, + graph_input1_use_diff_vs_copy1), }; CHECK(result.size() == correct.size()); From 49fd444f3734d648c4969361cc5b355687a1ce7f Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 16:40:22 -0800 Subject: [PATCH 09/20] Wire up copy insertion and fix shard expansion. --- .../src/realm-execution/pcg_instance.cc | 2 ++ .../task-spec/dynamic_graph/copy_insertion.cc | 18 +++++++++--- .../task-spec/dynamic_graph/loss_insertion.cc | 14 ++-------- .../dynamic_graph/shard_expansion.cc | 28 +++++++++++++++++++ 4 files changed, 46 insertions(+), 16 deletions(-) diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index 8263a49b0a..d1b544cf3e 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -7,6 +7,7 @@ #include "realm-execution/realm_context.h" #include "realm-execution/tasks/impl/op_task.h" #include "realm-execution/tensor_instance_backing.h" +#include "task-spec/dynamic_graph/copy_insertion.h" #include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" #include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" @@ -104,6 +105,7 @@ PCGInstance create_pcg_instance( } dg = perform_update_insertion(dg, optimizer_attrs); + dg = perform_copy_insertion(dg); dg = perform_shard_expansion(dg); TensorInstanceBacking tensor_instance_backing = perform_instance_allocation(dg, inputs, ctx); diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index abdaaccfa9..076024cba5 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -10,6 +10,8 @@ #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "utils/bidict/algorithms/bidict_from_pairs.h" #include "utils/bidict/algorithms/unordered_set_of.h" +#include "utils/containers/contains_key.h" +#include "utils/containers/flatmap.h" #include "utils/containers/intersection.h" #include "utils/containers/map_values2.h" #include "utils/containers/set_difference.h" @@ -34,10 +36,11 @@ bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &g) { } bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &g) { + auto node_is_any = [](DynamicNodeAttrs const &) -> bool { return true; }; auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return true; }; return full_dynamic_graph_satisfies( - g, node_is_copy, value_is_mapped, slot_is_mapped); + g, node_is_any, value_is_mapped, slot_is_mapped); } static DynamicValueAttrs map_dynamic_value_attrs_for_task_group( @@ -99,6 +102,10 @@ std::unordered_set perform_copy_insertion_for_invocation( }}; for (auto const &[slot, input] : i.inputs) { + if (!contains_key(sources, input)) { + continue; + } + DynamicValueAttrs source_value = sources.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { @@ -152,10 +159,13 @@ DynamicOpenDataflowGraph } } + // Use regular flatmap here to remove duplicates (we don't want to copy the + // same tensor to the same place multiple times) DynamicOpenDataflowGraph result = - flatmap_dynamic_invocation_set(g, [&](DynamicNodeInvocation const &i) { - return perform_copy_insertion_for_invocation(i, sources); - }); + dynamic_open_dataflow_graph_from_invocation_set( + flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { + return perform_copy_insertion_for_invocation(i, sources); + })); ASSERT(graph_is_fully_copy_inserted(result)); diff --git a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc index 0f33dec041..8066926262 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/loss_insertion.cc @@ -25,12 +25,7 @@ LossInsertionResult perform_loss_insertion( /*tensor_guid=*/mk_dynamic_tensor_guid_for_loss(), /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, - /*mapping=*/ - transform(loss_mapping, - [](MappedOperatorTaskGroup const &node_mapping) { - return get_tensor_bindings_for_slot_name( - node_mapping, TensorSlotName::INPUT); - }), + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_loss(), }; @@ -38,12 +33,7 @@ LossInsertionResult perform_loss_insertion( /*tensor_guid=*/logit_value.tensor_guid, /*parallel_tensor_shape=*/logit_value.parallel_tensor_shape, /*shard_coord=*/logit_value.shard_coord, - /*mapping=*/ - transform(loss_mapping, - [](MappedOperatorTaskGroup const &node_mapping) { - return get_tensor_bindings_for_slot_name( - node_mapping, TensorSlotName::LOGIT); - }), + /*mapping=*/std::nullopt, /*accessor=*/std::nullopt, /*role=*/mk_dynamic_tensor_role_bwd(), }; diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc index 999bee3d92..7bef26f58a 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,7 +1,10 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "utils/bidict/algorithms/filter_keys.h" +#include "utils/containers/get_only.h" #include "utils/containers/map_values2.h" +#include "utils/containers/require_same.h" #include "utils/containers/transform.h" #include "utils/optional.h" @@ -82,8 +85,33 @@ static DynamicNodeInvocation shard_invocation_for_binding( }; } +static std::unordered_set + perform_shard_expansion_for_copy(DynamicNodeInvocation const &i) { + auto const &[input_slot, input] = get_only(i.inputs); + auto const &[output_slot, output] = get_only(i.outputs); + bidict input_mapping = + assert_unwrap(input.mapping); + bidict output_mapping = + assert_unwrap(output.mapping); + require_same(input_mapping.left_values(), output_mapping.left_values()); + + return transform( + input_mapping.left_values(), [&](ParallelTensorSpaceCoordinate const &p) { + return shard_invocation_for_binding(i, + input_mapping.at_l(p), + OperatorAtomicTaskShardBinding{{ + {input_slot.slot_name, p}, + {output_slot.slot_name, p}, + }}); + }); +} + std::unordered_set perform_shard_expansion_for_invocation(DynamicNodeInvocation const &i) { + if (i.node_attrs.op_attrs.has_value() && + i.node_attrs.op_attrs.value().is_copy()) { + return perform_shard_expansion_for_copy(i); + } MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); From 878b8224f4455b15a80e8a87dcb048b7036873e0 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 17:01:22 -0800 Subject: [PATCH 10/20] Sketch interface for issuing operations. --- .../src/realm-execution/pcg_instance.cc | 89 +++++++++++++------ 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index d1b544cf3e..18d5908f56 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -159,6 +159,57 @@ PCGInstance create_pcg_instance( /*logit_grad_tensor=*/logit_grad_tensor}; } +static Realm::Event spawn_dynamic_node_invocation( + RealmContext &ctx, + DynamicNodeInvocation const &invocation, + std::vector const &input_dependencies, + std::vector const &output_dependencies, + TensorInstanceBacking const &tensor_instance_backing, + PerDeviceOpStateBacking const &device_state_backing, + OptimizerAttrs const &optimizer_attrs, + ProfilingSettings const &profiling_settings, + DistributedDeviceHandle const &device_handle, + FFIterationConfig iteration_config) { + Realm::Event dependencies = Realm::Event::merge_events( + Realm::Event::merge_events(input_dependencies), + Realm::Event::merge_events(output_dependencies)); + + TensorInstanceBacking tensor_backing = + subset_tensor_instance_backing_for_invocation(tensor_instance_backing, + invocation); + + auto spawn_task = [&]() { + Realm::Processor target_proc = ctx.map_device_coord_to_processor( + assert_unwrap(invocation.node_attrs.device_coord)); + return spawn_op_task(ctx, + target_proc, + invocation, + tensor_backing, + try_at(device_state_backing.backing, invocation), + profiling_settings, + device_handle.at(target_proc), + iteration_config, + optimizer_attrs, + dependencies); + }; + + auto issue_copy = [&]() { return Realm::Event::NO_EVENT; }; + + TrainingOperationAttrs op_attrs = + assert_unwrap(invocation.node_attrs.op_attrs); + return op_attrs.visit(overload{ + [&](PCGOperatorAttrs const &pcg_op_attrs) { + return pcg_op_attrs.visit(overload{ + [&](InputAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](WeightAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](auto const &) { return spawn_task(); }, + }); + }, + [&](LossAttrs const &) { return spawn_task(); }, + [&](CopyAttrs const &) { return issue_copy(); }, + }); +} + static std::unordered_map execute_distributed_dynamic_node_invocation_set( RealmContext &ctx, @@ -174,14 +225,6 @@ static std::unordered_map DependencySet dependency_set{ctx.get_outstanding_events()}; return unordered_map_from_pairs( transform(invocations, [&](DynamicNodeInvocation const &invocation) { - TrainingOperationAttrs op_attrs = - assert_unwrap(invocation.node_attrs.op_attrs); - if (op_attrs.is_pcg_op() && (op_attrs.require_pcg_op().is_input() || - op_attrs.require_pcg_op().is_weight())) { - return std::pair{invocation.node_attrs.layer_guid, - Realm::Event::NO_EVENT}; - } - std::vector input_dependencies = transform(vector_of(values(invocation.inputs)), [&](DynamicValueAttrs const &value) { @@ -192,27 +235,19 @@ static std::unordered_map [&](DynamicValueAttrs const &value) { return dependency_set.get_dependency_for_writer(value); }); - Realm::Event dependencies = Realm::Event::merge_events( - Realm::Event::merge_events(input_dependencies), - Realm::Event::merge_events(output_dependencies)); - Realm::Processor target_proc = ctx.map_device_coord_to_processor( - assert_unwrap(invocation.node_attrs.device_coord)); - - TensorInstanceBacking tensor_backing = - subset_tensor_instance_backing_for_invocation( - tensor_instance_backing, invocation); Realm::Event result = - spawn_op_task(ctx, - target_proc, - invocation, - tensor_backing, - try_at(device_state_backing.backing, invocation), - profiling_settings, - device_handle.at(target_proc), - iteration_config, - optimizer_attrs, - dependencies); + spawn_dynamic_node_invocation(ctx, + invocation, + input_dependencies, + output_dependencies, + tensor_instance_backing, + device_state_backing, + optimizer_attrs, + profiling_settings, + device_handle, + iteration_config); + for (DynamicValueAttrs const &value : values(invocation.inputs)) { dependency_set.add_reader(value, result); } From b4fcf43099dc9f5d0cfbd1a7aa4c4969a910b01a Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 26 Feb 2026 17:13:44 -0800 Subject: [PATCH 11/20] Sketch interface for copies. --- .../include/realm-execution/realm_context.h | 9 +++++++++ .../src/realm-execution/pcg_instance.cc | 14 +++++++++++++- .../src/realm-execution/realm_context.cc | 8 ++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/lib/realm-execution/include/realm-execution/realm_context.h b/lib/realm-execution/include/realm-execution/realm_context.h index e1180147fd..a147da7df6 100644 --- a/lib/realm-execution/include/realm-execution/realm_context.h +++ b/lib/realm-execution/include/realm-execution/realm_context.h @@ -4,6 +4,7 @@ #include "kernels/allocation.h" #include "kernels/device_handle_t.dtg.h" #include "kernels/managed_per_device_ff_handle.h" +#include "op-attrs/parallel_tensor_shape.dtg.h" #include "op-attrs/tensor_shape.dtg.h" #include "pcg/device_id_t.dtg.h" #include "pcg/machine_space_coordinate.dtg.h" @@ -62,6 +63,14 @@ struct RealmContext { int priority = 0); ///\} + /** \name Data movement */ + ///\{ + Realm::Event issue_copy(ParallelTensorShape const &src_shape, + Realm::RegionInstance src_inst, + ParallelTensorShape const &dst_shape, + Realm::RegionInstance dst_inst); + ///\} + /** \name Instance management */ ///\{ std::pair diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index 18d5908f56..09bcdf23f2 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -19,6 +19,7 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" #include "task-spec/dynamic_graph/update_insertion.h" +#include "utils/containers/get_only.h" #include "utils/containers/map_values.h" #include "utils/containers/transform.h" #include "utils/containers/try_at.h" @@ -193,7 +194,18 @@ static Realm::Event spawn_dynamic_node_invocation( dependencies); }; - auto issue_copy = [&]() { return Realm::Event::NO_EVENT; }; + auto issue_copy = [&]() { + DynamicValueAttrs const &input = get_only(invocation.inputs).second; + DynamicValueAttrs const &output = get_only(invocation.outputs).second; + Realm::RegionInstance src_inst = + tensor_instance_backing.backing.at(input).first; + Realm::RegionInstance dst_inst = + tensor_instance_backing.backing.at(output).first; + return ctx.issue_copy(assert_unwrap(input.parallel_tensor_shape), + src_inst, + assert_unwrap(output.parallel_tensor_shape), + dst_inst); + }; TrainingOperationAttrs op_attrs = assert_unwrap(invocation.node_attrs.op_attrs); diff --git a/lib/realm-execution/src/realm-execution/realm_context.cc b/lib/realm-execution/src/realm-execution/realm_context.cc index 96beb63953..23715b147f 100644 --- a/lib/realm-execution/src/realm-execution/realm_context.cc +++ b/lib/realm-execution/src/realm-execution/realm_context.cc @@ -10,6 +10,7 @@ #include "realm-execution/tasks/task_id_t.h" #include "utils/containers/contains_key.h" #include "utils/containers/transform.h" +#include "utils/exception.h" #include "utils/nonnegative_int/nonnegative_int.h" #include "utils/one_to_many/one_to_many.h" #include "utils/positive_int/positive_int.h" @@ -137,6 +138,13 @@ Realm::Event RealmContext::collective_spawn_task(Realm::Processor target_proc, return result; } +Realm::Event RealmContext::issue_copy(ParallelTensorShape const &src_shape, + Realm::RegionInstance src_inst, + ParallelTensorShape const &dst_shape, + Realm::RegionInstance dst_inst) { + NOT_IMPLEMENTED(); +} + template static Realm::Rect rect_from_dims(TensorDims const &dims) { std::vector values{dims.ff_ordered.begin(), dims.ff_ordered.end()}; From 9d06077e53480e15d6920cbaa6c3dbfcf1d0eb66 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Fri, 27 Feb 2026 10:12:10 -0800 Subject: [PATCH 12/20] Implement copies. --- .../include/realm-execution/realm_context.h | 5 +- .../src/realm-execution/pcg_instance.cc | 12 +-- .../src/realm-execution/realm_context.cc | 82 +++++++++++++++++-- 3 files changed, 86 insertions(+), 13 deletions(-) diff --git a/lib/realm-execution/include/realm-execution/realm_context.h b/lib/realm-execution/include/realm-execution/realm_context.h index a147da7df6..ab89e916c0 100644 --- a/lib/realm-execution/include/realm-execution/realm_context.h +++ b/lib/realm-execution/include/realm-execution/realm_context.h @@ -68,7 +68,10 @@ struct RealmContext { Realm::Event issue_copy(ParallelTensorShape const &src_shape, Realm::RegionInstance src_inst, ParallelTensorShape const &dst_shape, - Realm::RegionInstance dst_inst); + Realm::RegionInstance dst_inst, + Realm::ProfilingRequestSet const &requests, + Realm::Event wait_on = Realm::Event::NO_EVENT, + int priority = 0); ///\} /** \name Instance management */ diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index 09bcdf23f2..e4485b8546 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -171,7 +171,7 @@ static Realm::Event spawn_dynamic_node_invocation( ProfilingSettings const &profiling_settings, DistributedDeviceHandle const &device_handle, FFIterationConfig iteration_config) { - Realm::Event dependencies = Realm::Event::merge_events( + Realm::Event precondition = Realm::Event::merge_events( Realm::Event::merge_events(input_dependencies), Realm::Event::merge_events(output_dependencies)); @@ -191,7 +191,7 @@ static Realm::Event spawn_dynamic_node_invocation( device_handle.at(target_proc), iteration_config, optimizer_attrs, - dependencies); + precondition); }; auto issue_copy = [&]() { @@ -204,7 +204,9 @@ static Realm::Event spawn_dynamic_node_invocation( return ctx.issue_copy(assert_unwrap(input.parallel_tensor_shape), src_inst, assert_unwrap(output.parallel_tensor_shape), - dst_inst); + dst_inst, + Realm::ProfilingRequestSet{}, + precondition); }; TrainingOperationAttrs op_attrs = @@ -212,8 +214,8 @@ static Realm::Event spawn_dynamic_node_invocation( return op_attrs.visit(overload{ [&](PCGOperatorAttrs const &pcg_op_attrs) { return pcg_op_attrs.visit(overload{ - [&](InputAttrs const &) { return Realm::Event::NO_EVENT; }, - [&](WeightAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](InputAttrs const &) { return precondition; }, + [&](WeightAttrs const &) { return precondition; }, [&](auto const &) { return spawn_task(); }, }); }, diff --git a/lib/realm-execution/src/realm-execution/realm_context.cc b/lib/realm-execution/src/realm-execution/realm_context.cc index 23715b147f..5e3e342b0e 100644 --- a/lib/realm-execution/src/realm-execution/realm_context.cc +++ b/lib/realm-execution/src/realm-execution/realm_context.cc @@ -2,6 +2,7 @@ #include "kernels/device_handle_t.dtg.h" #include "kernels/device_handle_t.h" #include "op-attrs/datatype.h" +#include "op-attrs/parallel_tensor_shape.h" #include "op-attrs/tensor_dims.dtg.h" #include "pcg/device_id_t.h" #include "pcg/device_type.dtg.h" @@ -138,13 +139,6 @@ Realm::Event RealmContext::collective_spawn_task(Realm::Processor target_proc, return result; } -Realm::Event RealmContext::issue_copy(ParallelTensorShape const &src_shape, - Realm::RegionInstance src_inst, - ParallelTensorShape const &dst_shape, - Realm::RegionInstance dst_inst) { - NOT_IMPLEMENTED(); -} - template static Realm::Rect rect_from_dims(TensorDims const &dims) { std::vector values{dims.ff_ordered.begin(), dims.ff_ordered.end()}; @@ -154,6 +148,80 @@ static Realm::Rect rect_from_dims(TensorDims const &dims) { Realm::Point::ONES()}; } +template +static Realm::IndexSpace ispace_from_dims(TensorDims const &dims) { + Realm::Rect rect = rect_from_dims(dims); + return Realm::IndexSpace{rect}; +} + +Realm::Event + RealmContext::issue_copy(ParallelTensorShape const &src_shape, + Realm::RegionInstance src_inst, + ParallelTensorShape const &dst_shape, + Realm::RegionInstance dst_inst, + Realm::ProfilingRequestSet const &requests, + Realm::Event wait_on, + int priority) { + TensorShape src_piece_shape = get_piece_shape(src_shape); + TensorShape dst_piece_shape = get_piece_shape(dst_shape); + ASSERT(src_piece_shape == dst_piece_shape); // For now, assume they match + + Realm::CopySrcDstField src_field; + src_field.set_field( + /*inst=*/src_inst, + /*field_id=*/0, + /*size=*/ + static_cast(int{size_of_datatype(src_piece_shape.data_type)}), + /*subfield_offset=*/0); + Realm::CopySrcDstField dst_field; + dst_field.set_field( + /*inst=*/dst_inst, + /*field_id=*/0, + /*size=*/ + static_cast(int{size_of_datatype(src_piece_shape.data_type)}), + /*subfield_offset=*/0); + + Realm::Event result; + switch (src_piece_shape.dims.ff_ordered.num_dims()) { +#if REALM_MAX_DIM >= 1 + case 1: + result = ispace_from_dims<1>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 2 + case 2: + result = ispace_from_dims<2>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 3 + case 3: + result = ispace_from_dims<3>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 4 + case 4: + result = ispace_from_dims<4>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif +#if REALM_MAX_DIM >= 5 + case 5: + result = ispace_from_dims<5>(src_piece_shape.dims) + .copy({src_field}, {dst_field}, requests, wait_on, priority); + break; +#endif + default: + PANIC("TensorShape dims greater than REALM_MAX_DIM", + fmt::to_string(src_piece_shape.dims.ff_ordered.num_dims())); + break; + } + this->outstanding_events.push_back(result); + return result; +} + std::pair RealmContext::create_instance(Realm::Memory memory, TensorShape const &shape, From 4560087ac8ce556ef27e0fa7d54073d8e17c5a3a Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Fri, 27 Feb 2026 11:02:24 -0800 Subject: [PATCH 13/20] Assign copies to a phase based on tensor roles. --- .../dynamic_graph/dynamic_task_type.h | 13 ++++++++++ .../task-spec/dynamic_graph/copy_insertion.cc | 4 +++- .../dynamic_graph/dynamic_task_type.cc | 24 +++++++++++++++++++ .../task-spec/dynamic_graph/copy_insertion.cc | 10 ++++---- 4 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h create mode 100644 lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h new file mode 100644 index 0000000000..2d33120f80 --- /dev/null +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h @@ -0,0 +1,13 @@ +#ifndef _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_DYNAMIC_TASK_TYPE_H +#define _FLEXFLOW_LIB_TASK_SPEC_INCLUDE_TASK_SPEC_DYNAMIC_GRAPH_DYNAMIC_TASK_TYPE_H + +#include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" +#include "task-spec/dynamic_graph/dynamic_tensor_role.dtg.h" + +namespace FlexFlow { + +DynamicTaskType decide_copy_task_type(DynamicTensorRole); + +} // namespace FlexFlow + +#endif diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index 076024cba5..974181d52c 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -6,6 +6,7 @@ #include "task-spec/dynamic_graph/dynamic_node_attrs.dtg.h" #include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h" #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h" +#include "task-spec/dynamic_graph/dynamic_task_type.h" #include "task-spec/dynamic_graph/dynamic_tensor_slot.dtg.h" #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "utils/bidict/algorithms/bidict_from_pairs.h" @@ -121,7 +122,8 @@ std::unordered_set perform_copy_insertion_for_invocation( }, /*node_attrs=*/ DynamicNodeAttrs{ - /*task_type=*/std::nullopt, + /*task_type=*/transform(slot.slot_tensor_role, + decide_copy_task_type), /*device_coord=*/std::nullopt, /*mapping=*/std::nullopt, /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, diff --git a/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc new file mode 100644 index 0000000000..16fa21de22 --- /dev/null +++ b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc @@ -0,0 +1,24 @@ +#include "task-spec/dynamic_graph/dynamic_task_type.h" +#include "utils/overload.h" + +namespace FlexFlow { + +DynamicTaskType decide_copy_task_type(DynamicTensorRole role) { + return role.visit(overload{ + [](FwbTensorType const &fwb_tensor) { + switch (fwb_tensor) { + case FwbTensorType::FORWARD: + return DynamicTaskType::FWD; + case FwbTensorType::GRADIENT: + return DynamicTaskType::BWD; + default: + PANIC("Unexpected FwbTensorType", fwb_tensor); + break; + } + }, + [](DynamicOptimizerTensorRole const &) { return DynamicTaskType::UPD; }, + [](DynamicLossTensorRole const &) { return DynamicTaskType::LOSS; }, + }); +} + +} // namespace FlexFlow diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc index 49a23c1f7f..1968d572df 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -1,6 +1,8 @@ #include "task-spec/dynamic_graph/copy_insertion.h" #include "op-attrs/tensor_slot_name.dtg.h" #include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/dynamic_task_type.dtg.h" +#include "task-spec/dynamic_graph/dynamic_tensor_role.h" #include "task-spec/dynamic_graph/dynamic_value_attrs.dtg.h" #include "test/utils/doctest/fmt/unordered_set.h" #include @@ -202,7 +204,7 @@ TEST_SUITE(FF_TEST_SUITE) { auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { return DynamicTensorSlot{ /*slot_name=*/slot_name, - /*slot_tensor_role=*/std::nullopt, + /*slot_tensor_role=*/mk_dynamic_tensor_role_fwd(), }; }; @@ -295,7 +297,7 @@ TEST_SUITE(FF_TEST_SUITE) { }, /*node_attrs=*/ DynamicNodeAttrs{ - /*task_type=*/std::nullopt, + /*task_type=*/DynamicTaskType::FWD, /*device_coord=*/std::nullopt, /*mapping=*/invocation_mapping, /*op_attrs=*/std::nullopt, @@ -329,7 +331,7 @@ TEST_SUITE(FF_TEST_SUITE) { }, /*node_attrs=*/ DynamicNodeAttrs{ - /*task_type=*/std::nullopt, + /*task_type=*/DynamicTaskType::FWD, /*device_coord=*/std::nullopt, /*mapping=*/invocation_mapping, /*op_attrs=*/std::nullopt, @@ -356,7 +358,7 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{{mk_slot(TensorSlotName::INPUT), src}}, /*node_attrs=*/ DynamicNodeAttrs{ - /*task_type=*/std::nullopt, + /*task_type=*/DynamicTaskType::FWD, /*device_coord=*/std::nullopt, /*mapping=*/std::nullopt, /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, From fe37fc9508842074a124d6863f4938c0721045df Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Fri, 27 Feb 2026 11:37:05 -0800 Subject: [PATCH 14/20] Update shard expansion test to include copy case. --- .../dynamic_graph/shard_expansion.cc | 271 ++++++++++++------ 1 file changed, 186 insertions(+), 85 deletions(-) diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc index 36a73cf1a2..f9a5da9341 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc @@ -1,39 +1,75 @@ #include "task-spec/dynamic_graph/shard_expansion.h" #include "pcg/mapped_parallel_computation_graph/mapped_operator_task_group.h" +#include "task-spec/dynamic_graph/copy_attrs.dtg.h" +#include "task-spec/dynamic_graph/dynamic_copy_layer_guid_t.dtg.h" +#include "task-spec/dynamic_graph/training_operation_attrs.dtg.h" #include "test/utils/doctest/fmt/unordered_set.h" #include "utils/bidict/algorithms/filter_keys.h" #include using namespace ::FlexFlow; -TEST_SUITE(FF_TEST_SUITE) { - TEST_CASE("perform_shard_expansion_for_invocation") { - auto mk_machine_coord = - [](nonnegative_int node_idx, - nonnegative_int device_idx) -> MachineSpaceCoordinate { - return MachineSpaceCoordinate{ - /*node_idx=*/node_idx, - /*device_idx=*/device_idx, - /*device_type=*/DeviceType::GPU, - }; - }; +static MachineSpaceCoordinate mk_machine_coord(nonnegative_int node_idx, + nonnegative_int device_idx) { + return MachineSpaceCoordinate{ + /*node_idx=*/node_idx, + /*device_idx=*/device_idx, + /*device_type=*/DeviceType::GPU, + }; +}; + +static ParallelTensorSpaceCoordinate mk_pt_coord(nonnegative_int idx1, + nonnegative_int idx2, + nonnegative_int idx3, + nonnegative_int idx4) { + return ParallelTensorSpaceCoordinate{ + /*sum_component=*/idx1, + /*discard_copy_component=*/idx2, + /*shard_components=*/ + FFOrdered{ + idx3, + idx4, + }, + }; +}; + +DynamicTensorSlot mk_slot(TensorSlotName const &slot_name) { + return DynamicTensorSlot{ + /*slot_name=*/slot_name, + /*slot_tensor_role=*/std::nullopt, + }; +}; - auto mk_pt_coord = - [](nonnegative_int idx1, - nonnegative_int idx2, - nonnegative_int idx3, - nonnegative_int idx4) -> ParallelTensorSpaceCoordinate { - return ParallelTensorSpaceCoordinate{ - /*sum_component=*/idx1, - /*discard_copy_component=*/idx2, - /*shard_components=*/ - FFOrdered{ - idx3, - idx4, +DynamicValueAttrs + mk_value(size_t src_node_id, + TensorSlotName src_slot_name, + bidict + tensor_binding, + std::optional const &shard_coord) { + if (shard_coord.has_value()) { + tensor_binding = filter_keys(tensor_binding, + [&](ParallelTensorSpaceCoordinate const &p) { + return p == shard_coord.value(); + }); + } + return DynamicValueAttrs{ + /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ + KwargDataflowOutput{ + Node{src_node_id}, + src_slot_name, }, - }; - }; + }}, + /*parallel_tensor_shape=*/std::nullopt, + /*shard_coord=*/shard_coord, + /*mapping=*/ + tensor_binding, + /*accessor=*/std::nullopt, + /*role=*/std::nullopt, + }; +}; +TEST_SUITE(FF_TEST_SUITE) { + TEST_CASE("perform_shard_expansion_for_invocation") { auto mk_shard_binding = [&](ParallelTensorSpaceCoordinate const &c1, ParallelTensorSpaceCoordinate const &c2, ParallelTensorSpaceCoordinate const &c3, @@ -101,14 +137,7 @@ TEST_SUITE(FF_TEST_SUITE) { }, }; - auto mk_slot = [](TensorSlotName const &slot_name) -> DynamicTensorSlot { - return DynamicTensorSlot{ - /*slot_name=*/slot_name, - /*slot_tensor_role=*/std::nullopt, - }; - }; - - auto mk_value = + auto mk_op_value = [&](size_t src_node_id, TensorSlotName src_slot_name, TensorSlotName use_slot_name, @@ -117,43 +146,24 @@ TEST_SUITE(FF_TEST_SUITE) { bidict tensor_binding = get_tensor_bindings_for_slot_name(mapped_task_group, use_slot_name); - if (shard_coord.has_value()) { - tensor_binding = filter_keys( - tensor_binding, [&](ParallelTensorSpaceCoordinate const &p) { - return p == shard_coord.value(); - }); - } - return DynamicValueAttrs{ - /*tensor_guid=*/dynamic_tensor_guid_t{parallel_tensor_guid_t{ - KwargDataflowOutput{ - Node{src_node_id}, - src_slot_name, - }, - }}, - /*parallel_tensor_shape=*/std::nullopt, - /*shard_coord=*/shard_coord, - /*mapping=*/ - tensor_binding, - /*accessor=*/std::nullopt, - /*role=*/std::nullopt, - }; + return mk_value(src_node_id, src_slot_name, tensor_binding, shard_coord); }; DynamicNodeInvocation input = DynamicNodeInvocation{ /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, - TensorSlotName::OUTPUT, - TensorSlotName::INPUT, - std::nullopt), + mk_op_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + std::nullopt), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, - TensorSlotName::OUTPUT, - TensorSlotName::WEIGHT, - std::nullopt), + mk_op_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + std::nullopt), }, }, /*node_attrs=*/ @@ -170,17 +180,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, - TensorSlotName::OUTPUT_1, - TensorSlotName::OUTPUT_1, - std::nullopt), + mk_op_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + std::nullopt), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, - TensorSlotName::OUTPUT_2, - TensorSlotName::OUTPUT_2, - std::nullopt), + mk_op_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + std::nullopt), }, }, }; @@ -199,17 +209,17 @@ TEST_SUITE(FF_TEST_SUITE) { /*inputs=*/{ { mk_slot(TensorSlotName::INPUT), - mk_value(0, - TensorSlotName::OUTPUT, - TensorSlotName::INPUT, - input_shard_coord), + mk_op_value(0, + TensorSlotName::OUTPUT, + TensorSlotName::INPUT, + input_shard_coord), }, { mk_slot(TensorSlotName::WEIGHT), - mk_value(1, - TensorSlotName::OUTPUT, - TensorSlotName::WEIGHT, - weight_shard_coord), + mk_op_value(1, + TensorSlotName::OUTPUT, + TensorSlotName::WEIGHT, + weight_shard_coord), }, }, /*node_attrs=*/ @@ -226,17 +236,17 @@ TEST_SUITE(FF_TEST_SUITE) { { { mk_slot(TensorSlotName::OUTPUT_1), - mk_value(20, - TensorSlotName::OUTPUT_1, - TensorSlotName::OUTPUT_1, - output_1_shard_coord), + mk_op_value(20, + TensorSlotName::OUTPUT_1, + TensorSlotName::OUTPUT_1, + output_1_shard_coord), }, { mk_slot(TensorSlotName::OUTPUT_2), - mk_value(20, - TensorSlotName::OUTPUT_2, - TensorSlotName::OUTPUT_2, - output_2_shard_coord), + mk_op_value(20, + TensorSlotName::OUTPUT_2, + TensorSlotName::OUTPUT_2, + output_2_shard_coord), }, }, }; @@ -258,4 +268,95 @@ TEST_SUITE(FF_TEST_SUITE) { CHECK(result.size() == correct.size()); CHECK(result == correct); } + + TEST_CASE("perform_shard_expansion_for_invocation (copy)") { + MachineSpaceCoordinate mc1 = mk_machine_coord(0_n, 0_n); + MachineSpaceCoordinate mc2 = mk_machine_coord(1_n, 0_n); + MachineSpaceCoordinate mc3 = mk_machine_coord(2_n, 0_n); + MachineSpaceCoordinate mc4 = mk_machine_coord(3_n, 0_n); + + ParallelTensorSpaceCoordinate pt1 = mk_pt_coord(0_n, 0_n, 0_n, 0_n); + ParallelTensorSpaceCoordinate pt2 = mk_pt_coord(0_n, 1_n, 0_n, 0_n); + + bidict src_binding{ + {pt1, mc1}, + {pt2, mc2}, + }; + bidict dst_binding{ + {pt1, mc3}, + {pt2, mc4}, + }; + + DynamicNodeInvocation input = DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + mk_value(0, TensorSlotName::OUTPUT, src_binding, std::nullopt), + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/std::nullopt, + /*mapping=*/std::nullopt, + /*op_attrs=*/TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT), + mk_value(20, TensorSlotName::OUTPUT, dst_binding, std::nullopt), + }, + }, + }; + + std::unordered_set result = + perform_shard_expansion_for_invocation(input); + + auto mk_invocation_shard = + [&](MachineSpaceCoordinate const &device_coord, + ParallelTensorSpaceCoordinate const &tensor_shard_coord) + -> DynamicNodeInvocation { + return DynamicNodeInvocation{ + /*inputs=*/{ + { + mk_slot(TensorSlotName::INPUT), + mk_value(0, + TensorSlotName::OUTPUT, + src_binding, + tensor_shard_coord), + }, + }, + /*node_attrs=*/ + DynamicNodeAttrs{ + /*task_type=*/std::nullopt, + /*device_coord=*/device_coord, + /*mapping=*/std::nullopt, + /*op_attrs=*/TrainingOperationAttrs{CopyAttrs{}}, + /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, + /*per_device_op_state=*/std::nullopt, + }, + /*outputs=*/ + { + { + mk_slot(TensorSlotName::OUTPUT), + mk_value(20, + TensorSlotName::OUTPUT, + dst_binding, + tensor_shard_coord), + }, + }, + }; + }; + + std::unordered_set correct = { + mk_invocation_shard(mc1, pt1), + mk_invocation_shard(mc2, pt2), + }; + + CHECK(result.size() == correct.size()); + CHECK(result == correct); + } } From fdb4ebe39d4cb55f4fef39be8d09dc34a1e879ed Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Fri, 27 Feb 2026 11:41:34 -0800 Subject: [PATCH 15/20] It is safe to return NO_EVENT for nop tasks even in presence of dependencies. --- lib/realm-execution/src/realm-execution/pcg_instance.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index e4485b8546..e5038329c5 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -214,8 +214,8 @@ static Realm::Event spawn_dynamic_node_invocation( return op_attrs.visit(overload{ [&](PCGOperatorAttrs const &pcg_op_attrs) { return pcg_op_attrs.visit(overload{ - [&](InputAttrs const &) { return precondition; }, - [&](WeightAttrs const &) { return precondition; }, + [&](InputAttrs const &) { return Realm::Event::NO_EVENT; }, + [&](WeightAttrs const &) { return Realm::Event::NO_EVENT; }, [&](auto const &) { return spawn_task(); }, }); }, From fbb9eba3fa194de4a4bae2c40b6d116c6cf224b0 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Tue, 17 Mar 2026 21:10:35 -0700 Subject: [PATCH 16/20] Update to match Realm PR changes. --- .../realm-execution/per_device_op_state_backing.dtg.toml | 2 +- lib/realm-execution/src/realm-execution/pcg_instance.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml b/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml index 89feb11905..92e9de8145 100644 --- a/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml +++ b/lib/realm-execution/include/realm-execution/per_device_op_state_backing.dtg.toml @@ -5,7 +5,7 @@ features = [] docstring = ''' \brief Maps each shard-expanded DynamicNodeInvocation to its corresponding PerDeviceOpState. -PerDeviceOpStateBacking is to PerDeviceOpState as DistributedDeviceHandle is to \ref device_handle_t (i.e., FFHandle). +\ref PerDeviceOpStateBacking is to \ref PerDeviceOpState as \ref DistributedFfHandle is to \ref device_handle_t (i.e., FFHandle). ''' diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index e5038329c5..5c1dcd28c0 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -169,7 +169,7 @@ static Realm::Event spawn_dynamic_node_invocation( PerDeviceOpStateBacking const &device_state_backing, OptimizerAttrs const &optimizer_attrs, ProfilingSettings const &profiling_settings, - DistributedDeviceHandle const &device_handle, + DistributedFfHandle const &device_handle, FFIterationConfig iteration_config) { Realm::Event precondition = Realm::Event::merge_events( Realm::Event::merge_events(input_dependencies), From 5304a6b4ff6ee917981486009d6ee38402894dc2 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 19 Mar 2026 15:56:05 -0700 Subject: [PATCH 17/20] Updates in response to feedback. --- .../src/realm-execution/pcg_instance.cc | 6 + .../src/realm-execution/realm_context.cc | 14 +- .../dynamic_graph/dynamic_task_type.h | 2 +- .../include/task-spec/dynamic_graph/index.dox | 1 + .../task-spec/dynamic_graph/copy_insertion.cc | 20 +-- .../dynamic_graph/dynamic_task_type.cc | 3 +- .../dynamic_graph/shard_expansion.cc | 4 +- .../task-spec/dynamic_graph/copy_insertion.cc | 127 ++++++++++-------- 8 files changed, 101 insertions(+), 76 deletions(-) diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc index 5c1dcd28c0..0ecd02143e 100644 --- a/lib/realm-execution/src/realm-execution/pcg_instance.cc +++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc @@ -160,6 +160,12 @@ PCGInstance create_pcg_instance( /*logit_grad_tensor=*/logit_grad_tensor}; } +/** + * \brief Spawn the Realm operations (tasks, copies, etc.) for a given \ref + * DynamicNodeInvocation, given the specified dependencies, instances, etc. Note + * that one \ref DynamicNodeInvocation may become multiple Realm operations + * (e.g., a parallel operator may turn into multiple copies). + */ static Realm::Event spawn_dynamic_node_invocation( RealmContext &ctx, DynamicNodeInvocation const &invocation, diff --git a/lib/realm-execution/src/realm-execution/realm_context.cc b/lib/realm-execution/src/realm-execution/realm_context.cc index 5e3e342b0e..790c1bd613 100644 --- a/lib/realm-execution/src/realm-execution/realm_context.cc +++ b/lib/realm-execution/src/realm-execution/realm_context.cc @@ -171,14 +171,16 @@ Realm::Event /*inst=*/src_inst, /*field_id=*/0, /*size=*/ - static_cast(int{size_of_datatype(src_piece_shape.data_type)}), + static_cast( + size_of_datatype(src_piece_shape.data_type).int_from_positive_int()), /*subfield_offset=*/0); Realm::CopySrcDstField dst_field; dst_field.set_field( /*inst=*/dst_inst, /*field_id=*/0, /*size=*/ - static_cast(int{size_of_datatype(src_piece_shape.data_type)}), + static_cast( + size_of_datatype(src_piece_shape.data_type).int_from_positive_int()), /*subfield_offset=*/0); Realm::Event result; @@ -214,8 +216,8 @@ Realm::Event break; #endif default: - PANIC("TensorShape dims greater than REALM_MAX_DIM", - fmt::to_string(src_piece_shape.dims.ff_ordered.num_dims())); + PANIC("TensorShape dims greater than REALM_MAX_DIM: {}", + src_piece_shape.dims.ff_ordered.num_dims()); break; } this->outstanding_events.push_back(result); @@ -227,8 +229,8 @@ std::pair TensorShape const &shape, Realm::ProfilingRequestSet const &prs, Realm::Event wait_on) { - std::vector field_sizes{ - static_cast(int{size_of_datatype(shape.data_type)})}; + std::vector field_sizes{static_cast( + size_of_datatype(shape.data_type).int_from_positive_int())}; Realm::RegionInstance inst; Realm::Event ready; switch (shape.dims.ff_ordered.num_dims()) { diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h index 2d33120f80..7b13b480c8 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_task_type.h @@ -6,7 +6,7 @@ namespace FlexFlow { -DynamicTaskType decide_copy_task_type(DynamicTensorRole); +DynamicTaskType dynamic_task_type_from_tensor_role_for_copy(DynamicTensorRole); } // namespace FlexFlow diff --git a/lib/task-spec/include/task-spec/dynamic_graph/index.dox b/lib/task-spec/include/task-spec/dynamic_graph/index.dox index 04ceaf4935..1c012fb330 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/index.dox +++ b/lib/task-spec/include/task-spec/dynamic_graph/index.dox @@ -11,6 +11,7 @@ namespace FlexFlow { - \ref shard_expansion.h - \ref update_insertion.h - \ref loss_insertion.h +- \ref copy_insertion.h - \ref machine_slicing.h */ diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index 974181d52c..b5c789d2b4 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -82,7 +82,8 @@ static std::pair std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, - std::unordered_map const &sources) { + std::unordered_map const + &unmapped_value_to_source) { MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); @@ -103,11 +104,11 @@ std::unordered_set perform_copy_insertion_for_invocation( }}; for (auto const &[slot, input] : i.inputs) { - if (!contains_key(sources, input)) { + if (!contains_key(unmapped_value_to_source, input)) { continue; } - DynamicValueAttrs source_value = sources.at(input); + DynamicValueAttrs source_value = unmapped_value_to_source.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { auto const &[filtered_source, filtered_use] = @@ -122,8 +123,9 @@ std::unordered_set perform_copy_insertion_for_invocation( }, /*node_attrs=*/ DynamicNodeAttrs{ - /*task_type=*/transform(slot.slot_tensor_role, - decide_copy_task_type), + /*task_type=*/transform( + slot.slot_tensor_role, + dynamic_task_type_from_tensor_role_for_copy), /*device_coord=*/std::nullopt, /*mapping=*/std::nullopt, /*op_attrs*/ TrainingOperationAttrs{CopyAttrs{}}, @@ -151,10 +153,11 @@ DynamicOpenDataflowGraph ASSERT(no_part_of_graph_is_copy_inserted(g)); - std::unordered_map sources; + std::unordered_map + unmapped_value_to_source; for (DynamicNodeInvocation const &i : g.invocations) { for (auto const &[slot, value] : i.outputs) { - sources.insert( + unmapped_value_to_source.insert( std::pair{value, map_dynamic_value_attrs_for_task_group( slot, value, assert_unwrap(i.node_attrs.mapping))}); @@ -166,7 +169,8 @@ DynamicOpenDataflowGraph DynamicOpenDataflowGraph result = dynamic_open_dataflow_graph_from_invocation_set( flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { - return perform_copy_insertion_for_invocation(i, sources); + return perform_copy_insertion_for_invocation( + i, unmapped_value_to_source); })); ASSERT(graph_is_fully_copy_inserted(result)); diff --git a/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc index 16fa21de22..216fa1b0ff 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc @@ -3,7 +3,8 @@ namespace FlexFlow { -DynamicTaskType decide_copy_task_type(DynamicTensorRole role) { +DynamicTaskType + dynamic_task_type_from_tensor_role_for_copy(DynamicTensorRole role) { return role.visit(overload{ [](FwbTensorType const &fwb_tensor) { switch (fwb_tensor) { diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc index 7bef26f58a..ceea90ead7 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc @@ -87,8 +87,8 @@ static DynamicNodeInvocation shard_invocation_for_binding( static std::unordered_set perform_shard_expansion_for_copy(DynamicNodeInvocation const &i) { - auto const &[input_slot, input] = get_only(i.inputs); - auto const &[output_slot, output] = get_only(i.outputs); + auto [input_slot, input] = get_only(i.inputs); + auto [output_slot, output] = get_only(i.outputs); bidict input_mapping = assert_unwrap(input.mapping); bidict output_mapping = diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc index 1968d572df..2160f6bf82 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc @@ -111,39 +111,6 @@ TEST_SUITE(FF_TEST_SUITE) { }, }, }; - MappedOperatorTaskGroup input_mapping_copy1 = MappedOperatorTaskGroup{ - bidict{ - { - mc1, - mk_input_shard_binding(mc1_input_coord), - }, - { - mc3, - mk_input_shard_binding(mc2_input_coord), - }, - }, - }; - MappedOperatorTaskGroup input_mapping_copy1_diff_vs_use = - MappedOperatorTaskGroup{ - bidict{ - { - mc3, - mk_input_shard_binding(mc2_input_coord), - }, - }, - }; - MappedOperatorTaskGroup input_mapping_copy2 = MappedOperatorTaskGroup{ - bidict{ - { - mc3, - mk_input_shard_binding(mc1_input_coord), - }, - { - mc4, - mk_input_shard_binding(mc2_input_coord), - }, - }, - }; MappedOperatorTaskGroup weight_mapping_same = MappedOperatorTaskGroup{ bidict{ @@ -157,18 +124,6 @@ TEST_SUITE(FF_TEST_SUITE) { }, }, }; - MappedOperatorTaskGroup weight_mapping_copy2 = MappedOperatorTaskGroup{ - bidict{ - { - mc4, - mk_input_shard_binding(mc1_weight_coord), - }, - { - mc3, - mk_input_shard_binding(mc2_weight_coord), - }, - }, - }; MappedOperatorTaskGroup invocation_mapping = MappedOperatorTaskGroup{ bidict{ @@ -268,21 +223,8 @@ TEST_SUITE(FF_TEST_SUITE) { DynamicValueAttrs graph_input1_src_same = mk_value( 0, TensorSlotName::OUTPUT, input_mapping_same, TensorSlotName::OUTPUT); - DynamicValueAttrs graph_input1_src_copy1 = mk_value( - 0, TensorSlotName::OUTPUT, input_mapping_copy1, TensorSlotName::OUTPUT); - DynamicValueAttrs graph_input1_src_copy1_diff_vs_use = - mk_value(0, - TensorSlotName::OUTPUT, - input_mapping_copy1_diff_vs_use, - TensorSlotName::OUTPUT); - DynamicValueAttrs graph_input1_src_copy2 = mk_value( - 0, TensorSlotName::OUTPUT, input_mapping_copy2, TensorSlotName::OUTPUT); DynamicValueAttrs graph_input2_src_same = mk_value( 1, TensorSlotName::OUTPUT, weight_mapping_same, TensorSlotName::OUTPUT); - DynamicValueAttrs graph_input2_src_copy2 = mk_value(1, - TensorSlotName::OUTPUT, - weight_mapping_copy2, - TensorSlotName::OUTPUT); DynamicNodeInvocation input = DynamicNodeInvocation{ /*inputs=*/{ @@ -384,6 +326,39 @@ TEST_SUITE(FF_TEST_SUITE) { } SUBCASE("copy one tensor, one point") { + MappedOperatorTaskGroup input_mapping_copy1 = MappedOperatorTaskGroup{ + bidict{ + { + mc1, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup input_mapping_copy1_diff_vs_use = + MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + + DynamicValueAttrs graph_input1_src_copy1 = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy1, + TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input1_src_copy1_diff_vs_use = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy1_diff_vs_use, + TensorSlotName::OUTPUT); + std::unordered_map sources_copy1{ {graph_input1, graph_input1_src_copy1}, {graph_input2, graph_input2_src_same}}; @@ -402,6 +377,42 @@ TEST_SUITE(FF_TEST_SUITE) { } SUBCASE("copy two tensors, two points") { + MappedOperatorTaskGroup input_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc3, + mk_input_shard_binding(mc1_input_coord), + }, + { + mc4, + mk_input_shard_binding(mc2_input_coord), + }, + }, + }; + MappedOperatorTaskGroup weight_mapping_copy2 = MappedOperatorTaskGroup{ + bidict{ + { + mc4, + mk_input_shard_binding(mc1_weight_coord), + }, + { + mc3, + mk_input_shard_binding(mc2_weight_coord), + }, + }, + }; + + DynamicValueAttrs graph_input1_src_copy2 = + mk_value(0, + TensorSlotName::OUTPUT, + input_mapping_copy2, + TensorSlotName::OUTPUT); + DynamicValueAttrs graph_input2_src_copy2 = + mk_value(1, + TensorSlotName::OUTPUT, + weight_mapping_copy2, + TensorSlotName::OUTPUT); + std::unordered_map sources_copy2{ {graph_input1, graph_input1_src_copy2}, {graph_input2, graph_input2_src_copy2}}; From ca4de4d7d5af90fcdffe0940ad58cf146a74fbd0 Mon Sep 17 00:00:00 2001 From: Elliott Slaughter Date: Thu, 19 Mar 2026 21:18:02 -0700 Subject: [PATCH 18/20] Respond to PR feedback. --- .../task-spec/dynamic_graph/copy_insertion.h | 3 +- .../include/task-spec/dynamic_graph/index.dox | 2 +- .../task-spec/dynamic_graph/copy_insertion.cc | 15 ++++--- .../dynamic_graph/shard_expansion.cc | 22 ++++++---- .../dynamic_graph/shard_expansion.cc | 40 +++++++------------ 5 files changed, 39 insertions(+), 43 deletions(-) diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h index 8578ee49ac..5e5d52bdac 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -15,7 +15,8 @@ bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &); std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, - std::unordered_map const &sources); + std::unordered_map const + &mapped_source_value); DynamicOpenDataflowGraph perform_copy_insertion(DynamicOpenDataflowGraph const &); diff --git a/lib/task-spec/include/task-spec/dynamic_graph/index.dox b/lib/task-spec/include/task-spec/dynamic_graph/index.dox index 1c012fb330..c48e67f4b3 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/index.dox +++ b/lib/task-spec/include/task-spec/dynamic_graph/index.dox @@ -11,7 +11,7 @@ namespace FlexFlow { - \ref shard_expansion.h - \ref update_insertion.h - \ref loss_insertion.h -- \ref copy_insertion.h +- \ref copy_insertion.h Inserts copies into the dynamic graph by inspecting the mappings of operators and inferring where tensors are used on devices other than where they were created. - \ref machine_slicing.h */ diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index b5c789d2b4..abafad0116 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -83,7 +83,7 @@ static std::pair std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, std::unordered_map const - &unmapped_value_to_source) { + &mapped_source_value) { MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); @@ -104,11 +104,11 @@ std::unordered_set perform_copy_insertion_for_invocation( }}; for (auto const &[slot, input] : i.inputs) { - if (!contains_key(unmapped_value_to_source, input)) { + if (!contains_key(mapped_source_value, input)) { continue; } - DynamicValueAttrs source_value = unmapped_value_to_source.at(input); + DynamicValueAttrs source_value = mapped_source_value.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { auto const &[filtered_source, filtered_use] = @@ -153,11 +153,10 @@ DynamicOpenDataflowGraph ASSERT(no_part_of_graph_is_copy_inserted(g)); - std::unordered_map - unmapped_value_to_source; + std::unordered_map mapped_source_value; for (DynamicNodeInvocation const &i : g.invocations) { for (auto const &[slot, value] : i.outputs) { - unmapped_value_to_source.insert( + mapped_source_value.insert( std::pair{value, map_dynamic_value_attrs_for_task_group( slot, value, assert_unwrap(i.node_attrs.mapping))}); @@ -169,8 +168,8 @@ DynamicOpenDataflowGraph DynamicOpenDataflowGraph result = dynamic_open_dataflow_graph_from_invocation_set( flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { - return perform_copy_insertion_for_invocation( - i, unmapped_value_to_source); + return perform_copy_insertion_for_invocation(i, + mapped_source_value); })); ASSERT(graph_is_fully_copy_inserted(result)); diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc index ceea90ead7..fb6efb96d0 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc @@ -41,7 +41,7 @@ bool graph_is_fully_shard_expanded(DynamicOpenDataflowGraph const &g) { } static bidict - subset_tensor_mapping_for_coord( + restrict_tensor_mapping_keys_to_coord( bidict const &mapping, ParallelTensorSpaceCoordinate const ¶llel_tensor_coord) { @@ -66,8 +66,8 @@ static DynamicNodeInvocation shard_invocation_for_binding( v.mapping, [&](bidict const &mapping) { - return subset_tensor_mapping_for_coord(mapping, - parallel_tensor_coord); + return restrict_tensor_mapping_keys_to_coord(mapping, + parallel_tensor_coord); }); return result; }; @@ -91,14 +91,22 @@ static std::unordered_set auto [output_slot, output] = get_only(i.outputs); bidict input_mapping = assert_unwrap(input.mapping); - bidict output_mapping = - assert_unwrap(output.mapping); - require_same(input_mapping.left_values(), output_mapping.left_values()); + require_same(input_mapping.left_values(), + assert_unwrap(output.mapping).left_values()); return transform( input_mapping.left_values(), [&](ParallelTensorSpaceCoordinate const &p) { + // The machine coord for a copy is inherently nebulous because it + // doesn't strictly run in any single location. Further, Realm has the + // flexibility to issue a copy operation from anywhere in the machine, + // including remotely. Here we choose machine_coord based on the input + // because we expect this to align with the most efficient way to issue + // copies in Realm, although the current Realm backend uses a + // centralized controller and thus issues copies all from a single node. + MachineSpaceCoordinate machine_coord = input_mapping.at_l(p); + return shard_invocation_for_binding(i, - input_mapping.at_l(p), + machine_coord, OperatorAtomicTaskShardBinding{{ {input_slot.slot_name, p}, {output_slot.slot_name, p}, diff --git a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc index f9a5da9341..efe21146db 100644 --- a/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc +++ b/lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc @@ -319,36 +319,24 @@ TEST_SUITE(FF_TEST_SUITE) { [&](MachineSpaceCoordinate const &device_coord, ParallelTensorSpaceCoordinate const &tensor_shard_coord) -> DynamicNodeInvocation { - return DynamicNodeInvocation{ - /*inputs=*/{ - { - mk_slot(TensorSlotName::INPUT), - mk_value(0, - TensorSlotName::OUTPUT, - src_binding, - tensor_shard_coord), - }, - }, - /*node_attrs=*/ - DynamicNodeAttrs{ - /*task_type=*/std::nullopt, - /*device_coord=*/device_coord, - /*mapping=*/std::nullopt, - /*op_attrs=*/TrainingOperationAttrs{CopyAttrs{}}, - /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}}, - /*per_device_op_state=*/std::nullopt, + DynamicNodeInvocation result = input; + result.inputs = { + { + mk_slot(TensorSlotName::INPUT), + mk_value( + 0, TensorSlotName::OUTPUT, src_binding, tensor_shard_coord), }, - /*outputs=*/ + }; + // See perform_shard_expansion_for_copy in shard_expansion.cc for explanation of the choice of device placement. + result.node_attrs.device_coord = device_coord; + result.outputs = { { - { - mk_slot(TensorSlotName::OUTPUT), - mk_value(20, - TensorSlotName::OUTPUT, - dst_binding, - tensor_shard_coord), - }, + mk_slot(TensorSlotName::OUTPUT), + mk_value( + 20, TensorSlotName::OUTPUT, dst_binding, tensor_shard_coord), }, }; + return result; }; std::unordered_set correct = { From d47668af8bfe41ba59f21c7d21a4db3a1f62c776 Mon Sep 17 00:00:00 2001 From: Colin Unger Date: Fri, 20 Mar 2026 15:12:38 -0700 Subject: [PATCH 19/20] Fixes from PR review --- .../include/task-spec/dynamic_graph/copy_insertion.h | 2 +- .../dynamic_graph/dynamic_node_attrs.dtg.toml | 10 ++++++++++ .../src/task-spec/dynamic_graph/copy_insertion.cc | 12 ++++++------ 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h index 5e5d52bdac..a1726c2ae1 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h +++ b/lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h @@ -16,7 +16,7 @@ bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &); std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, std::unordered_map const - &mapped_source_value); + &unmapped_value_to_mapped_source_value); DynamicOpenDataflowGraph perform_copy_insertion(DynamicOpenDataflowGraph const &); diff --git a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml index 128e305dc6..73c023fd40 100644 --- a/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml +++ b/lib/task-spec/include/task-spec/dynamic_graph/dynamic_node_attrs.dtg.toml @@ -28,6 +28,16 @@ type = "std::optional<::FlexFlow::DynamicTaskType>" [[fields]] name = "device_coord" type = "std::optional<::FlexFlow::MachineSpaceCoordinate>" +docstring = ''' +\note Right now the \c device_coord for a copy node is sort of meaningless +because we have one controller issuing all copies for the entire graph, no +matter where they are. However the intention is this to be the "owner" or +"issuer" of the copy, which matters a lot more down the road once we write the +control replicated version of the Realm backend. At that point, you will have +to pick one specific node to issue the copy, and the program should be correct +no matter what you pick, but there may be performance implications to those +choices. +''' [[fields]] name = "mapping" diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index abafad0116..4a88d11882 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -83,7 +83,7 @@ static std::pair std::unordered_set perform_copy_insertion_for_invocation( DynamicNodeInvocation const &i, std::unordered_map const - &mapped_source_value) { + &unmapped_value_to_mapped_source_value) { MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping); @@ -104,11 +104,11 @@ std::unordered_set perform_copy_insertion_for_invocation( }}; for (auto const &[slot, input] : i.inputs) { - if (!contains_key(mapped_source_value, input)) { + if (!contains_key(unmapped_value_to_mapped_source_value, input)) { continue; } - DynamicValueAttrs source_value = mapped_source_value.at(input); + DynamicValueAttrs source_value = unmapped_value_to_mapped_source_value.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { auto const &[filtered_source, filtered_use] = @@ -153,10 +153,10 @@ DynamicOpenDataflowGraph ASSERT(no_part_of_graph_is_copy_inserted(g)); - std::unordered_map mapped_source_value; + std::unordered_map unmapped_value_to_mapped_source_value; for (DynamicNodeInvocation const &i : g.invocations) { for (auto const &[slot, value] : i.outputs) { - mapped_source_value.insert( + unmapped_value_to_mapped_source_value.insert( std::pair{value, map_dynamic_value_attrs_for_task_group( slot, value, assert_unwrap(i.node_attrs.mapping))}); @@ -169,7 +169,7 @@ DynamicOpenDataflowGraph dynamic_open_dataflow_graph_from_invocation_set( flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { return perform_copy_insertion_for_invocation(i, - mapped_source_value); + unmapped_value_to_mapped_source_value); })); ASSERT(graph_is_fully_copy_inserted(result)); From 679b25fe76023ab1a08dc9e3e4b479f14f42c615 Mon Sep 17 00:00:00 2001 From: Colin Unger Date: Fri, 20 Mar 2026 15:13:43 -0700 Subject: [PATCH 20/20] Format --- .../src/task-spec/dynamic_graph/copy_insertion.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc index 4a88d11882..4c1b9d4609 100644 --- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc +++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc @@ -108,7 +108,8 @@ std::unordered_set perform_copy_insertion_for_invocation( continue; } - DynamicValueAttrs source_value = unmapped_value_to_mapped_source_value.at(input); + DynamicValueAttrs source_value = + unmapped_value_to_mapped_source_value.at(input); DynamicValueAttrs use_value = mapped_inputs.at(slot); if (source_value != use_value) { auto const &[filtered_source, filtered_use] = @@ -153,7 +154,8 @@ DynamicOpenDataflowGraph ASSERT(no_part_of_graph_is_copy_inserted(g)); - std::unordered_map unmapped_value_to_mapped_source_value; + std::unordered_map + unmapped_value_to_mapped_source_value; for (DynamicNodeInvocation const &i : g.invocations) { for (auto const &[slot, value] : i.outputs) { unmapped_value_to_mapped_source_value.insert( @@ -168,8 +170,8 @@ DynamicOpenDataflowGraph DynamicOpenDataflowGraph result = dynamic_open_dataflow_graph_from_invocation_set( flatmap(g.invocations, [&](DynamicNodeInvocation const &i) { - return perform_copy_insertion_for_invocation(i, - unmapped_value_to_mapped_source_value); + return perform_copy_insertion_for_invocation( + i, unmapped_value_to_mapped_source_value); })); ASSERT(graph_is_fully_copy_inserted(result));