Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.
37 changes: 24 additions & 13 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,30 @@ try {
}
}
},
// Integration tests for the distributed kvstore on a GPU worker node.
'dist-kvstore tests GPU': {
node('mxnetlinux-gpu') {
ws('workspace/it-dist-kvstore') {
timeout(time: max_time, unit: 'MINUTES') {
init_git()
// Reuse the GPU build artifacts produced by an earlier build stage.
unpack_lib('gpu')
// Third argument is 'true' here and 'false' in the CPU variant —
// presumably it toggles GPU (nvidia-docker) support; confirm in docker_run().
docker_run('ubuntu_gpu', 'integrationtest_ubuntu_gpu_dist_kvstore', true)
publish_test_coverage()
}
}
}
},
'dist-kvstore tests CPU': {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome!

node('mxnetlinux-cpu') {
ws('workspace/it-dist-kvstore') {
timeout(time: max_time, unit: 'MINUTES') {
init_git()
unpack_lib('cpu')
docker_run('ubuntu_cpu', 'integrationtest_ubuntu_cpu_dist_kvstore', false)
publish_test_coverage()
}
}
}
},
'Scala: GPU': {
node('mxnetlinux-gpu') {
ws('workspace/ut-scala-gpu') {
Expand All @@ -980,19 +1004,6 @@ try {
}
}
}
// Disable until fixed https://github.com/apache/incubator-mxnet/issues/11441
// 'dist-kvstore tests GPU': {
// node('mxnetlinux-gpu') {
// ws('workspace/it-dist-kvstore') {
// timeout(time: max_time, unit: 'MINUTES') {
// init_git()
// unpack_lib('gpu')
// docker_run('ubuntu_gpu', 'integrationtest_ubuntu_gpu_dist_kvstore', true)
// publish_test_coverage()
// }
// }
// }
//}
}

