diff --git a/Jenkinsfile b/Jenkinsfile index c78a88e3b768..540cbef170a1 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -968,6 +968,30 @@ try { } } }, + 'dist-kvstore tests GPU': { + node('mxnetlinux-gpu') { + ws('workspace/it-dist-kvstore') { + timeout(time: max_time, unit: 'MINUTES') { + init_git() + unpack_lib('gpu') + docker_run('ubuntu_gpu', 'integrationtest_ubuntu_gpu_dist_kvstore', true) + publish_test_coverage() + } + } + } + }, + 'dist-kvstore tests CPU': { + node('mxnetlinux-cpu') { + ws('workspace/it-dist-kvstore') { + timeout(time: max_time, unit: 'MINUTES') { + init_git() + unpack_lib('cpu') + docker_run('ubuntu_cpu', 'integrationtest_ubuntu_cpu_dist_kvstore', false) + publish_test_coverage() + } + } + } + }, 'Scala: GPU': { node('mxnetlinux-gpu') { ws('workspace/ut-scala-gpu') { @@ -980,19 +1004,6 @@ try { } } } - // Disable until fixed https://github.com/apache/incubator-mxnet/issues/11441 - // 'dist-kvstore tests GPU': { - // node('mxnetlinux-gpu') { - // ws('workspace/it-dist-kvstore') { - // timeout(time: max_time, unit: 'MINUTES') { - // init_git() - // unpack_lib('gpu') - // docker_run('ubuntu_gpu', 'integrationtest_ubuntu_gpu_dist_kvstore', true) - // publish_test_coverage() - // } - // } - // } - //} } stage('Deploy') { diff --git a/ci/docker/runtime_functions.sh b/ci/docker/runtime_functions.sh index 3c196164d57a..0b315019d738 100755 --- a/ci/docker/runtime_functions.sh +++ b/ci/docker/runtime_functions.sh @@ -718,6 +718,22 @@ integrationtest_ubuntu_gpu_cpp_package() { cpp-package/tests/ci_test.sh } +integrationtest_ubuntu_cpu_dist_kvstore() { + set -ex + export PYTHONPATH=./python/ + export MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0 + export MXNET_USE_OPERATOR_TUNING=0 + cd tests/nightly/ + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_step_cpu + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_sparse_step_cpu + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=invalid_cpu + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon_type_cpu + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --no-multiprecision + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=compressed_cpu + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=compressed_cpu --no-multiprecision +} + integrationtest_ubuntu_gpu_scala() { set -ex make scalapkg USE_OPENCV=1 USE_BLAS=openblas USE_CUDA=1 USE_CUDA_PATH=/usr/local/cuda USE_CUDNN=1 USE_DIST_KVSTORE=1 SCALA_ON_GPU=1 @@ -729,11 +745,8 @@ integrationtest_ubuntu_gpu_dist_kvstore() { export PYTHONPATH=./python/ export MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0 cd tests/nightly/ - ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py - ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --no-multiprecision ../../tools/launch.py -n 7 --launcher local python dist_device_sync_kvstore.py - ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=invalid - ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=gluon + ../../tools/launch.py -n 7 --launcher local python dist_sync_kvstore.py --type=init_gpu } test_ubuntu_cpu_python2() { diff --git a/python/mxnet/gluon/trainer.py b/python/mxnet/gluon/trainer.py index 09ad96314d5a..b4263410a50b 100644 --- a/python/mxnet/gluon/trainer.py +++ b/python/mxnet/gluon/trainer.py @@ -93,6 +93,7 @@ def __init__(self, params, optimizer, optimizer_params=None, kvstore='device', self._kv_initialized = False self._kvstore = None self._update_on_kvstore = None + self._distributed = None self._params_to_init = [] self._reset_kvstore() @@ -150,6 +151,7 @@ def _reset_kvstore(self): raise RuntimeError("Cannot reset distributed KVStore.") self._kv_initialized = False self._kvstore = None + self._distributed = None self._update_on_kvstore = None self._params_to_init = [param for param in self._params] @@ -190,7 +192,8 @@ def _init_kvstore(self): if kvstore: if self._compression_params: kvstore.set_gradient_compression(self._compression_params) - if 'dist' in kvstore.type: + self._distributed = 'dist' in kvstore.type + if self._distributed: # kv.pull(row_sparse_grad) is not supported for dist kvstore update_on_kvstore = self._contains_sparse_weight or self._contains_sparse_grad if update_on_kvstore: @@ -291,9 +294,9 @@ def _allreduce_grads(self): if param.grad_req != 'null': self._kvstore.push(i, param.list_grad(), priority=-i) - if not self._update_on_kvstore: - self._kvstore.pull(i, param.list_grad(), priority=-i, ignore_sparse=False) + self._kvstore.pull(i, param.list_grad(), priority=-i, + ignore_sparse=self._distributed) def update(self, batch_size, ignore_stale_grad=False): """Makes one step of parameter update. diff --git a/src/kvstore/kvstore_dist_server.h b/src/kvstore/kvstore_dist_server.h index a150ff42f57e..451fb78a6229 100644 --- a/src/kvstore/kvstore_dist_server.h +++ b/src/kvstore/kvstore_dist_server.h @@ -329,8 +329,6 @@ class KVStoreDistServer { if (has_multi_precision_copy(type)) CopyFromTo(recved, updateBuf->temp_array); const NDArray& to_merge = has_multi_precision_copy(type) ? updateBuf->temp_array : recved; // accumulate row_sparse gradients - // TODO(haibin) override + operator for row_sparse NDArray - // instead of calling BinaryComputeRspRsp directly using namespace mshadow; Engine::Get()->PushAsync( [to_merge, updateBuf, out](RunContext ctx, Engine::CallbackOnComplete on_complete) { diff --git a/src/operator/operator_tune-inl.h b/src/operator/operator_tune-inl.h index 2dfc103b1008..127691bccccd 100644 --- a/src/operator/operator_tune-inl.h +++ b/src/operator/operator_tune-inl.h @@ -165,8 +165,14 @@ class OperatorTune : public OperatorTuneByType { // Not especially concerned with a race condition, since this hsould // run when only one thread is active (static init), just don't cache this variable OperatorTuneBase::calculated_.store(true); - OperatorTuneBase::omp_overhead_ns_ = GetOMPLoopOverhead(); std::string config = dmlc::GetEnv("MXNET_USE_OPERATOR_TUNING", std::string()); + StringUtil::trim(&config); + // disabled + if (!config.empty() && ::isdigit(config[0]) && std::atoi(config.c_str()) == 0) { + OperatorTuneBase::omp_overhead_ns_ = INT_MAX; + } else { + OperatorTuneBase::omp_overhead_ns_ = GetOMPLoopOverhead(); + } ParseEnablerConfig(config); } @@ -435,7 +441,7 @@ class OperatorTune : public OperatorTuneByType { } /*! - * \brief Parse MXNET_ENABLE_OPERATOR_TUNING environment variable + * \brief Parse MXNET_USE_OPERATOR_TUNING environment variable * \param config String representation of MXNET_ENABLE_OPERATOR_TUNING environment variable * Values: * 0=disable all diff --git a/tests/nightly/dist_sync_kvstore.py b/tests/nightly/dist_sync_kvstore.py index 8ba1edab3a0d..861b85913ac8 100644 --- a/tests/nightly/dist_sync_kvstore.py +++ b/tests/nightly/dist_sync_kvstore.py @@ -375,7 +375,7 @@ def check_invalid_pull(): check_invalid_gluon_trainer_reset() check_invalid_pull() -def test_gluon_trainer(): +def test_gluon_trainer_type(): def check_trainer_kv_type(stype, grad_stype, update_on_kv): params = mx.gluon.ParameterDict() x = params.get('x', shape=(10,1), lr_mult=1.0, stype=stype, grad_stype=grad_stype) @@ -388,28 +388,67 @@ def check_trainer_kv_type(stype, grad_stype, update_on_kv): check_trainer_kv_type('default', 'default', False) check_trainer_kv_type('default', 'row_sparse', True) check_trainer_kv_type('row_sparse', 'row_sparse', True) - print('worker ' + str(my_rank) + ' passed test_gluon_trainer') - + print('worker ' + str(my_rank) + ' passed test_gluon_trainer_type') + +def test_gluon_trainer_step(): + def check_trainer_step(): + ctx = mx.cpu(0) + shape = (10, 1) + x = mx.gluon.Parameter('x', shape=shape) + x.initialize(ctx=ctx, init='ones') + trainer = mx.gluon.Trainer([x], 'sgd', {'learning_rate': 1.0, 'multi_precision': False}, kvstore=kv) + with mx.autograd.record(): + w = x.data(ctx) + y = (my_rank + 1) * w + y.backward() + trainer.step(1) + expected = 1 - (1 + nworker) * nworker / 2 + assert_almost_equal(x.data(ctx).asnumpy(), np.full(shape, expected)) + check_trainer_step() + print('worker ' + str(my_rank) + ' passed test_gluon_trainer_step') + +def test_gluon_trainer_sparse_step(): + def check_trainer_sparse_step(): + ctx = mx.cpu(0) + shape = (2, 10) + all_rows = mx.nd.arange(0, shape[0], ctx=ctx) + x = mx.gluon.Parameter('x', shape=shape, stype='row_sparse', grad_stype='row_sparse') + x.initialize(ctx=ctx, init='ones') + trainer = mx.gluon.Trainer([x], 'sgd', {'learning_rate': 1.0}, kvstore=kv) + with mx.autograd.record(): + w = x.row_sparse_data(all_rows) + y = (my_rank + 1) * w + y.backward() + trainer.step(1) + expected = 1 - (1 + nworker) * nworker / 2 + assert_almost_equal(x.row_sparse_data(all_rows).asnumpy(), np.full(shape, expected)) + check_trainer_sparse_step() + print('worker ' + str(my_rank) + ' passed test_gluon_trainer_sparse_step') if __name__ == "__main__": parser = argparse.ArgumentParser(description='test distributed kvstore in dist_sync mode') parser.add_argument('--nrepeat', type=int, default=7) - parser.add_argument('--type', type=str, default='all') + parser.add_argument('--type', type=str, default='default_cpu') parser.add_argument('--no-gpu', dest='gpu', action='store_false') parser.add_argument('--no-multiprecision', dest='multiprecision', action='store_false') opt = parser.parse_args() - if opt.type == 'gluon': - test_gluon_trainer() - if opt.type == 'invalid': + if opt.type == 'gluon_type_cpu': + test_gluon_trainer_type() + elif opt.type == 'gluon_step_cpu': + test_gluon_trainer_step() + elif opt.type == 'gluon_sparse_step_cpu': + test_gluon_trainer_sparse_step() + elif opt.type == 'invalid_cpu': test_invalid_operations() - if opt.type == 'all' or opt.type == 'init': + elif opt.type == 'init_gpu': test_sync_init(opt.gpu) - if opt.type == 'all' or opt.type == 'default': + elif opt.type == 'default_cpu': kv = init_kv() kv = set_optimizer(use_multiprecision=opt.multiprecision) test_sync_push_pull(opt.nrepeat) - # dont run non compressed tests after this as kvstore compression will be set here - if opt.type == 'all' or opt.type == 'compressed': - kv = init_kv() + elif opt.type == 'compressed_cpu': kv, threshold = init_kv_compressed(kv) + kv = set_optimizer(use_multiprecision=opt.multiprecision) test_sync_2bit_compression(threshold, opt.nrepeat) + else: + raise RuntimeError("Unknown test type")