Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bin/ds_io
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3

# Command-line entry point for the DeepNVMe I/O benchmark (ds_io).
# All argument parsing and benchmark logic live in deepspeed.nvme.
from deepspeed.nvme import ds_io_main

if __name__ == '__main__':
    ds_io_main()
9 changes: 9 additions & 0 deletions bin/ds_nvme_tune
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env python3

# Command-line entry point for DeepNVMe performance tuning: sweeps I/O
# configurations against args.nvme_dir, then generates tuned parameters
# from the sweep logs in args.log_dir.
from deepspeed.nvme import sweep_main, generate_main, parse_sweep_arguments

if __name__ == '__main__':
    args = parse_sweep_arguments()
    print(f"Running DeepNVMe performance tuning on {args.nvme_dir}")
    sweep_main(args)
    generate_main(args.log_dir)
8 changes: 8 additions & 0 deletions deepspeed/nvme/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

from .perf_run_sweep import sweep_main, parse_sweep_arguments
from .perf_generate_param import generate_main
from .test_ds_aio import ds_io_main
175 changes: 175 additions & 0 deletions deepspeed/nvme/ds_aio_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""

import argparse
import os
from .test_ds_aio_utils import refine_integer_value
from deepspeed.accelerator import get_accelerator

MAPPING_DELIMITER = ':'


def refine_args(args):
    """Normalize string-valued size arguments (e.g. '1M') into integers.

    Args:
        args: argparse.Namespace with ``io_size`` and ``block_size``
            attributes that may be strings with K/M/G suffixes.

    Returns:
        The same namespace, with size fields converted in place.
    """
    # isinstance is the idiomatic type check (reads better and handles
    # subclasses, unlike `type(x) == str`).
    if args.io_size and isinstance(args.io_size, str):
        args.io_size = refine_integer_value(args.io_size)

    if args.block_size and isinstance(args.block_size, str):
        args.block_size = refine_integer_value(args.block_size)

    return args


def _get_mapping_dict(args):
    """Build the {device_id: folder} mapping that assigns an I/O folder to
    each worker.

    With --folder, every process index shares that one folder; otherwise the
    explicit --folder_to_device_mapping entries ('folder:device') are parsed.

    NOTE(review): keys are ints in the --folder branch but strings in the
    mapping branch — presumably consumers only iterate, but confirm.
    """
    if args.folder is None:
        pairs = (entry.split(MAPPING_DELIMITER) for entry in args.folder_to_device_mapping)
        return {fields[1]: fields[0] for fields in pairs}
    return {rank: args.folder for rank in range(args.multi_process)}


def _validate_folder_mapping(args):
    """Check --folder_to_device_mapping entries for well-formedness.

    Verifies that each entry contains the delimiter, that each referenced
    folder exists, and (with --gpu) that each device id is in range for the
    current accelerator.

    Returns:
        tuple: (ok, messages) — ok is False when any check failed.
    """
    messages = []

    malformed = [entry for entry in args.folder_to_device_mapping if MAPPING_DELIMITER not in entry]
    if malformed:
        messages.append(f'Missing delimiter ({MAPPING_DELIMITER}) in folder_to_device_mapping {malformed}')

    folders = [entry.split(MAPPING_DELIMITER)[0] for entry in args.folder_to_device_mapping]
    missing = [folder for folder in folders if not os.path.exists(folder)]
    if missing:
        messages.append(f'Invalid folders in folder_to_device_mapping: {missing}')

    if args.gpu:
        device_count = get_accelerator().device_count()
        devices = [int(entry.split(MAPPING_DELIMITER)[1]) for entry in args.folder_to_device_mapping]
        out_of_range = [dev for dev in devices if dev >= device_count]
        if out_of_range:
            messages.append(f'Invalid device ids in folder_to_device_mapping: {out_of_range}')

    return not messages, messages


