From b25659a602e0cc21e7e2993bde1fb80cf75f830f Mon Sep 17 00:00:00 2001 From: ver217 Date: Tue, 12 Sep 2023 13:51:50 +0800 Subject: [PATCH] [legacy] remove outdated codes of pipeline --- colossalai/legacy/pipeline/__init__.py | 4 + .../{ => legacy}/pipeline/layer_spec.py | 6 +- .../legacy/pipeline/middleware/__init__.py | 3 + .../pipeline/middleware/adaptor/__init__.py | 2 +- .../pipeline/middleware/adaptor/fx.py | 34 +++++--- .../{ => legacy}/pipeline/middleware/topo.py | 86 ++++++++++--------- .../{ => legacy}/pipeline/pipelinable.py | 0 .../pipeline/pipeline_process_group.py | 4 +- colossalai/legacy/pipeline/rpc/__init__.py | 4 + .../pipeline/rpc/_pipeline_base.py | 6 +- .../pipeline/rpc/_pipeline_schedule.py | 8 +- colossalai/{ => legacy}/pipeline/rpc/utils.py | 2 +- colossalai/{ => legacy}/pipeline/utils.py | 0 colossalai/pipeline/__init__.py | 13 ++- colossalai/pipeline/middleware/__init__.py | 3 - colossalai/pipeline/rpc/__init__.py | 4 - colossalai/pipeline/schedule/__init__.py | 2 + docs/source/en/features/pipeline_parallel.md | 2 +- .../zh-Hans/features/pipeline_parallel.md | 2 +- .../pipeline_parallel/train_gpt_pp.py | 8 +- .../gpt/titans/model/pipeline_gpt1d.py | 2 +- examples/tutorial/hybrid_parallel/train.py | 2 +- .../tutorial/sequence_parallel/model/bert.py | 2 +- .../test_pipeline/test_topo/topo_utils.py | 33 ++++--- .../test_pipeline/rpc_test_utils.py | 2 +- .../test_pipeline/test_cuda_rpc_chimera.py | 6 +- .../test_pipeline/test_cuda_rpc_optimizer.py | 9 +- .../test_pipeline/test_cuda_rpc_pipeline.py | 4 +- .../test_cuda_rpc_value_correctness.py | 7 +- .../test_pipeline/test_middleware_1f1b.py | 6 +- .../test_pipeline/test_pipelinable.py | 2 +- .../test_pipeline_process_group.py | 2 +- .../test_checkpoint/test_checkpoint_1d.py | 2 +- .../test_checkpoint/test_checkpoint_2d.py | 2 +- .../test_checkpoint/test_checkpoint_2p5d.py | 2 +- .../test_checkpoint/test_checkpoint_3d.py | 2 +- 36 files changed, 157 insertions(+), 121 deletions(-) create mode 100644 colossalai/legacy/pipeline/__init__.py rename colossalai/{ => legacy}/pipeline/layer_spec.py (97%) create mode 100644 colossalai/legacy/pipeline/middleware/__init__.py rename colossalai/{ => legacy}/pipeline/middleware/adaptor/__init__.py (62%) rename colossalai/{ => legacy}/pipeline/middleware/adaptor/fx.py (92%) rename colossalai/{ => legacy}/pipeline/middleware/topo.py (95%) rename colossalai/{ => legacy}/pipeline/pipelinable.py (100%) rename colossalai/{ => legacy}/pipeline/pipeline_process_group.py (99%) create mode 100644 colossalai/legacy/pipeline/rpc/__init__.py rename colossalai/{ => legacy}/pipeline/rpc/_pipeline_base.py (99%) rename colossalai/{ => legacy}/pipeline/rpc/_pipeline_schedule.py (97%) rename colossalai/{ => legacy}/pipeline/rpc/utils.py (98%) rename colossalai/{ => legacy}/pipeline/utils.py (100%) delete mode 100644 colossalai/pipeline/middleware/__init__.py delete mode 100644 colossalai/pipeline/rpc/__init__.py rename tests/{ => test_legacy}/test_pipeline/rpc_test_utils.py (98%) rename tests/{ => test_legacy}/test_pipeline/test_cuda_rpc_chimera.py (94%) rename tests/{ => test_legacy}/test_pipeline/test_cuda_rpc_optimizer.py (89%) rename tests/{ => test_legacy}/test_pipeline/test_cuda_rpc_pipeline.py (87%) rename tests/{ => test_legacy}/test_pipeline/test_cuda_rpc_value_correctness.py (91%) rename tests/{ => test_legacy}/test_pipeline/test_middleware_1f1b.py (95%) rename tests/{ => test_legacy}/test_pipeline/test_pipelinable.py (96%) rename tests/{ => test_legacy}/test_pipeline/test_pipeline_process_group.py (94%) diff --git a/colossalai/legacy/pipeline/__init__.py b/colossalai/legacy/pipeline/__init__.py new file mode 100644 index 000000000000..f36f54ac9307 --- /dev/null +++ b/colossalai/legacy/pipeline/__init__.py @@ -0,0 +1,4 @@ +from .layer_spec import LayerSpec +from .pipelinable import PipelinableContext, PipelinableModel + +__all__ = ['PipelinableModel', 'PipelinableContext', 'LayerSpec'] diff --git a/colossalai/pipeline/layer_spec.py b/colossalai/legacy/pipeline/layer_spec.py similarity index 97% rename from colossalai/pipeline/layer_spec.py rename to colossalai/legacy/pipeline/layer_spec.py index 7e9169efff78..3960debd7f72 100644 --- a/colossalai/pipeline/layer_spec.py +++ b/colossalai/legacy/pipeline/layer_spec.py @@ -1,9 +1,11 @@ import torch + from colossalai.utils.model.utils import call_to_str + class LayerSpec: """ - + """ def __init__(self, typename, *module_args, **module_kwargs): @@ -52,4 +54,4 @@ def count_params(self): return self._param_count def reset_param_count(self): - self._param_count = 0 \ No newline at end of file + self._param_count = 0 diff --git a/colossalai/legacy/pipeline/middleware/__init__.py b/colossalai/legacy/pipeline/middleware/__init__.py new file mode 100644 index 000000000000..481741bfee31 --- /dev/null +++ b/colossalai/legacy/pipeline/middleware/__init__.py @@ -0,0 +1,3 @@ +from .topo import Partition, PartitionInputVal, PartitionOutputVal, Topo + +__all__ = ['Topo', 'Partition', 'PartitionOutputVal', 'PartitionInputVal'] diff --git a/colossalai/pipeline/middleware/adaptor/__init__.py b/colossalai/legacy/pipeline/middleware/adaptor/__init__.py similarity index 62% rename from colossalai/pipeline/middleware/adaptor/__init__.py rename to colossalai/legacy/pipeline/middleware/adaptor/__init__.py index 949700a2c49d..0b0d36d2ffe5 100644 --- a/colossalai/pipeline/middleware/adaptor/__init__.py +++ b/colossalai/legacy/pipeline/middleware/adaptor/__init__.py @@ -1,3 +1,3 @@ from .fx import get_topology as get_fx_topology -__all__ = ['get_fx_topology'] \ No newline at end of file +__all__ = ['get_fx_topology'] diff --git a/colossalai/pipeline/middleware/adaptor/fx.py b/colossalai/legacy/pipeline/middleware/adaptor/fx.py similarity index 92% rename from colossalai/pipeline/middleware/adaptor/fx.py rename to colossalai/legacy/pipeline/middleware/adaptor/fx.py index 8437c5194762..8cc40f120f15 100644 --- a/colossalai/pipeline/middleware/adaptor/fx.py +++ b/colossalai/legacy/pipeline/middleware/adaptor/fx.py @@ -1,6 +1,8 @@ -from torch.fx.graph_module import GraphModule -from colossalai.pipeline.middleware.topo import Partition, PartitionInputVal, PartitionOutputVal, Topo import torch +from torch.fx.graph_module import GraphModule + +from colossalai.legacy.pipeline.middleware.topo import Partition, PartitionInputVal, PartitionOutputVal, Topo + def partition_name_to_id(partition_name, is_input=False, is_output=False): if is_input: @@ -12,6 +14,7 @@ def partition_name_to_id(partition_name, is_input=False, is_output=False): partition_id = int(partition_name.split(prefix)[-1]) + 2 return partition_id + # There are two kinds of def in fx.graph # 1. non direct_use & non direct_def, which means the output is used by next partition with a temporary mid value. # e.g. submod1 = call_module(...) @@ -20,6 +23,8 @@ def partition_name_to_id(partition_name, is_input=False, is_output=False): # 2. direct_use & direct_def, which means the output is used by next partition directly. # e.g. submod1 = call_module(...) # submod2 = call_module(submod1, ...) + + def find_input_in_partition(node, partitions, input_partitions=None): p_input_val = None direct_def = not node.name.startswith('getitem') @@ -45,9 +50,10 @@ def find_input_in_partition(node, partitions, input_partitions=None): partition_id = partition_name_to_id(partition.name) p_input_val = PartitionInputVal(partition_id=partition_id, offset=offset) return p_input_val - + return p_input_val - + + def find_output_in_partition(node, partitions, output_partitions=None): p_output_val = PartitionOutputVal() for user in node.users: @@ -70,7 +76,7 @@ def find_output_in_partition(node, partitions, output_partitions=None): if arg == user: p_output_val.add(partition_id=partition_id, offset=i) break - + # user is output if output_partitions is not None: output_node = output_partitions[0] @@ -84,10 +90,11 @@ def find_output_in_partition(node, partitions, output_partitions=None): break return p_output_val + def get_topology(gm: GraphModule): topo = Topo() topo_output_partition = Partition() - + input_partitions = [] partitions = [] output_partitions = [] @@ -109,7 +116,7 @@ def get_topology(gm: GraphModule): topo_input_partition.add_output_val(p_output_val) topo.set_partitions(partition_id=0, partition=topo_input_partition) topo.set_input_partition_id(partition_id=0) - + for i, partition in enumerate(partitions): topo_mid_partition = Partition() # set input for submodule @@ -131,15 +138,16 @@ def get_topology(gm: GraphModule): for user in partition.users: cur_node = user p_output_val = find_output_in_partition(cur_node, partitions, output_partitions) - topo_mid_partition.add_output_val(p_output_val) - topo.set_partitions(partition_id=i+2, partition=topo_mid_partition) - + topo_mid_partition.add_output_val(p_output_val) + topo.set_partitions(partition_id=i + 2, partition=topo_mid_partition) + # set input for output_partition for partition in output_partitions: topo_output_partition = Partition() - torch.fx.graph.map_arg(partition.args[0], lambda n: topo_output_partition.add_input_val( - find_input_in_partition(n, partitions, input_partitions))) + torch.fx.graph.map_arg( + partition.args[0], + lambda n: topo_output_partition.add_input_val(find_input_in_partition(n, partitions, input_partitions))) topo.set_partitions(partition_id=1, partition=topo_output_partition) topo.set_output_partition_id(partition_id=1) - return topo \ No newline at end of file + return topo diff --git a/colossalai/pipeline/middleware/topo.py b/colossalai/legacy/pipeline/middleware/topo.py similarity index 95% rename from colossalai/pipeline/middleware/topo.py rename to colossalai/legacy/pipeline/middleware/topo.py index e798e2ed9cab..3c21cce6dc0e 100644 --- a/colossalai/pipeline/middleware/topo.py +++ b/colossalai/legacy/pipeline/middleware/topo.py @@ -1,49 +1,54 @@ -from typing import Dict, List from dataclasses import dataclass +from typing import Dict, List # This file includes data structure used by Pipeline Middleware. + @dataclass class ValPosition: partition_id: int offset: int - + def __str__(self) -> str: res = f'[partition_id:{self.partition_id},offset:{self.offset}]' return res - + def __repr__(self) -> str: return self.__str__() + class PartitionInputVal(object): + def __init__(self, partition_id, offset) -> None: # every input from which partition_id and which offset val_pos = ValPosition(partition_id, offset) self._from_partition_and_offset: ValPosition = val_pos - + def get(self): return self._from_partition_and_offset - + def __str__(self) -> str: res = '' res += f'<-({self._from_partition_and_offset})' return res - + def __repr__(self) -> str: return self.__str__() - + + class PartitionOutputVal(object): + def __init__(self) -> None: # every output to which partition_id and which offset self._to_partition_and_offset: List[ValPosition] = [] - + def add(self, partition_id, offset): val_pos = ValPosition(partition_id, offset) self._to_partition_and_offset.append(val_pos) - + def get(self): return self._to_partition_and_offset - + def __str__(self) -> str: res = '' res += '->(' @@ -51,27 +56,29 @@ def __str__(self) -> str: res += f'{val_pos},' res += ')' return res - + def __repr__(self) -> str: return self.__str__() + class Partition(object): + def __init__(self) -> None: self._input_vals: List[PartitionInputVal] = [] self._output_vals: List[PartitionOutputVal] = [] - + def add_input_val(self, input_val: PartitionInputVal): self._input_vals.append(input_val) - + def add_output_val(self, output_val: PartitionOutputVal): self._output_vals.append(output_val) - + def get_input_vals(self): return self._input_vals - + def get_output_vals(self): return self._output_vals - + # get the output offsets sent to dst_partition_id def get_output_offsets(self, dst_partition_id): res = [] @@ -80,9 +87,9 @@ def get_output_offsets(self, dst_partition_id): for val_pos in outputs: if val_pos.partition_id == dst_partition_id: res.append(offset) - + return res - + # get all input dst partition_ids def get_input_partition_ids(self): res = [] @@ -91,7 +98,7 @@ def get_input_partition_ids(self): if val_pos.partition_id not in res: res.append(val_pos.partition_id) return res - + # get all output dst partition_ids def get_output_partition_ids(self): res = [] @@ -101,24 +108,25 @@ def get_output_partition_ids(self): if val_pos.partition_id not in res: res.append(val_pos.partition_id) return res - + def __str__(self) -> str: res = '' res += f' input:\n' res += f' length:{len(self._input_vals)}\n' for i, input_val in enumerate(self._input_vals): res += f' offset={i}:{input_val}\n' - + res += f' output:\n' res += f' length:{len(self._output_vals)}\n' for i, output_val in enumerate(self._output_vals): res += f' offset={i}:{output_val}\n' - + return res - + def __repr__(self) -> str: return self.__str__() + # This class is a middleware between partition splitter # and Pipeline Scheduler. It records the graph info about # partition input/output and provides it to scheduler. @@ -132,42 +140,43 @@ def __repr__(self) -> str: # _input_partition_id: the key represents input_partition # _output_partition_id: the key represents output_partition class Topo(object): + def __init__(self, input_partition_id=None, output_partition_id=None) -> None: self._partitions: Dict[int, Partition] = {} self._input_partition_id = input_partition_id self._output_partition_id = output_partition_id - + def set_input_partition_id(self, partition_id: int): self._input_partition_id = partition_id - + def set_output_partition_id(self, partition_id: int): self._output_partition_id = partition_id - + def get_input_partition_id(self): return self._input_partition_id - + def get_output_partition_id(self): return self._output_partition_id - + def set_partitions(self, partition_id: int, partition: Partition): self._partitions[partition_id] = partition - + def get_mid_partitions(self): - res = {} #{partition_id: Partition} + res = {} #{partition_id: Partition} for partition_id, partition in self._partitions.items(): if self._input_partition_id == partition_id or self._output_partition_id == partition_id: continue res[partition_id] = partition return res - + def get_mid_partition_ids(self): return list(self.get_mid_partitions().keys()) - + def get_input_partition(self): if self._input_partition_id is not None: return self._partitions[self._input_partition_id] return None - + def get_output_partition(self): if self._output_partition_id is not None: return self._partitions[self._output_partition_id] @@ -175,7 +184,7 @@ def get_output_partition(self): def get_partition_by_id(self, partition_id): return self._partitions[partition_id] - + def __str__(self) -> str: res = '' if len(self._partitions) == 0: @@ -186,21 +195,20 @@ def __str__(self) -> str: res += '{\n' res += f'InputPartition:\n partition_id={self._input_partition_id}\n{input_part}' res += '}\n' - + mid_parts = self.get_mid_partitions() for i, (partition_id, part) in enumerate(mid_parts.items()): res += '{\n' res += f'SubPartition_{i}:\n partition_id={partition_id}\n {part}' res += '}\n' - + output_part = self.get_output_partition() if output_part is not None: res += '{\n' res += f'OutputPartition:\n partition_id={self._output_partition_id}\n{output_part}' res += '}\n' - + return res - + def __repr__(self) -> str: return self.__str__() - \ No newline at end of file diff --git a/colossalai/pipeline/pipelinable.py b/colossalai/legacy/pipeline/pipelinable.py similarity index 100% rename from colossalai/pipeline/pipelinable.py rename to colossalai/legacy/pipeline/pipelinable.py diff --git a/colossalai/pipeline/pipeline_process_group.py b/colossalai/legacy/pipeline/pipeline_process_group.py similarity index 99% rename from colossalai/pipeline/pipeline_process_group.py rename to colossalai/legacy/pipeline/pipeline_process_group.py index c61d97ebabfa..c0ee0286787f 100644 --- a/colossalai/pipeline/pipeline_process_group.py +++ b/colossalai/legacy/pipeline/pipeline_process_group.py @@ -1,9 +1,9 @@ -from typing import List, Dict, Tuple import os import threading +from typing import Dict, List, Tuple -from torch.distributed import rpc import torch.distributed as dist +from torch.distributed import rpc from colossalai.tensor import ProcessGroup diff --git a/colossalai/legacy/pipeline/rpc/__init__.py b/colossalai/legacy/pipeline/rpc/__init__.py new file mode 100644 index 000000000000..15b65a4138a8 --- /dev/null +++ b/colossalai/legacy/pipeline/rpc/__init__.py @@ -0,0 +1,4 @@ +from ._pipeline_schedule import ChimeraPipelineEngine, FillDrainPipelineEngine, OneFOneBPipelineEngine +from .utils import pytree_map + +__all__ = ['FillDrainPipelineEngine', 'OneFOneBPipelineEngine', 'ChimeraPipelineEngine', 'pytree_map'] diff --git a/colossalai/pipeline/rpc/_pipeline_base.py b/colossalai/legacy/pipeline/rpc/_pipeline_base.py similarity index 99% rename from colossalai/pipeline/rpc/_pipeline_base.py rename to colossalai/legacy/pipeline/rpc/_pipeline_base.py index 9e549df58214..88ddb9e98eb2 100644 --- a/colossalai/pipeline/rpc/_pipeline_base.py +++ b/colossalai/legacy/pipeline/rpc/_pipeline_base.py @@ -12,9 +12,9 @@ from torch._C._distributed_rpc import PyRRef from torch.futures import Future -from colossalai.pipeline.middleware import Partition, PartitionInputVal, PartitionOutputVal, Topo -from colossalai.pipeline.pipeline_process_group import ppg -from colossalai.pipeline.rpc.utils import ( +from colossalai.legacy.pipeline.middleware import Partition, PartitionInputVal, PartitionOutputVal, Topo +from colossalai.legacy.pipeline.pipeline_process_group import ppg +from colossalai.legacy.pipeline.rpc.utils import ( get_batch_lengths, pyobj_map, pytree_filter, diff --git a/colossalai/pipeline/rpc/_pipeline_schedule.py b/colossalai/legacy/pipeline/rpc/_pipeline_schedule.py similarity index 97% rename from colossalai/pipeline/rpc/_pipeline_schedule.py rename to colossalai/legacy/pipeline/rpc/_pipeline_schedule.py index 6eda8f3b34b7..f53a4835edf2 100644 --- a/colossalai/pipeline/rpc/_pipeline_schedule.py +++ b/colossalai/legacy/pipeline/rpc/_pipeline_schedule.py @@ -6,8 +6,8 @@ from torch._C._distributed_rpc import PyRRef from torch.futures import Future -from colossalai.pipeline.pipeline_process_group import ppg -from colossalai.pipeline.rpc._pipeline_base import Phase, PipelineEngineBase, UniqueKey, WorkerBase, WorkItem +from colossalai.legacy.pipeline.pipeline_process_group import ppg +from colossalai.legacy.pipeline.rpc._pipeline_base import Phase, PipelineEngineBase, UniqueKey, WorkerBase, WorkItem # Implementation of different Pipeline schedule # Worker defines the worker for each stage @@ -78,7 +78,7 @@ def _get_work_item_key(self) -> UniqueKey: # 1. forward times reach actual_stage_num, this is the end of continuous forward # 2. forward times reach num_microbatches, this is the end of 1F1B mode if not is_last_stage and \ - target_key.phase == Phase.FORWARD: + target_key.phase == Phase.FORWARD: if target_key.microbatch_id == actual_stage_num - 1 and num_microbatches > 2: # Why need num_microbatches > 2 ? Because there is no steady stage when num_microbatches <= 2 outstanding_min = actual_stage_num - pp_rank - 1 @@ -144,7 +144,7 @@ def _get_work_item_key(self) -> UniqueKey: forward_block_num = self.forward_times // forward_block_size if self.forward_times >= real_microbatch_num or \ - ((pp_rank + 1) % stage_num == 0 and forward_block_num > self.backward_times): + ((pp_rank + 1) % stage_num == 0 and forward_block_num > self.backward_times): target_phase = Phase.BACKWARD target_microbatch_id = self.backward_times else: # others diff --git a/colossalai/pipeline/rpc/utils.py b/colossalai/legacy/pipeline/rpc/utils.py similarity index 98% rename from colossalai/pipeline/rpc/utils.py rename to colossalai/legacy/pipeline/rpc/utils.py index 06e6d976d771..d1033fbde920 100644 --- a/colossalai/pipeline/rpc/utils.py +++ b/colossalai/legacy/pipeline/rpc/utils.py @@ -10,7 +10,7 @@ from torch.futures import Future from colossalai.initialize import launch -from colossalai.pipeline.pipeline_process_group import ppg +from colossalai.legacy.pipeline.pipeline_process_group import ppg def pyobj_map(obj: Any, fn: Callable, process_types: Union[Type, Tuple[Type]] = ()) -> Any: diff --git a/colossalai/pipeline/utils.py b/colossalai/legacy/pipeline/utils.py similarity index 100% rename from colossalai/pipeline/utils.py rename to colossalai/legacy/pipeline/utils.py diff --git a/colossalai/pipeline/__init__.py b/colossalai/pipeline/__init__.py index 0fcde9707646..e88a1f00a1b7 100644 --- a/colossalai/pipeline/__init__.py +++ b/colossalai/pipeline/__init__.py @@ -1,4 +1,11 @@ -from .pipelinable import PipelinableContext, PipelinableModel -from .layer_spec import LayerSpec +from .p2p import PipelineP2PCommunication +from .schedule import InterleavedSchedule, OneForwardOneBackwardSchedule, PipelineSchedule +from .stage_manager import PipelineStageManager -__all__ = ['PipelinableModel', 'PipelinableContext', 'LayerSpec'] \ No newline at end of file +__all__ = [ + 'PipelineSchedule', + 'OneForwardOneBackwardSchedule', + 'InterleavedSchedule', + 'PipelineP2PCommunication', + 'PipelineStageManager', +] diff --git a/colossalai/pipeline/middleware/__init__.py b/colossalai/pipeline/middleware/__init__.py deleted file mode 100644 index 79e19f9eaf77..000000000000 --- a/colossalai/pipeline/middleware/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .topo import Topo, Partition, PartitionOutputVal, PartitionInputVal - -__all__ = ['Topo', 'Partition', 'PartitionOutputVal', 'PartitionInputVal'] \ No newline at end of file diff --git a/colossalai/pipeline/rpc/__init__.py b/colossalai/pipeline/rpc/__init__.py deleted file mode 100644 index 9d9e9d44f46c..000000000000 --- a/colossalai/pipeline/rpc/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from ._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine, ChimeraPipelineEngine -from .utils import pytree_map - -__all__ = ['FillDrainPipelineEngine', 'OneFOneBPipelineEngine', 'ChimeraPipelineEngine', 'pytree_map'] \ No newline at end of file diff --git a/colossalai/pipeline/schedule/__init__.py b/colossalai/pipeline/schedule/__init__.py index 8b13413b1a31..07c0f5927060 100644 --- a/colossalai/pipeline/schedule/__init__.py +++ b/colossalai/pipeline/schedule/__init__.py @@ -1,7 +1,9 @@ from .base import PipelineSchedule +from .interleaved_pp import InterleavedSchedule from .one_f_one_b import OneForwardOneBackwardSchedule __all__ = [ 'PipelineSchedule', 'OneForwardOneBackwardSchedule', + 'InterleavedSchedule', ] diff --git a/docs/source/en/features/pipeline_parallel.md b/docs/source/en/features/pipeline_parallel.md index 8b5f228a9e5e..665a1063f112 100644 --- a/docs/source/en/features/pipeline_parallel.md +++ b/docs/source/en/features/pipeline_parallel.md @@ -82,7 +82,7 @@ from colossalai.logging import disable_existing_loggers, get_dist_logger from colossalai.legacy.trainer import Trainer, hooks from colossalai.utils import MultiTimer, get_dataloader from colossalai.context import ParallelMode -from colossalai.pipeline.pipelinable import PipelinableContext +from colossalai.legacy.pipeline.pipelinable import PipelinableContext from titans.dataloader.cifar10 import build_cifar from torchvision.models import resnet50 diff --git a/docs/source/zh-Hans/features/pipeline_parallel.md b/docs/source/zh-Hans/features/pipeline_parallel.md index 1497dc399f6c..75c41fa7256b 100644 --- a/docs/source/zh-Hans/features/pipeline_parallel.md +++ b/docs/source/zh-Hans/features/pipeline_parallel.md @@ -81,7 +81,7 @@ from colossalai.logging import disable_existing_loggers, get_dist_logger from colossalai.legacy.trainer import Trainer, hooks from colossalai.utils import MultiTimer, get_dataloader from colossalai.context import ParallelMode -from colossalai.pipeline.pipelinable import PipelinableContext +from colossalai.legacy.pipeline.pipelinable import PipelinableContext from titans.dataloader.cifar10 import build_cifar from torchvision.models import resnet50 diff --git a/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py b/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py index ad69888b8cc8..30d6aab4f12f 100644 --- a/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py +++ b/examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py @@ -3,7 +3,6 @@ from functools import partial import torch -from model_zoo import model_builder from torch import nn from tqdm import tqdm @@ -14,11 +13,12 @@ split_with_split_nodes_pass, ) from colossalai.fx.passes.meta_info_prop import MetaInfoProp +from colossalai.legacy.pipeline.middleware.adaptor import get_fx_topology +from colossalai.legacy.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine +from colossalai.legacy.pipeline.rpc.utils import rpc_run from colossalai.logging import disable_existing_loggers, get_dist_logger from colossalai.nn.optimizer import HybridAdam -from colossalai.pipeline.middleware.adaptor import get_fx_topology -from colossalai.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine -from colossalai.pipeline.rpc.utils import rpc_run +from model_zoo import model_builder def parse_args(): diff --git a/examples/language/gpt/titans/model/pipeline_gpt1d.py b/examples/language/gpt/titans/model/pipeline_gpt1d.py index 9b22d156bbcd..31844a1a78d8 100644 --- a/examples/language/gpt/titans/model/pipeline_gpt1d.py +++ b/examples/language/gpt/titans/model/pipeline_gpt1d.py @@ -10,8 +10,8 @@ from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.legacy.nn.layer.wrapper import PipelineSharedModuleWrapper +from colossalai.legacy.pipeline.utils import partition_uniform from colossalai.logging import get_dist_logger -from colossalai.pipeline.utils import partition_uniform from .embed import HiddenParallelEmbedding, HiddenParallelGPTLMHead1D, VocabParallelEmbedding, VocabParallelGPTLMHead1D from .gpt1d import FusedGPTTransformerLayer1D, GPTTransformerLayer1D diff --git a/examples/tutorial/hybrid_parallel/train.py b/examples/tutorial/hybrid_parallel/train.py index 12cdec902400..266c82731441 100644 --- a/examples/tutorial/hybrid_parallel/train.py +++ b/examples/tutorial/hybrid_parallel/train.py @@ -8,9 +8,9 @@ from colossalai.context import ParallelMode from colossalai.core import global_context as gpc from colossalai.legacy.nn import CrossEntropyLoss +from colossalai.legacy.pipeline.pipelinable import PipelinableContext from colossalai.logging import get_dist_logger from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR -from colossalai.pipeline.pipelinable import PipelinableContext from colossalai.utils import is_using_pp diff --git a/examples/tutorial/sequence_parallel/model/bert.py b/examples/tutorial/sequence_parallel/model/bert.py index b8adb501f95e..c33e2f9fc6ae 100644 --- a/examples/tutorial/sequence_parallel/model/bert.py +++ b/examples/tutorial/sequence_parallel/model/bert.py @@ -8,8 +8,8 @@ from colossalai.core import global_context as gpc from colossalai.kernel import LayerNorm from colossalai.legacy.nn.layer.wrapper import PipelineSharedModuleWrapper +from colossalai.legacy.pipeline.utils import partition_uniform from colossalai.logging import get_dist_logger -from colossalai.pipeline.utils import partition_uniform from .layers import BertDualHead, BertLayer, Embedding, PreProcessor, VocabEmbedding from .layers.init_method import init_normal, output_init_normal diff --git a/tests/test_fx/test_pipeline/test_topo/topo_utils.py b/tests/test_fx/test_pipeline/test_topo/topo_utils.py index 55dd65201acd..db6cadfc544c 100644 --- a/tests/test_fx/test_pipeline/test_topo/topo_utils.py +++ b/tests/test_fx/test_pipeline/test_topo/topo_utils.py @@ -1,18 +1,22 @@ +import random + +import numpy as np import torch from torch.fx import GraphModule -from colossalai.fx.passes.adding_split_node_pass import split_with_split_nodes_pass, balanced_split_pass + from colossalai.fx import ColoTracer -from colossalai.pipeline.middleware import Partition, PartitionInputVal, PartitionOutputVal, Topo -from colossalai.pipeline.middleware.adaptor import get_fx_topology -import random -import numpy as np +from colossalai.fx.passes.adding_split_node_pass import balanced_split_pass, split_with_split_nodes_pass +from colossalai.legacy.pipeline.middleware import Partition, PartitionInputVal, PartitionOutputVal, Topo +from colossalai.legacy.pipeline.middleware.adaptor import get_fx_topology MANUAL_SEED = 0 random.seed(MANUAL_SEED) np.random.seed(MANUAL_SEED) torch.manual_seed(MANUAL_SEED) + class MLP(torch.nn.Module): + def __init__(self, config={}): super().__init__() dim = config['dim'] @@ -27,6 +31,7 @@ def forward(self, x): x = layer(x) return x + def split_model_and_get_DAG(model, data_gen): model.eval() @@ -46,7 +51,7 @@ def split_model_and_get_DAG(model, data_gen): # apply transform passes annotated_model = balanced_split_pass(gm, 2) top_module, split_submodules = split_with_split_nodes_pass(annotated_model) - + topo = get_fx_topology(top_module) for submodule in split_submodules: if isinstance(submodule, torch.fx.GraphModule): @@ -54,6 +59,7 @@ def split_model_and_get_DAG(model, data_gen): return top_module, split_submodules[0]._topo + def check_input(top_module, input_partition: Partition): partition_output = input_partition.get_output_vals() arg_pos = 0 @@ -63,13 +69,14 @@ def check_input(top_module, input_partition: Partition): to_partition_and_offset = cur_checkee.get() assert len(to_partition_and_offset) == len(node.users.keys()) arg_pos += 1 - + assert arg_pos == len(partition_output) - + + def check_submod(top_module, part_id, mid_partition: Partition): partition_input = mid_partition.get_input_vals() partition_output = mid_partition.get_output_vals() - + cnt = 1 cur_node = None for node in top_module.graph.nodes: @@ -78,15 +85,15 @@ def check_submod(top_module, part_id, mid_partition: Partition): if cnt == part_id: cur_node = node break - + assert len(partition_input) == len(cur_node.args) assert len(partition_output) == len(cur_node.users) -def check_topo(top_module, topo: Topo): + +def check_topo(top_module, topo: Topo): input_partition = topo.get_input_partition() mid_partitions = topo.get_mid_partitions() - + check_input(top_module, input_partition) for part_id, submod in mid_partitions.items(): check_submod(top_module, part_id, submod) - \ No newline at end of file diff --git a/tests/test_pipeline/rpc_test_utils.py b/tests/test_legacy/test_pipeline/rpc_test_utils.py similarity index 98% rename from tests/test_pipeline/rpc_test_utils.py rename to tests/test_legacy/test_pipeline/rpc_test_utils.py index dab474a4ee21..91733dd60ab1 100644 --- a/tests/test_pipeline/rpc_test_utils.py +++ b/tests/test_legacy/test_pipeline/rpc_test_utils.py @@ -11,8 +11,8 @@ from torch.optim import SGD, Adam, Optimizer, RMSprop from colossalai import launch +from colossalai.legacy.pipeline.pipeline_process_group import ppg from colossalai.logging import disable_existing_loggers -from colossalai.pipeline.pipeline_process_group import ppg rpc_is_initialized = _is_current_rpc_agent_set diff --git a/tests/test_pipeline/test_cuda_rpc_chimera.py b/tests/test_legacy/test_pipeline/test_cuda_rpc_chimera.py similarity index 94% rename from tests/test_pipeline/test_cuda_rpc_chimera.py rename to tests/test_legacy/test_pipeline/test_cuda_rpc_chimera.py index 45ad8f828e61..3bff08318d40 100644 --- a/tests/test_pipeline/test_cuda_rpc_chimera.py +++ b/tests/test_legacy/test_pipeline/test_cuda_rpc_chimera.py @@ -1,10 +1,10 @@ import torch -from torch import nn import torch.autograd as autograd +from rpc_test_utils import RpcTestModel, parse_args, rpc_run +from torch import nn -from colossalai.pipeline.rpc import ChimeraPipelineEngine +from colossalai.legacy.pipeline.rpc import ChimeraPipelineEngine from colossalai.testing import assert_close -from rpc_test_utils import rpc_run, parse_args, RpcTestModel # global variable for model created feat_num = 100 diff --git a/tests/test_pipeline/test_cuda_rpc_optimizer.py b/tests/test_legacy/test_pipeline/test_cuda_rpc_optimizer.py similarity index 89% rename from tests/test_pipeline/test_cuda_rpc_optimizer.py rename to tests/test_legacy/test_pipeline/test_cuda_rpc_optimizer.py index 842566730caf..eff031ff8faa 100644 --- a/tests/test_pipeline/test_cuda_rpc_optimizer.py +++ b/tests/test_legacy/test_pipeline/test_cuda_rpc_optimizer.py @@ -1,11 +1,10 @@ import torch -from torch import nn -from torch import autograd -from torch.optim import SGD, Adam, RMSprop, Optimizer +from rpc_test_utils import RpcTestModel, parse_args, rpc_run +from torch import autograd, nn +from torch.optim import SGD, Adam, Optimizer, RMSprop -from colossalai.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine +from colossalai.legacy.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine from colossalai.testing import assert_close -from rpc_test_utils import rpc_run, parse_args, RpcTestModel # global variable for model created feat_num = 100 diff --git a/tests/test_pipeline/test_cuda_rpc_pipeline.py b/tests/test_legacy/test_pipeline/test_cuda_rpc_pipeline.py similarity index 87% rename from tests/test_pipeline/test_cuda_rpc_pipeline.py rename to tests/test_legacy/test_pipeline/test_cuda_rpc_pipeline.py index 8d03e79813e8..1a6077f8d3e9 100644 --- a/tests/test_pipeline/test_cuda_rpc_pipeline.py +++ b/tests/test_legacy/test_pipeline/test_cuda_rpc_pipeline.py @@ -1,8 +1,8 @@ import torch +from rpc_test_utils import RpcTestModel, parse_args, rpc_run from torch import nn -from colossalai.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine -from rpc_test_utils import rpc_run, parse_args, RpcTestModel +from colossalai.legacy.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine # global variable for model created feat_num = 100 diff --git a/tests/test_pipeline/test_cuda_rpc_value_correctness.py b/tests/test_legacy/test_pipeline/test_cuda_rpc_value_correctness.py similarity index 91% rename from tests/test_pipeline/test_cuda_rpc_value_correctness.py rename to tests/test_legacy/test_pipeline/test_cuda_rpc_value_correctness.py index e6713478baec..43966ce3dbda 100644 --- a/tests/test_pipeline/test_cuda_rpc_value_correctness.py +++ b/tests/test_legacy/test_pipeline/test_cuda_rpc_value_correctness.py @@ -1,10 +1,9 @@ import torch -from torch import nn -from torch import autograd +from rpc_test_utils import RpcTestModel, parse_args, rpc_run +from torch import autograd, nn -from colossalai.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine +from colossalai.legacy.pipeline.rpc._pipeline_schedule import FillDrainPipelineEngine, OneFOneBPipelineEngine from colossalai.testing import assert_close -from rpc_test_utils import rpc_run, parse_args, RpcTestModel feat_num = 100 h = 100 diff --git a/tests/test_pipeline/test_middleware_1f1b.py b/tests/test_legacy/test_pipeline/test_middleware_1f1b.py similarity index 95% rename from tests/test_pipeline/test_middleware_1f1b.py rename to tests/test_legacy/test_pipeline/test_middleware_1f1b.py index 5b3aad703275..b800a6e64d11 100644 --- a/tests/test_pipeline/test_middleware_1f1b.py +++ b/tests/test_legacy/test_pipeline/test_middleware_1f1b.py @@ -10,10 +10,10 @@ from colossalai import launch from colossalai.fx import ColoTracer from colossalai.fx.passes.adding_split_node_pass import balanced_split_pass, split_with_split_nodes_pass +from colossalai.legacy.pipeline.middleware.adaptor import get_fx_topology +from colossalai.legacy.pipeline.pipeline_process_group import ppg +from colossalai.legacy.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine from colossalai.logging import disable_existing_loggers -from colossalai.pipeline.middleware.adaptor import get_fx_topology -from colossalai.pipeline.pipeline_process_group import ppg -from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn # global variable for model created diff --git a/tests/test_pipeline/test_pipelinable.py b/tests/test_legacy/test_pipeline/test_pipelinable.py similarity index 96% rename from tests/test_pipeline/test_pipelinable.py rename to tests/test_legacy/test_pipeline/test_pipelinable.py index bb016596beea..2ba5d0aa24d8 100644 --- a/tests/test_pipeline/test_pipelinable.py +++ b/tests/test_legacy/test_pipeline/test_pipelinable.py @@ -1,7 +1,7 @@ import pytest import torch -from colossalai.pipeline.pipelinable import PipelinableContext +from colossalai.legacy.pipeline.pipelinable import PipelinableContext from colossalai.testing import rerun_if_address_is_in_use, rerun_on_exception, spawn NUM_CHUNKS = 1 diff --git a/tests/test_pipeline/test_pipeline_process_group.py b/tests/test_legacy/test_pipeline/test_pipeline_process_group.py similarity index 94% rename from tests/test_pipeline/test_pipeline_process_group.py rename to tests/test_legacy/test_pipeline/test_pipeline_process_group.py index 2a00e3ac55b1..8171ac37a955 100644 --- a/tests/test_pipeline/test_pipeline_process_group.py +++ b/tests/test_legacy/test_pipeline/test_pipeline_process_group.py @@ -4,8 +4,8 @@ from rpc_test_utils import pg_parse_args, rpc_is_initialized from colossalai.initialize import launch +from colossalai.legacy.pipeline.pipeline_process_group import ppg from colossalai.logging import disable_existing_loggers -from colossalai.pipeline.pipeline_process_group import ppg from colossalai.testing import spawn diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_1d.py b/tests/test_utils/test_checkpoint/test_checkpoint_1d.py index 9c3a7e2161d2..9e4f5934c92b 100644 --- a/tests/test_utils/test_checkpoint/test_checkpoint_1d.py +++ b/tests/test_utils/test_checkpoint/test_checkpoint_1d.py @@ -18,7 +18,7 @@ def build_pipeline(model): - from colossalai.pipeline.utils import partition_uniform + from colossalai.legacy.pipeline.utils import partition_uniform pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_2d.py b/tests/test_utils/test_checkpoint/test_checkpoint_2d.py index 03b2e4f2a9b2..d3aaa783e073 100644 --- a/tests/test_utils/test_checkpoint/test_checkpoint_2d.py +++ b/tests/test_utils/test_checkpoint/test_checkpoint_2d.py @@ -18,7 +18,7 @@ def build_pipeline(model): - from colossalai.pipeline.utils import partition_uniform + from colossalai.legacy.pipeline.utils import partition_uniform pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py b/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py index cafffd0a6202..9e52263a2b76 100644 --- a/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py +++ b/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py @@ -18,7 +18,7 @@ def build_pipeline(model): - from colossalai.pipeline.utils import partition_uniform + from colossalai.legacy.pipeline.utils import partition_uniform pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE) diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_3d.py b/tests/test_utils/test_checkpoint/test_checkpoint_3d.py index 9b43be9e8cc5..9f090efd7362 100644 --- a/tests/test_utils/test_checkpoint/test_checkpoint_3d.py +++ b/tests/test_utils/test_checkpoint/test_checkpoint_3d.py @@ -18,7 +18,7 @@ def build_pipeline(model): - from colossalai.pipeline.utils import partition_uniform + from colossalai.legacy.pipeline.utils import partition_uniform pipeline_size = gpc.get_world_size(ParallelMode.PIPELINE) pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)