Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions deepspeed/runtime/swap_tensor/aio_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
AIO_DEFAULT_DICT = {
AIO_BLOCK_SIZE: AIO_BLOCK_SIZE_DEFAULT,
AIO_QUEUE_DEPTH: AIO_QUEUE_DEPTH_DEFAULT,
AIO_THREAD_COUNT: AIO_THREAD_COUNT_DEFAULT,
AIO_INTRA_OP_PARALLELISM: AIO_INTRA_OP_PARALLELISM_DEFAULT,
AIO_SINGLE_SUBMIT: AIO_SINGLE_SUBMIT_DEFAULT,
AIO_OVERLAP_EVENTS: AIO_OVERLAP_EVENTS_DEFAULT,
AIO_USE_GDS: AIO_USE_GDS_DEFAULT
Expand All @@ -21,12 +21,18 @@ def get_aio_config(param_dict):
if AIO in param_dict.keys() and param_dict[AIO] is not None:
aio_dict = param_dict[AIO]
aio_config = {
AIO_BLOCK_SIZE: get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT),
AIO_QUEUE_DEPTH: get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT),
AIO_THREAD_COUNT: get_scalar_param(aio_dict, AIO_THREAD_COUNT, AIO_THREAD_COUNT_DEFAULT),
AIO_SINGLE_SUBMIT: get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT),
AIO_OVERLAP_EVENTS: get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT),
AIO_USE_GDS: get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT)
AIO_BLOCK_SIZE:
get_scalar_param(aio_dict, AIO_BLOCK_SIZE, AIO_BLOCK_SIZE_DEFAULT),
AIO_QUEUE_DEPTH:
get_scalar_param(aio_dict, AIO_QUEUE_DEPTH, AIO_QUEUE_DEPTH_DEFAULT),
AIO_INTRA_OP_PARALLELISM:
get_scalar_param(aio_dict, AIO_INTRA_OP_PARALLELISM, AIO_INTRA_OP_PARALLELISM_DEFAULT),
AIO_SINGLE_SUBMIT:
get_scalar_param(aio_dict, AIO_SINGLE_SUBMIT, AIO_SINGLE_SUBMIT_DEFAULT),
AIO_OVERLAP_EVENTS:
get_scalar_param(aio_dict, AIO_OVERLAP_EVENTS, AIO_OVERLAP_EVENTS_DEFAULT),
AIO_USE_GDS:
get_scalar_param(aio_dict, AIO_USE_GDS, AIO_USE_GDS_DEFAULT)
}

if aio_config[AIO_USE_GDS]:
Expand Down
6 changes: 3 additions & 3 deletions deepspeed/runtime/swap_tensor/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"aio": {
"block_size": 1048576,
"queue_depth": 8,
"thread_count": 1,
"intra_op_parallelism": 1,
"single_submit": false,
"overlap_events": true,
"use_gds": false
Expand All @@ -20,8 +20,8 @@
AIO_BLOCK_SIZE_DEFAULT = 1048576
AIO_QUEUE_DEPTH = "queue_depth"
AIO_QUEUE_DEPTH_DEFAULT = 8
AIO_THREAD_COUNT = "thread_count"
AIO_THREAD_COUNT_DEFAULT = 1
AIO_INTRA_OP_PARALLELISM = "intra_op_parallelism"
AIO_INTRA_OP_PARALLELISM_DEFAULT = 1
AIO_SINGLE_SUBMIT = "single_submit"
AIO_SINGLE_SUBMIT_DEFAULT = False
AIO_OVERLAP_EVENTS = "overlap_events"
Expand Down
2 changes: 1 addition & 1 deletion deepspeed/runtime/swap_tensor/optimizer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume

# Read/Write alignment for each thread during Intra-request parallelism
self.min_aio_bytes = max(MIN_AIO_BYTES, aio_config[AIO_BLOCK_SIZE])
self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_THREAD_COUNT]
self.aligned_bytes = AIO_ALIGNED_BYTES * aio_config[AIO_INTRA_OP_PARALLELISM]
self.numel_alignment = self.aligned_bytes // self.swap_element_size

# Swap buffer management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume
largest_numel, device, dtype, timers)

aio_op = AsyncIOBuilder().load()
self.aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])
self.aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

# Overlap swapping out
self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.aio_handle,
Expand Down
20 changes: 12 additions & 8 deletions deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _configure_aio(self, ds_config):

# Read/Write alignment for each thread during Intra-request parallelism
self.min_aio_bytes = max(MIN_AIO_BYTES, self.aio_config[AIO_BLOCK_SIZE])
self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_THREAD_COUNT]
self.aligned_bytes = AIO_ALIGNED_BYTES * self.aio_config[AIO_INTRA_OP_PARALLELISM]
self.numel_alignment = self.aligned_bytes // self.swap_element_size

self.elements_per_buffer = self.swap_config.buffer_size
Expand All @@ -108,13 +108,17 @@ def _configure_aio(self, ds_config):
self.available_buffer_ids = [i for i in range(self.param_buffer_count)]
self.reserved_buffer_ids = []

self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS],
self.aio_config[AIO_THREAD_COUNT])

self.aio_write_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT],
self.aio_config[AIO_OVERLAP_EVENTS], self.aio_config[AIO_THREAD_COUNT])
self.aio_read_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE],
queue_depth=self.aio_config[AIO_QUEUE_DEPTH],
single_submit=self.aio_config[AIO_SINGLE_SUBMIT],
overlap_events=self.aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM])

self.aio_write_handle = self.aio_handle(block_size=self.aio_config[AIO_BLOCK_SIZE],
queue_depth=self.aio_config[AIO_QUEUE_DEPTH],
single_submit=self.aio_config[AIO_SINGLE_SUBMIT],
overlap_events=self.aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=self.aio_config[AIO_INTRA_OP_PARALLELISM])

if self.use_gds:
self.buffers = torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count),
Expand Down
18 changes: 11 additions & 7 deletions deepspeed/runtime/swap_tensor/pipelined_optimizer_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume
device, dtype, timers)

aio_op = AsyncIOBuilder().load()
self.write_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])

self.read_aio_handle = aio_op.aio_handle(aio_config[AIO_BLOCK_SIZE], aio_config[AIO_QUEUE_DEPTH],
aio_config[AIO_SINGLE_SUBMIT], aio_config[AIO_OVERLAP_EVENTS],
aio_config[AIO_THREAD_COUNT])
self.write_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

self.read_aio_handle = aio_op.aio_handle(block_size=aio_config[AIO_BLOCK_SIZE],
queue_depth=aio_config[AIO_QUEUE_DEPTH],
single_submit=aio_config[AIO_SINGLE_SUBMIT],
overlap_events=aio_config[AIO_OVERLAP_EVENTS],
intra_op_parallelism=aio_config[AIO_INTRA_OP_PARALLELISM])

# Overlap gradient swap out
self.gradient_swapper = AsyncTensorSwapper(aio_handle=self.write_aio_handle,
Expand Down