def validate_args(args):
    """Validate the parsed command-line arguments.

    Checks that exactly one of --folder / --folder_to_device_mapping is
    given, that referenced folders exist, and that --use_gds is only used
    together with --gpu.  All problems are collected and printed before
    returning.

    Returns:
        bool: True when the arguments are consistent.
    """
    no_error = True
    error_messages = []

    # Exactly one of --folder / --folder_to_device_mapping must be supplied.
    # (f-string prefixes removed from messages with no placeholders.)
    if args.folder is not None and len(args.folder_to_device_mapping) > 0:
        error_messages.append('--folder and --folder_to_device_mapping cannot be specified together.')
        no_error = False
    elif args.folder is None and len(args.folder_to_device_mapping) == 0:
        error_messages.append('At least one of --folder or --folder_to_device_mapping must be specified.')
        no_error = False

    # Validate --folder
    if args.folder is not None and not os.path.exists(args.folder):
        no_error = False
        error_messages.append(f'Invalid folder in --folder: {args.folder} ')

    # Validate --folder_to_device_mapping
    if len(args.folder_to_device_mapping) > 0:
        no_mapping_error, mapping_error_messages = _validate_folder_mapping(args)
        no_error = no_error and no_mapping_error
        error_messages += mapping_error_messages

    # Validate --gpu, --use_gds
    if args.use_gds and not args.gpu:
        error_messages.append('--gpu must be set to transfer with --use_gds')
        no_error = False

    if not no_error:
        print(f'Found {len(error_messages)} validation errors')
        for i, msg in enumerate(error_messages):
            print(f'{i+1}: {msg}')

    return no_error


def parse_arguments():
    """Build and parse the ds_io command-line arguments.

    Returns:
        argparse.Namespace: raw parsed arguments.  Size fields
        (``io_size``, ``block_size``) are still suffix strings here;
        normalization/validation happens in refine_args/validate_args.
    """
    parser = argparse.ArgumentParser()

    # Target location(s) for benchmark files: either one shared folder or an
    # explicit folder-to-device mapping (mutually exclusive; validated later).
    parser.add_argument('--folder', default=None, type=str, help='Folder to use for I/O.')

    parser.add_argument('--folder_to_device_mapping',
                        default=[],
                        nargs='+',
                        help='Specification of mapping of folder to (gpu) device id, (ignored for cpu accesses).'
                        'Can be specified multiple times for multi-process runs,'
                        'e.g. --folder_to_device_mapping /mnt/nvme0:0 --folder_to_device_mapping /mnt/nvme1:15 --gpu'
                        'means access /mnt/nvme0 with gpu 0 and /mnt/nvme1 with gpu 15')

    # Transfer size and submission shape.
    parser.add_argument('--io_size', type=str, default=None, required=True, help='Number of bytes to read or write.')

    parser.add_argument('--read', action='store_true', help='Perform read I/O (default is write)')

    parser.add_argument('--multi_process',
                        type=int,
                        default=1,
                        help='Number of parallel processes doing I/O (default 1).')

    parser.add_argument('--block_size',
                        type=str,
                        default='1M',
                        help='I/O block size. Can use K, M, or G suffix (default 1M for 1 megabytes).')

    parser.add_argument('--queue_depth', type=int, default=32, help='I/O queue depth (default 32).')

    parser.add_argument('--single_submit',
                        action='store_true',
                        help='Submit I/O requests in singles (default is submit queue_depth amount at once.).')

    parser.add_argument(
        '--sequential_requests',
        action='store_true',
        help=
        'Delay I/O request submission until completion of prior requests (default is overlap I/O submission and completion requests.).'
    )

    parser.add_argument('--validate', action='store_true', help='Perform validation of I/O transfer in library.')

    parser.add_argument('--handle', action='store_true', help='Use AIO handle.')

    parser.add_argument('--loops', type=int, default=3, help='Count of operation repetitions')

    parser.add_argument('--io_parallel', type=int, default=None, help='Per iop parallelism')

    # Device / transfer-path options.
    parser.add_argument('--gpu', action='store_true', help='Use GPU memory')

    parser.add_argument('--use_gds', action='store_true', help='Enable GDS AIO')

    parser.add_argument('--slow_bounce_buffer',
                        action='store_true',
                        help='For GPU memory transfers, measure impact of bounce buffer pinning on critical path.')

    args = parser.parse_args()
    print(f'args = {args}')
    return args


