From 1d282f19d7ba21dac090c157c8584d0b97c48ce3 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 00:47:23 -0700 Subject: [PATCH 1/6] fix segfault in kvstore_dist for row sparse by: moving copy before zpull, and handling the case of a nullptr --- python/mxnet/kvstore.py | 10 ++- src/kvstore/comm.h | 8 +- src/kvstore/kvstore_dist.h | 50 ++++++++----- tests/nightly/dist_sync_kvstore.py | 114 ++++++++++++++--------------- 4 files changed, 97 insertions(+), 85 deletions(-) diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index 5fd07a5b7748..5e862c4ffc81 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -140,10 +140,12 @@ def push(self, key, value, priority=0): """ Pushes a single or a sequence of key-value pairs into the store. This function returns immediately after adding an operator to the engine. - The actual operation is executed asynchronously after all previous `push` - for the same input key(s) are finished. - There is no synchronization between workers. One can use ``_barrier()`` - to sync all workers. + The actual operation is executed asynchronously. If there are consecutive + pushes to the same key, there is no guarantee on the serialization of pushes. + Simultaneous pushes to the same key may conflict and + overwrite the previous pending push as of now. + There is no synchronization between workers. + One can use ``_barrier()`` to sync all workers. Parameters ---------- diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 5f3ae4e078c4..320eeb932be5 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -68,7 +68,7 @@ class Comm { provided row_ids */ virtual void BroadcastRowSparse(int key, const NDArray& src, - const std::vector>& dst, + const std::vector> dst, const bool use_copy, const int priority) = 0; @@ -129,7 +129,7 @@ class CommCPU : public Comm { if (buf.copy_buf.empty()) { buf.copy_buf.resize(src.size()-1); for (size_t j = 0; j < src.size() - 1; ++j) { - // allocate NDArray basd on storage type + // allocate NDArray based on storage type buf.copy_buf[j] = NDArray( src[0].shape(), pinned_ctx_, false, src[0].dtype()); } @@ -191,7 +191,7 @@ class CommCPU : public Comm { } void BroadcastRowSparse(int key, const NDArray& src, - const std::vector>& dst, + const std::vector> dst, const bool use_copy, const int priority) override { using namespace mshadow; @@ -542,7 +542,7 @@ class CommDevice : public Comm { } void BroadcastRowSparse(int key, const NDArray& src, - const std::vector>& dst, + const std::vector> dst, const bool use_copy, const int priority) override { LOG(FATAL) << "Not implemented yet"; diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h index e701b0451bcd..4c7ac949365e 100644 --- a/src/kvstore/kvstore_dist.h +++ b/src/kvstore/kvstore_dist.h @@ -237,7 +237,8 @@ class KVStoreDist : public KVStoreLocal { // TODO(haibin) refactor this for loop for (size_t i = 0; i < num_vals; i++) { auto &row_id = target_val_rowids[i].second; - NDArray indices = row_id.Copy(pinned_ctx_); + NDArray indices(row_id.shape(), pinned_ctx_, false, mshadow::kInt64); + CopyFromTo(row_id, &indices, 0); Unique(&indices, priority); target_val_rowids[i].second = indices; num_rows += indices.shape().Size(); @@ -271,7 +272,11 @@ class KVStoreDist : public KVStoreLocal { auto& send_buf = comm_buf_[key]; const auto storage_type = merged.storage_type(); if (merged.ctx().dev_mask() == cpu::kDevMask) { - send_buf = merged; // avoid memory copy + // A push right after a previous push may conflict and overwrite the previous push. + // This shouldn't affect training of networks though because training involves + // a sequence of push, pull, then push. This imposes ordering that the + // second push happens after the first pull, and the pull happens after first push. + send_buf = merged; // avoid memory copy } else { if (send_buf.is_none()) { if (storage_type == kDefaultStorage) { @@ -317,7 +322,7 @@ class KVStoreDist : public KVStoreLocal { } // pull row sparse weight into `recv_buf` based on indices given by `indices` - void PullRowSparse_(const int key, NDArray *recv_buf, const NDArray& indices, int priority) { + void PullRowSparse_(const int key, NDArray *recv_buf, const NDArray indices, int priority) { using namespace rowsparse; auto pull_from_servers = [this, key, recv_buf, indices] (RunContext rctx, Engine::CallbackOnComplete cb) { @@ -340,11 +345,13 @@ class KVStoreDist : public KVStoreLocal { << pskv.keys << " size: " << size; } auto vals = new ps::SArray(data, size, false); - CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull, - [vals, cb]() { delete vals; cb(); }); - // copy indices to recv_buf + // copy indices to recv_buf. this needs to be done before ZPull + // because after pull is done, the callback function returns and locks are released. + // at this point, later functions may access the indices variable while copy happens mshadow::Copy(recv_buf->aux_data(kIdx).FlatTo1D(), indices_data.FlatTo1D()); + CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull, + [vals, cb]() { delete vals; cb(); }); }; CHECK_NOTNULL(Engine::Get())->PushAsync( pull_from_servers, @@ -485,24 +492,27 @@ class KVStoreDist : public KVStoreLocal { int64_t start_row = 0; // parition it to all servers for (int i = 0; i < num_servers; ++i) { - // calculate partition ranges - int64_t part_num_rows = - llround(static_cast(total_num_rows) / num_servers * (i + 1)) - - llround(static_cast(total_num_rows) / num_servers * i); - auto end_row = start_row + part_num_rows; - auto lb = std::lower_bound(offsets, offsets + num_rows, start_row); - auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1); ps::Key master_key = krs[i].begin() + key; pskv.keys.push_back(master_key); pskv.lens.push_back(0); - for (auto offset = lb; offset < ub; offset++) { - ps::Key ps_key = krs[i].begin() + key + (*offset - start_row); - CHECK_LT(ps_key, krs[i].end()); - pskv.keys.push_back(ps_key); - pskv.lens.push_back(unit_len); - pskv.size += unit_len; + if (offsets) { + // calculate partition ranges + int64_t part_num_rows = + llround(static_cast(total_num_rows) / num_servers * (i + 1)) - + llround(static_cast(total_num_rows) / num_servers * i); + auto end_row = start_row + part_num_rows; + auto lb = std::lower_bound(offsets, offsets + num_rows, start_row); + auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1); + + for (auto offset = lb; offset < ub; offset++) { + ps::Key ps_key = krs[i].begin() + key + (*offset - start_row); + CHECK_LT(ps_key, krs[i].end()); + pskv.keys.push_back(ps_key); + pskv.lens.push_back(unit_len); + pskv.size += unit_len; + } + start_row = end_row; } - start_row = end_row; } CHECK_EQ(static_cast(pskv.size), size); } else { diff --git a/tests/nightly/dist_sync_kvstore.py b/tests/nightly/dist_sync_kvstore.py index af1ecfc5036f..b29b8725065f 100644 --- a/tests/nightly/dist_sync_kvstore.py +++ b/tests/nightly/dist_sync_kvstore.py @@ -57,18 +57,18 @@ def test_sync_push_pull(): kv, my_rank, nworker = init_kv() def check_default_keys(kv, my_rank, nworker): nrepeat = 3 + # checks pull after push in loop, because behavior during + # consecutive pushes doesn't offer any guarantees for i in range(nrepeat): kv.push('3', mx.nd.ones(shape)*(my_rank+1)) kv.push('99', mx.nd.ones(big_shape)*(my_rank+1)) - - num = (nworker + 1) * nworker * rate / 2 * nrepeat + 1 - val = mx.nd.zeros(shape) - kv.pull('3', out=val) - check_diff_to_scalar(val, num) - - val2 = mx.nd.zeros(big_shape) - kv.pull('99', out=val2) - check_diff_to_scalar(val2, num) + num = (nworker + 1) * nworker * rate / 2 * (i + 1) + 1 + val = mx.nd.zeros(shape) + kv.pull('3', out=val) + check_diff_to_scalar(val, num) + val2 = mx.nd.zeros(big_shape) + kv.pull('99', out=val2) + check_diff_to_scalar(val2, num) def check_row_sparse_keys(kv, my_rank, nworker): nrepeat = 3 @@ -79,23 +79,23 @@ def check_row_sparse_keys(kv, my_rank, nworker): # push for i in range(nrepeat): kv.push('9', v.tostype('row_sparse')) - # select a random subset of rows this worker is interested in - num_rows = shape[0] - row_ids_np = np.random.randint(num_rows, size=num_rows) - row_ids = mx.nd.array(row_ids_np, dtype='int64') - # perform pull - val = mx.nd.zeros(shape, stype='row_sparse') - kv.row_sparse_pull('9', out=val, row_ids=row_ids) - # prepare updated values - updated_val = mx.nd.ones(shape) - for rank in range(nworker): - row = rank % shape[0] - updated_val[row] += (rank + 1) * rate * nrepeat - # verify subset of updated values - expected = mx.nd.zeros(shape) - for row in row_ids_np: - expected[row] = updated_val[row] - check_diff_to_scalar(val, expected) + # select a random subset of rows this worker is interested in + num_rows = shape[0] + row_ids_np = np.random.randint(num_rows, size=num_rows) + row_ids = mx.nd.array(row_ids_np, dtype='int64') + # perform pull + val = mx.nd.zeros(shape, stype='row_sparse') + kv.row_sparse_pull('9', out=val, row_ids=row_ids) + # prepare updated values + updated_val = mx.nd.ones(shape) + for rank in range(nworker): + row = rank % shape[0] + updated_val[row] += (rank + 1) * rate * (i+1) + # verify subset of updated values + expected = mx.nd.zeros(shape) + for row in row_ids_np: + expected[row] = updated_val[row] + check_diff_to_scalar(val, expected) def check_row_sparse_keys_with_zeros(kv, my_rank, nworker): nrepeat = 3 @@ -107,17 +107,17 @@ def check_row_sparse_keys_with_zeros(kv, my_rank, nworker): kv.push('11', v.tostype('row_sparse')) kv.push('100', big_v.tostype('row_sparse')) - # pull a subset of rows this worker is interested in - all_row_ids = np.arange(shape[0]) - val = mx.nd.ones(shape).tostype('row_sparse') - big_val = mx.nd.ones(big_shape).tostype('row_sparse') - kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64')) - big_num_rows = shape[0] - big_all_row_ids = np.arange(big_shape[0]) - kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64')) - # verify results - check_diff_to_scalar(val, mx.nd.ones(shape)) - check_diff_to_scalar(big_val, mx.nd.ones(big_shape)) + # pull a subset of rows this worker is interested in + all_row_ids = np.arange(shape[0]) + val = mx.nd.ones(shape).tostype('row_sparse') + big_val = mx.nd.ones(big_shape).tostype('row_sparse') + kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64')) + big_num_rows = shape[0] + big_all_row_ids = np.arange(big_shape[0]) + kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64')) + # verify results + check_diff_to_scalar(val, mx.nd.ones(shape)) + check_diff_to_scalar(big_val, mx.nd.ones(big_shape)) def check_big_row_sparse_keys(kv, my_rank, nworker): mx.random.seed(123) @@ -145,26 +145,26 @@ def check_big_row_sparse_keys(kv, my_rank, nworker): for i in range(nrepeat): kv.push('100', v.tostype('row_sparse')) - # select a random subset of rows this worker is interested in - mx.random.seed(my_rank) - rnd.seed(my_rank) - num_rows = big_shape[0] - row_ids_np = np.random.randint(num_rows, size=num_rows) - row_ids = mx.nd.array(row_ids_np, dtype='int64') - # perform pull - val = mx.nd.zeros(big_shape, stype='row_sparse') - kv.row_sparse_pull('100', out=val, row_ids=row_ids) - # prepare expected result - updated_val = mx.nd.ones(big_shape) - # apply updates from each worker - for rank in range(nworker): - for row in update_rows[rank]: - updated_val[row] += (rank + 1) * rate * nrepeat - - expected = mx.nd.zeros(big_shape) - for row in row_ids_np: - expected[row] = updated_val[row] - check_diff_to_scalar(val, expected, rank=my_rank) + # select a random subset of rows this worker is interested in + mx.random.seed(my_rank) + rnd.seed(my_rank) + num_rows = big_shape[0] + row_ids_np = np.random.randint(num_rows, size=num_rows) + row_ids = mx.nd.array(row_ids_np, dtype='int64') + # perform pull + val = mx.nd.zeros(big_shape, stype='row_sparse') + kv.row_sparse_pull('100', out=val, row_ids=row_ids) + # prepare expected result + updated_val = mx.nd.ones(big_shape) + # apply updates from each worker + for rank in range(nworker): + for row in update_rows[rank]: + updated_val[row] += (rank + 1) * rate * (i+1) + + expected = mx.nd.zeros(big_shape) + for row in row_ids_np: + expected[row] = updated_val[row] + check_diff_to_scalar(val, expected, rank=my_rank) check_default_keys(kv, my_rank, nworker) check_row_sparse_keys(kv, my_rank, nworker) From 2faa10abd838ae823f57ee6c907420df6207b313 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 13:12:21 -0700 Subject: [PATCH 2/6] fix indent, and bring back references --- src/kvstore/comm.h | 6 +++--- src/kvstore/kvstore_dist.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 320eeb932be5..deed1a15c981 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -68,7 +68,7 @@ class Comm { provided row_ids */ virtual void BroadcastRowSparse(int key, const NDArray& src, - const std::vector> dst, + const std::vector>& dst, const bool use_copy, const int priority) = 0; @@ -191,7 +191,7 @@ class CommCPU : public Comm { } void BroadcastRowSparse(int key, const NDArray& src, - const std::vector> dst, + const std::vector>& dst, const bool use_copy, const int priority) override { using namespace mshadow; @@ -542,7 +542,7 @@ class CommDevice : public Comm { } void BroadcastRowSparse(int key, const NDArray& src, - const std::vector> dst, + const std::vector>& dst, const bool use_copy, const int priority) override { LOG(FATAL) << "Not implemented yet"; diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h index 4c7ac949365e..4afae9b857d9 100644 --- a/src/kvstore/kvstore_dist.h +++ b/src/kvstore/kvstore_dist.h @@ -276,7 +276,7 @@ class KVStoreDist : public KVStoreLocal { // This shouldn't affect training of networks though because training involves // a sequence of push, pull, then push. This imposes ordering that the // second push happens after the first pull, and the pull happens after first push. - send_buf = merged; // avoid memory copy + send_buf = merged; // avoid memory copy } else { if (send_buf.is_none()) { if (storage_type == kDefaultStorage) { @@ -322,7 +322,7 @@ class KVStoreDist : public KVStoreLocal { } // pull row sparse weight into `recv_buf` based on indices given by `indices` - void PullRowSparse_(const int key, NDArray *recv_buf, const NDArray indices, int priority) { + void PullRowSparse_(const int key, NDArray *recv_buf, const NDArray& indices, int priority) { using namespace rowsparse; auto pull_from_servers = [this, key, recv_buf, indices] (RunContext rctx, Engine::CallbackOnComplete cb) { From e1aa3a3d539cb6528004d5e94d121dc2a3713f96 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 13:45:57 -0700 Subject: [PATCH 3/6] Update kvstore.py Update docs --- python/mxnet/kvstore.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index 5e862c4ffc81..53f7eb1696fb 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -142,8 +142,8 @@ def push(self, key, value, priority=0): This function returns immediately after adding an operator to the engine. The actual operation is executed asynchronously. If there are consecutive pushes to the same key, there is no guarantee on the serialization of pushes. - Simultaneous pushes to the same key may conflict and - overwrite the previous pending push as of now. + The execution of a push does not guarantee that all previous pushes are + finished. There is no synchronization between workers. One can use ``_barrier()`` to sync all workers. @@ -223,12 +223,14 @@ def pull(self, key, out=None, priority=0): Subsequent attempts to read from the `out` variable will be blocked until the pull operation completes. - `pull` is executed asynchronously after all previous `push` and `pull` calls - for the same input key(s) are finished. + `pull` is executed asynchronously after all previous `pull` calls and only + the last `push` call for the same input key(s) are finished. - The returned values are gauranteed to be the latest values in the store. + The returned values are guaranteed to be the latest values in the store. - For `RowSparseNDArray` values, please use ``row_sparse_pull`` instead. + For `RowSparseNDArray` values, this call is ignored, + please use ``row_sparse_pull`` instead. + Parameters ---------- @@ -289,7 +291,8 @@ def row_sparse_pull(self, key, out=None, priority=0, row_ids=None): from the store with specified row_ids. `row_sparse_pull` is executed asynchronously after all previous - `push`/`pull`/`row_sparse_pull` calls for the same input key(s) are finished. + `pull`/`row_sparse_pull` calls and the last `push` call for the + same input key(s) are finished. The returned values are guaranteed to be the latest values in the store. From 67e1caccaafbc04ab73edfb96e43d9778ff02bc3 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 13:47:43 -0700 Subject: [PATCH 4/6] Update kvstore_dist.h Update comment --- src/kvstore/kvstore_dist.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h index 4afae9b857d9..bb561d9ce42b 100644 --- a/src/kvstore/kvstore_dist.h +++ b/src/kvstore/kvstore_dist.h @@ -272,7 +272,7 @@ class KVStoreDist : public KVStoreLocal { auto& send_buf = comm_buf_[key]; const auto storage_type = merged.storage_type(); if (merged.ctx().dev_mask() == cpu::kDevMask) { - // A push right after a previous push may conflict and overwrite the previous push. + // Start of a push doesn't guarantee that the previous pushes are completed. // This shouldn't affect training of networks though because training involves // a sequence of push, pull, then push. This imposes ordering that the // second push happens after the first pull, and the pull happens after first push. From 14dfe40742bf5f965353678fabaa6eea36eb797f Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 14:26:10 -0700 Subject: [PATCH 5/6] Update kvstore.py lint issues --- python/mxnet/kvstore.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index 53f7eb1696fb..adfef9a94964 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -230,7 +230,6 @@ def pull(self, key, out=None, priority=0): For `RowSparseNDArray` values, this call is ignored, please use ``row_sparse_pull`` instead. - Parameters ---------- From 39eb018424a246d8904792803f6973d118cb6924 Mon Sep 17 00:00:00 2001 From: Rahul Huilgol Date: Tue, 10 Oct 2017 14:28:40 -0700 Subject: [PATCH 6/6] warning updated --- src/kvstore/kvstore_local.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/kvstore/kvstore_local.h b/src/kvstore/kvstore_local.h index e83dca60f02b..15a4c6055bfe 100644 --- a/src/kvstore/kvstore_local.h +++ b/src/kvstore/kvstore_local.h @@ -267,6 +267,7 @@ class KVStoreLocal : public KVStore { // invalid, print warning messages once if (this->warnings_printed_.find(key) == this->warnings_printed_.end()) { LOG(INFO) << "Warning: non-default weights detected during kvstore pull. " + << "This call has been ignored. " << "Please make sure to use row_sparse_pull with row_ids."; this->warnings_printed_.insert(key); }