From 7b130cd72e38234b9f52db3d6ac380b4edf72dc5 Mon Sep 17 00:00:00 2001 From: Olatunji Ruwase Date: Wed, 19 Feb 2025 14:05:18 -0500 Subject: [PATCH] Rename aio_thread_count Signed-off-by: Olatunji Ruwase --- deepspeed/runtime/swap_tensor/aio_config.py | 20 ++++++++++++------- deepspeed/runtime/swap_tensor/constants.py | 6 +++--- .../runtime/swap_tensor/optimizer_utils.py | 2 +- .../partitioned_optimizer_swapper.py | 8 +++++--- .../swap_tensor/partitioned_param_swapper.py | 20 +++++++++++-------- .../pipelined_optimizer_swapper.py | 18 ++++++++++------- 6 files changed, 45 insertions(+), 29 deletions(-) diff --git a/deepspeed/runtime/swap_tensor/aio_config.py b/deepspeed/runtime/swap_tensor/aio_config.py index 46c3f2a0c954..be6c7d93c86a 100644 --- a/deepspeed/runtime/swap_tensor/aio_config.py +++ b/deepspeed/runtime/swap_tensor/aio_config.py @@ -10,7 +10,7 @@ AIO_DEFAULT_DICT = { AIO_BLOCK_SIZE: AIO_BLOCK_SIZE_DEFAULT, AIO_QUEUE_DEPTH: AIO_QUEUE_DEPTH_DEFAULT, - AIO_THREAD_COUNT: AIO_THREAD_COUNT_DEFAULT, + AIO_INTRA_OP_PARALLELISM: AIO_INTRA_OP_PARALLELISM_DEFAULT, AIO_SINGLE_SUBMIT: AIO_SINGLE_SUBMIT_DEFAULT, AIO_OVERLAP_EVENTS: AIO_OVERLAP_EVENTS_DEFAULT, AIO_USE_GDS: AIO_USE_GDS_DEFAULT @@ -21,12 +21,18 @@ def get_aio_config(param_dict): if AIO in param_dict.keys() and param_dict[AIO] is not None: aio_dict = param_dict[AIO] aio_config = { - AIO_BLOCK_SIZE: get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT), - AIO_QUEUE_DEPTH: get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT), - AIO_THREAD_COUNT: get_scalar_param(aio_dict, AIO_THREAD_COUNT, AIO_THREAD_COUNT_DEFAULT), - AIO_SINGLE_SUBMIT: get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT), - AIO_OVERLAP_EVENTS: get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT), - AIO_USE_GDS: get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT) + AIO_BLOCK_SIZE: + get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT), + AIO_QUEUE_DEPTH: + get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT), + AIO_INTRA_OP_PARALLELISM: + get_scalar_param(aio_dict, AIO_INTRA_OP_PARALLELISM, AIO_INTRA_OP_PARALLELISM_DEFAULT), + AIO_SINGLE_SUBMIT: + get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT), + AIO_OVERLAP_EVENTS: + get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT), + AIO_USE_GDS: + get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT) } if aio_config[AIO_USE_GDS]: diff --git a/deepspeed/runtime/swap_tensor/constants.py b/deepspeed/runtime/swap_tensor/constants.py index cee20ac7b78c..c1207749eac6 100644 --- a/deepspeed/runtime/swap_tensor/constants.py +++ b/deepspeed/runtime/swap_tensor/constants.py @@ -9,7 +9,7 @@ "aio": { "block_size": 1048576, "queue_depth": 8, - "thread_count": 1, + "intra_op_parallelism": 1, "single_submit": false, "overlap_events": true, "use_gds": false @@ -20,8 +20,8 @@ AIO_BLOCK_SIZE_DEFAULT = 1048576 AIO_QUEUE_DEPTH = "queue_depth" AIO_QUEUE_DEPTH_DEFAULT = 8 -AIO_THREAD_COUNT = "thread_count" -AIO_THREAD_COUNT_DEFAULT = 1 +AIO_INTRA_OP_PARALLELISM = "intra_op_parallelism" +AIO_INTRA_OP_PARALLELISM_DEFAULT = 1 AIO_SINGLE_SUBMIT = "single_submit" AIO_SINGLE_SUBMIT_DEFAULT = False AIO_OVERLAP_EVENTS = "overlap_events" diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index 5d837e386a95..d7b0ea9634b2 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -130,7 +130,7 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume # Read/Write alignment for each thread during Intra-request parallelism self.min_aio_bytes = max(MIN_AIO_BYTES, aio_config[AIO_BLOCK_SIZE]) - self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_THREAD_COUNT] + self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_INTRA_OP_PARALLELISM] self.numel_alignment = self.aligned_bytes // self.swap_element_size # Swap buffer management diff --git a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py index e53a280befe4..8b6cbe8fbb51 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py @@ -33,9 +33,11 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume largest_numel, device, dtype, timers) aio_op = AsyncIOBuilder().load() - self.aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH], - aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS], - aio_config[AIO_THREAD_COUNT]) + self.aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE], + queue_depth=aio_config[AIO_QUEUE_DEPTH], + single_submit=aio_config[AIO_SINGLE_SUBMIT], + overlap_events=aio_config[AIO_OVERLAP_EVENTS], + intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM]) # Overlap swapping out self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.aio_handle, diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 26fbf6164d54..f80fe1501c00 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -98,7 +98,7 @@ def _configure_aio(self, ds_config): # Read/Write alignment for each thread during Intra-request parallelism self.min_aio_bytes = max(MIN_AIO_BYTES, self.aio_config[AIO_BLOCK_SIZE]) - self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_THREAD_COUNT] + self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_INTRA_OP_PARALLELISM] self.numel_alignment = self.aligned_bytes // self.swap_element_size self.elements_per_buffer = self.swap_config.buffer_size @@ -108,13 +108,17 @@ def _configure_aio(self, ds_config): self.available_buffer_ids = [i for i in range(self.param_buffer_count)] self.reserved_buffer_ids = [] - self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], - self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], - self.aio_config[AIO_THREAD_COUNT]) - - self.aio_write_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], - self.aio_config[AIO_SINGLE_SUBMIT], - self.aio_config[AIO_OVERLAP_EVENTS], self.aio_config[AIO_THREAD_COUNT]) + self.aio_read_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE], + queue_depth=self.aio_config[AIO_QUEUE_DEPTH], + single_submit=self.aio_config[AIO_SINGLE_SUBMIT], + overlap_events=self.aio_config[AIO_OVERLAP_EVENTS], + intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM]) + + self.aio_write_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE], + queue_depth=self.aio_config[AIO_QUEUE_DEPTH], + single_submit=self.aio_config[AIO_SINGLE_SUBMIT], + overlap_events=self.aio_config[AIO_OVERLAP_EVENTS], + intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM]) if self.use_gds: self.buffers = torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count), diff --git a/deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py index 66a372877d38..8f6d72e35f63 100644 --- a/deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py @@ -56,13 +56,17 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume device, dtype, timers) aio_op = AsyncIOBuilder().load() - self.write_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH], - aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS], - aio_config[AIO_THREAD_COUNT]) - - self.read_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH], - aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS], - aio_config[AIO_THREAD_COUNT]) + self.write_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE], + queue_depth=aio_config[AIO_QUEUE_DEPTH], + single_submit=aio_config[AIO_SINGLE_SUBMIT], + overlap_events=aio_config[AIO_OVERLAP_EVENTS], + intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM]) + + self.read_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE], + queue_depth=aio_config[AIO_QUEUE_DEPTH], + single_submit=aio_config[AIO_SINGLE_SUBMIT], + overlap_events=aio_config[AIO_OVERLAP_EVENTS], + intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM]) # Overlap gradient swap out self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.write_aio_handle,