def get_validated_args():
    """Parse, normalize, and validate the command-line arguments.

    On success, attaches the device-to-folder mapping to the namespace as
    ``mapping_dict`` and ``mapping_list`` and prints the configuration.

    Returns:
        argparse.Namespace: the validated arguments.

    Raises:
        SystemExit: with status 1 when validation fails.
    """
    args = parse_arguments()
    args = refine_args(args)
    if not validate_args(args):
        # quit() is injected by site.py (absent under `python -S`) and exits
        # with status 0; signal the validation failure explicitly instead.
        raise SystemExit(1)
    print('Successful validation of command line arguments')

    peer_tag = 'gpu' if args.gpu else 'process'
    args.mapping_dict = _get_mapping_dict(args)
    args.mapping_list = list(args.mapping_dict.items())
    assert len(args.mapping_dict) == len(args.mapping_list)
    print(f'Configuring {len(args.mapping_list)} {peer_tag} to folder mapping')
    for i, (device_id, folder) in enumerate(args.mapping_list):
        print(f'[{i}]: {peer_tag} {device_id} <----> {folder}')

    return args
134 changes: 134 additions & 0 deletions deepspeed/nvme/ds_aio_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""

import torch
import os
import time
from deepspeed.ops.aio import AsyncIOBuilder
from multiprocessing import Pool, Barrier
from .test_ds_aio_utils import report_results, task_log, task_barrier


def pre_basic(args, tid, read_op):
    """Allocate the pinned CPU buffer and per-task context for one I/O task.

    Args:
        args: parsed benchmark arguments.
        tid: task (worker) id; used to derive a per-task write file name.
        read_op: True for a read benchmark, False for a write benchmark.

    Returns:
        dict: context holding the target file, byte count, buffer, and the
        accumulated in-library elapsed time (initialized to 0).
    """
    if read_op:
        io_string = "Read"
        num_bytes = os.path.getsize(args.read_file)
        file = args.read_file
    else:
        io_string = "Write"
        num_bytes = args.write_size
        file = f'{args.write_file}.{tid}'

    task_log(tid, f'Allocate tensor of size {num_bytes} bytes')
    # Pinned host memory enables fast DMA transfers during the benchmark.
    buffer = torch.empty(num_bytes, dtype=torch.uint8, device='cpu').pin_memory()
    task_log(tid, f'{io_string} file {file} of size {num_bytes} bytes from buffer on device {buffer.device}')

    return {
        'file': file,
        'num_bytes': num_bytes,
        'buffer': buffer,
        'elapsed_sec': 0,
    }


def pre_basic_read(pool_params):
    """Pool entry point: build the context for a read task."""
    args, tid = pool_params
    return pre_basic(args, tid, True)


def pre_basic_write(pool_params):
    """Pool entry point: build the context for a write task."""
    args, tid = pool_params
    return pre_basic(args, tid, False)


def post_basic(pool_params):
    """Release the task's I/O buffer after the benchmark loop.

    Dropping the dict's reference lets the pinned allocation be reclaimed
    once no other reference remains.  (A previous ``buffer.detach()`` call
    here discarded its result and was a no-op, so it has been removed.)

    Returns:
        dict: the context with ``buffer`` cleared.
    """
    _, _, ctxt = pool_params
    ctxt["buffer"] = None
    return ctxt