stage('Deploy') {
Expand Down
21 changes: 17 additions & 4 deletions ci/docker/runtime_functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,22 @@ integrationtest_ubuntu_gpu_cpp_package() {
cpp-package/tests/ci_test.sh
}

# Run the CPU-only distributed kvstore integration tests.
# Each invocation launches 7 local worker processes via tools/launch.py and
# executes tests/nightly/dist_sync_kvstore.py with a CPU-specific --type variant.
integrationtest_ubuntu_cpu_dist_kvstore() {
set -ex
export PYTHONPATH=./python/
# Keep storage-fallback warnings out of the CI logs.
export MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
# Disable operator tuning (a value of 0 disables it entirely).
export MXNET_USE_OPERATOR_TUNING=0
cd tests/nightly/
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_step_cpu
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_sparse_step_cpu
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=invalid_cpu
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_type_cpu
# No --type: runs the default_cpu sync push/pull tests, with and without multiprecision.
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --no-multiprecision
# Compression tests last: the test script warns kvstore compression persists once set.
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=compressed_cpu
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=compressed_cpu --no-multiprecision
}

integrationtest_ubuntu_gpu_scala() {
set -ex
make scalapkg USE_OPENCV=1 USE_BLAS=openblas USE_CUDA=1 USE_CUDA_PATH=/usr/local/cuda USE_CUDNN=1 USE_DIST_KVSTORE=1 SCALA_ON_GPU=1
Expand All @@ -729,11 +745,8 @@ integrationtest_ubuntu_gpu_dist_kvstore() {
export PYTHONPATH=./python/
export MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
cd tests/nightly/
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --no-multiprecision
../../tools/launch.py -n 7 --launcher local python dist_device_sync_kvstore.py
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=invalid
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon
../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=init_gpu
}

test_ubuntu_cpu_python2() {
Expand Down
9 changes: 6 additions & 3 deletions python/mxnet/gluon/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(self, params, optimizer, optimizer_params=None, kvstore='device',
self._kv_initialized = False
self._kvstore = None
self._update_on_kvstore = None
self._distributed = None
self._params_to_init = []
self._reset_kvstore()

Expand Down Expand Up @@ -150,6 +151,7 @@ def _reset_kvstore(self):
raise RuntimeError("Cannot reset distributed KVStore.")
self._kv_initialized = False
self._kvstore = None
self._distributed = None
self._update_on_kvstore = None
self._params_to_init = [param for param in self._params]

Expand Down Expand Up @@ -190,7 +192,8 @@ def _init_kvstore(self):
if kvstore:
if self._compression_params:
kvstore.set_gradient_compression(self._compression_params)
if 'dist' in kvstore.type:
self._distributed = 'dist' in kvstore.type
if self._distributed:
# kv.pull(row_sparse_grad) is not supported for dist kvstore
update_on_kvstore = self._contains_sparse_weight or self._contains_sparse_grad
if update_on_kvstore:
Expand Down Expand Up @@ -291,9 +294,9 @@ def _allreduce_grads(self):
if param.grad_req != 'null':

self._kvstore.push(i, param.list_grad(), priority=-i)

if not self._update_on_kvstore:
self._kvstore.pull(i, param.list_grad(), priority=-i, ignore_sparse=False)
self._kvstore.pull(i, param.list_grad(), priority=-i,
ignore_sparse=self._distributed)

def update(self, batch_size, ignore_stale_grad=False):
"""Makes one step of parameter update.
Expand Down
2 changes: 0 additions & 2 deletions src/kvstore/kvstore_dist_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ class KVStoreDistServer {
if (has_multi_precision_copy(type)) CopyFromTo(recved, updateBuf->temp_array);
const NDArray& to_merge = has_multi_precision_copy(type) ? updateBuf->temp_array : recved;
// accumulate row_sparse gradients
// TODO(haibin) override + operator for row_sparse NDArray
// instead of calling BinaryComputeRspRsp directly
using namespace mshadow;
Engine::Get()->PushAsync(
[to_merge, updateBuf, out](RunContext ctx, Engine::CallbackOnComplete on_complete) {
Expand Down
10 changes: 8 additions & 2 deletions src/operator/operator_tune-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,14 @@ class OperatorTune : public OperatorTuneByType<DType> {
// Not especially concerned with a race condition, since this should
// run when only one thread is active (static init), just don't cache this variable
OperatorTuneBase::calculated_.store(true);
OperatorTuneBase::omp_overhead_ns_ = GetOMPLoopOverhead();
std::string config = dmlc::GetEnv("MXNET_USE_OPERATOR_TUNING", std::string());
StringUtil::trim(&config);
// disabled
if (!config.empty() && ::isdigit(config[0]) && std::atoi(config.c_str()) == 0) {
OperatorTuneBase::omp_overhead_ns_ = INT_MAX;
} else {
OperatorTuneBase::omp_overhead_ns_ = GetOMPLoopOverhead();
}
ParseEnablerConfig(config);
}

Expand Down Expand Up @@ -435,7 +441,7 @@ class OperatorTune : public OperatorTuneByType<DType> {
}

/*!
* \brief Parse MXNET_ENABLE_OPERATOR_TUNING environment variable
* \brief Parse MXNET_USE_OPERATOR_TUNING environment variable
* \param config String representation of MXNET_USE_OPERATOR_TUNING environment variable
* Values:
* 0=disable all
Expand Down
63 changes: 51 additions & 12 deletions tests/nightly/dist_sync_kvstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def check_invalid_pull():
check_invalid_gluon_trainer_reset()
check_invalid_pull()

def test_gluon_trainer():
def test_gluon_trainer_type():
def check_trainer_kv_type(stype, grad_stype, update_on_kv):
params = mx.gluon.ParameterDict()
x = params.get('x', shape=(10,1), lr_mult=1.0, stype=stype, grad_stype=grad_stype)
Expand All @@ -388,28 +388,67 @@ def check_trainer_kv_type(stype, grad_stype, update_on_kv):
check_trainer_kv_type('default', 'default', False)
check_trainer_kv_type('default', 'row_sparse', True)
check_trainer_kv_type('row_sparse', 'row_sparse', True)
print('worker ' + str(my_rank) + ' passed test_gluon_trainer')

print('worker ' + str(my_rank) + ' passed test_gluon_trainer_type')

def test_gluon_trainer_step():
def check_trainer_step():
ctx = mx.cpu(0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this on purpose? We run our KVstore tests on a GPU instance. if we don't require GPU, please downgrade to using a cpu instance.

shape = (10, 1)
x = mx.gluon.Parameter('x', shape=shape)
x.initialize(ctx=ctx, init='ones')
trainer = mx.gluon.Trainer([x], 'sgd', {'learning_rate': 1.0, 'multi_precision': False}, kvstore=kv)
with mx.autograd.record():
w = x.data(ctx)
y = (my_rank + 1) * w
y.backward()
trainer.step(1)
expected = 1 - (1 + nworker) * nworker / 2
assert_almost_equal(x.data(ctx).asnumpy(), np.full(shape, expected))
check_trainer_step()
print('worker ' + str(my_rank) + ' passed test_gluon_trainer_step')

def test_gluon_trainer_sparse_step():
    """Check one gluon Trainer.step() on a row_sparse parameter via the dist kvstore.

    Every worker scales the parameter (initialized to ones) by (rank + 1) in the
    recorded forward pass, so after a single SGD step with lr=1.0 the aggregated
    gradient sums to nworker * (nworker + 1) / 2 and each entry becomes
    1 - (1 + nworker) * nworker / 2.
    """
    def check_trainer_sparse_step():
        device = mx.cpu(0)
        param_shape = (2, 10)
        row_ids = mx.nd.arange(0, param_shape[0], ctx=device)
        weight = mx.gluon.Parameter('x', shape=param_shape,
                                    stype='row_sparse', grad_stype='row_sparse')
        weight.initialize(ctx=device, init='ones')
        trainer = mx.gluon.Trainer([weight], 'sgd', {'learning_rate': 1.0},
                                   kvstore=kv)
        with mx.autograd.record():
            # row_sparse_data() pulls the rows we touch; scale by (rank + 1).
            sliced = weight.row_sparse_data(row_ids)
            scaled = (my_rank + 1) * sliced
        scaled.backward()
        trainer.step(1)
        expected_value = 1 - (1 + nworker) * nworker / 2
        assert_almost_equal(weight.row_sparse_data(row_ids).asnumpy(),
                            np.full(param_shape, expected_value))

    check_trainer_sparse_step()
    print('worker ' + str(my_rank) + ' passed test_gluon_trainer_sparse_step')

if __name__ == "__main__":
parser = argparse.ArgumentParser(description='test distributed kvstore in dist_sync mode')
parser.add_argument('--nrepeat', type=int, default=7)
parser.add_argument('--type', type=str, default='all')
parser.add_argument('--type', type=str, default='default_cpu')
parser.add_argument('--no-gpu', dest='gpu', action='store_false')
parser.add_argument('--no-multiprecision', dest='multiprecision', action='store_false')
opt = parser.parse_args()
if opt.type == 'gluon':
test_gluon_trainer()
if opt.type == 'invalid':
if opt.type == 'gluon_type_cpu':
test_gluon_trainer_type()
elif opt.type == 'gluon_step_cpu':
test_gluon_trainer_step()
elif opt.type == 'gluon_sparse_step_cpu':
test_gluon_trainer_sparse_step()
elif opt.type == 'invalid_cpu':
test_invalid_operations()
if opt.type == 'all' or opt.type == 'init':
elif opt.type == 'init_gpu':
test_sync_init(opt.gpu)
if opt.type == 'all' or opt.type == 'default':
elif opt.type == 'default_cpu':
kv = init_kv()
kv = set_optimizer(use_multiprecision=opt.multiprecision)
test_sync_push_pull(opt.nrepeat)
# dont run non compressed tests after this as kvstore compression will be set here
if opt.type == 'all' or opt.type == 'compressed':
kv = init_kv()
elif opt.type == 'compressed_cpu':
kv, threshold = init_kv_compressed(kv)
kv = set_optimizer(use_multiprecision=opt.multiprecision)
test_sync_2bit_compression(threshold, opt.nrepeat)
else:
raise RuntimeError("Unknown test type")