def main_basic_read(pool_params):
    """Time one blocking aio_read of the target file into the pinned buffer.

    Accumulates the wall-clock time into ctxt['elapsed_sec'].  The builder
    load is deliberately kept inside the timed region, matching the write
    path.
    """
    args, _, ctxt = pool_params
    t0 = time.time()
    AsyncIOBuilder().load().aio_read(ctxt['buffer'], ctxt['file'], args.block_size, args.queue_depth,
                                     args.single_submit, not args.sequential_requests, args.validate)
    ctxt['elapsed_sec'] += time.time() - t0
    return ctxt


def main_basic_write(pool_params):
    """Time one blocking aio_write of the pinned buffer to the target file.

    Accumulates the wall-clock time into ctxt['elapsed_sec'].
    """
    args, _, ctxt = pool_params
    t0 = time.time()
    AsyncIOBuilder().load().aio_write(ctxt['buffer'], ctxt['file'], args.block_size, args.queue_depth,
                                      args.single_submit, not args.sequential_requests, args.validate)
    ctxt['elapsed_sec'] += time.time() - t0
    return ctxt


def get_schedule(args, read_op):
    """Map the benchmark phases to their task callables.

    Returns:
        dict: {'pre': ..., 'post': ..., 'main': ...} selecting the read or
        write variants of the pre/main tasks.  (Key order preserved since the
        dict repr is logged by the worker.)
    """
    if read_op:
        pre_task, main_task = pre_basic_read, main_basic_read
    else:
        pre_task, main_task = pre_basic_write, main_basic_write
    return {'pre': pre_task, 'post': post_basic, 'main': main_task}


def _aio_handle_tasklet(pool_params):
    """Worker-process body: run the pre/main/post phases of one benchmark task.

    Synchronizes with sibling workers at each phase boundary through the
    module-global ``aio_barrier`` installed by ``_init_tasklet``.

    Returns:
        tuple: (wall-clock main-phase seconds, in-library elapsed seconds,
        total bytes transferred across all loops).
    """
    args, tid, read_op = pool_params
    # One worker per folder/device mapping entry.
    num_processes = len(args.mapping_dict)

    # Create schedule
    schedule = get_schedule(args, read_op)
    task_log(tid, f'schedule = {schedule}')
    task_barrier(aio_barrier, num_processes)

    # Run pre task
    task_log(tid, f'running pre-task')
    ctxt = schedule["pre"]((args, tid))
    task_barrier(aio_barrier, num_processes)

    # Run main tasks in a loop
    ctxt["main_task_sec"] = 0
    for i in range(args.loops):
        task_log(tid, f'running main task {i}')
        start_time = time.time()
        ctxt = schedule["main"]((args, tid, ctxt))
        # Barrier sits inside the timed region, so the measurement includes
        # waiting for the slowest worker on each iteration.
        task_barrier(aio_barrier, num_processes)
        stop_time = time.time()
        ctxt["main_task_sec"] += stop_time - start_time

    # Run post task
    task_log(tid, f'running post-task')
    ctxt = schedule["post"]((args, tid, ctxt))
    task_barrier(aio_barrier, num_processes)

    return ctxt["main_task_sec"], ctxt["elapsed_sec"], ctxt["num_bytes"] * args.loops


def _init_tasklet(barrier):
    """Pool initializer: publish the shared barrier to this worker process."""
    global aio_barrier
    aio_barrier = barrier


def aio_basic_multiprocessing(args, read_op):
    """Run the basic (non-handle) AIO benchmark with one process per mapping
    entry, then report the aggregated results.

    Args:
        args: validated benchmark arguments (must carry ``mapping_dict``).
        read_op: True to benchmark reads, False for writes.
    """
    num_processes = len(args.mapping_dict)
    barrier = Barrier(num_processes)
    tasklet_params = [(args, rank, read_op) for rank in range(num_processes)]
    with Pool(processes=num_processes, initializer=_init_tasklet, initargs=(barrier, )) as pool:
        pool_results = pool.map(_aio_handle_tasklet, tasklet_params)

    report_results(args, read_op, pool_results)
Loading