diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py
index 5fd07a5b7748..adfef9a94964 100644
--- a/python/mxnet/kvstore.py
+++ b/python/mxnet/kvstore.py
@@ -140,10 +140,12 @@ def push(self, key, value, priority=0):
         """ Pushes a single or a sequence of key-value pairs into the store.
 
         This function returns immediately after adding an operator to the engine.
-        The actual operation is executed asynchronously after all previous `push`
-        for the same input key(s) are finished.
-        There is no synchronization between workers. One can use ``_barrier()``
-        to sync all workers.
+        The actual operation is executed asynchronously. If there are consecutive
+        pushes to the same key, there is no guarantee on the serialization of pushes.
+        The execution of a push does not guarantee that all previous pushes are
+        finished.
+        There is no synchronization between workers.
+        One can use ``_barrier()`` to sync all workers.
 
         Parameters
         ----------
@@ -221,12 +223,13 @@ def pull(self, key, out=None, priority=0):
         Subsequent attempts to read from the `out` variable will be blocked until the
         pull operation completes.
 
-        `pull` is executed asynchronously after all previous `push` and `pull` calls
-        for the same input key(s) are finished.
+        `pull` is executed asynchronously after all previous `pull` calls and only
+        the last `push` call for the same input key(s) are finished.
 
-        The returned values are gauranteed to be the latest values in the store.
+        The returned values are guaranteed to be the latest values in the store.
 
-        For `RowSparseNDArray` values, please use ``row_sparse_pull`` instead.
+        For `RowSparseNDArray` values, this call is ignored;
+        please use ``row_sparse_pull`` instead.
 
         Parameters
         ----------
@@ -287,7 +290,8 @@ def row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
         from the store with specified row_ids.
 
         `row_sparse_pull` is executed asynchronously after all previous
-        `push`/`pull`/`row_sparse_pull` calls for the same input key(s) are finished.
+        `pull`/`row_sparse_pull` calls and the last `push` call for the
+        same input key(s) are finished.
 
         The returned values are guaranteed to be the latest values in the store.
diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h
index 5f3ae4e078c4..deed1a15c981 100644
--- a/src/kvstore/comm.h
+++ b/src/kvstore/comm.h
@@ -129,7 +129,7 @@ class CommCPU : public Comm {
       if (buf.copy_buf.empty()) {
         buf.copy_buf.resize(src.size()-1);
         for (size_t j = 0; j < src.size() - 1; ++j) {
-          // allocate NDArray basd on storage type
+          // allocate NDArray based on storage type
           buf.copy_buf[j] = NDArray(
             src[0].shape(), pinned_ctx_, false, src[0].dtype());
         }
diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h
index e701b0451bcd..bb561d9ce42b 100644
--- a/src/kvstore/kvstore_dist.h
+++ b/src/kvstore/kvstore_dist.h
@@ -237,7 +237,8 @@ class KVStoreDist : public KVStoreLocal {
       // TODO(haibin) refactor this for loop
       for (size_t i = 0; i < num_vals; i++) {
         auto &row_id = target_val_rowids[i].second;
-        NDArray indices = row_id.Copy(pinned_ctx_);
+        NDArray indices(row_id.shape(), pinned_ctx_, false, mshadow::kInt64);
+        CopyFromTo(row_id, &indices, 0);
         Unique(&indices, priority);
         target_val_rowids[i].second = indices;
         num_rows += indices.shape().Size();
@@ -271,6 +272,10 @@ class KVStoreDist : public KVStoreLocal {
     auto& send_buf = comm_buf_[key];
     const auto storage_type = merged.storage_type();
     if (merged.ctx().dev_mask() == cpu::kDevMask) {
+      // The start of a push doesn't guarantee that previous pushes are completed.
+      // This shouldn't affect training of networks, though, because training involves
+      // a sequence of push, pull, then push. This imposes an ordering in which the second
+      // push happens after the first pull, and the pull happens after the first push.
      send_buf = merged;  // avoid memory copy
     } else {
       if (send_buf.is_none()) {
@@ -340,11 +345,13 @@ class KVStoreDist : public KVStoreLocal {
                   << pskv.keys << " size: " << size;
      }
      auto vals = new ps::SArray<real_t>(data, size, false);
-      CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull,
-                                       [vals, cb]() { delete vals; cb(); });
-      // copy indices to recv_buf
+      // copy indices to recv_buf. This needs to be done before ZPull because, after
+      // the pull is done, the callback function returns and locks are released. At
+      // that point, later functions may access the indices variable while the copy happens.
      mshadow::Copy(recv_buf->aux_data(kIdx).FlatTo1D<cpu, int64_t>(),
                    indices_data.FlatTo1D<cpu, int64_t>());
+      CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull,
+                                       [vals, cb]() { delete vals; cb(); });
    };
    CHECK_NOTNULL(Engine::Get())->PushAsync(
        pull_from_servers,
@@ -485,24 +492,27 @@ class KVStoreDist : public KVStoreLocal {
      int64_t start_row = 0;
      // parition it to all servers
      for (int i = 0; i < num_servers; ++i) {
-        // calculate partition ranges
-        int64_t part_num_rows =
-            llround(static_cast<double>(total_num_rows) / num_servers * (i + 1)) -
-            llround(static_cast<double>(total_num_rows) / num_servers * i);
-        auto end_row = start_row + part_num_rows;
-        auto lb = std::lower_bound(offsets, offsets + num_rows, start_row);
-        auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1);
        ps::Key master_key = krs[i].begin() + key;
        pskv.keys.push_back(master_key);
        pskv.lens.push_back(0);
-        for (auto offset = lb; offset < ub; offset++) {
-          ps::Key ps_key = krs[i].begin() + key + (*offset - start_row);
-          CHECK_LT(ps_key, krs[i].end());
-          pskv.keys.push_back(ps_key);
-          pskv.lens.push_back(unit_len);
-          pskv.size += unit_len;
+        if (offsets) {
+          // calculate partition ranges
+          int64_t part_num_rows =
+              llround(static_cast<double>(total_num_rows) / num_servers * (i + 1)) -
+              llround(static_cast<double>(total_num_rows) / num_servers * i);
+          auto end_row = start_row + part_num_rows;
+          auto lb = std::lower_bound(offsets, offsets + num_rows, start_row);
+          auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1);
+
+          for (auto offset = lb; offset < ub; offset++) {
+            ps::Key ps_key = krs[i].begin() + key + (*offset - start_row);
+            CHECK_LT(ps_key, krs[i].end());
+            pskv.keys.push_back(ps_key);
+            pskv.lens.push_back(unit_len);
+            pskv.size += unit_len;
+          }
+          start_row = end_row;
        }
-        start_row = end_row;
      }
      CHECK_EQ(static_cast<size_t>(pskv.size), size);
    } else {
diff --git a/src/kvstore/kvstore_local.h b/src/kvstore/kvstore_local.h
index e83dca60f02b..15a4c6055bfe 100644
--- a/src/kvstore/kvstore_local.h
+++ b/src/kvstore/kvstore_local.h
@@ -267,6 +267,7 @@ class KVStoreLocal : public KVStore {
        // invalid, print warning messages once
        if (this->warnings_printed_.find(key) == this->warnings_printed_.end()) {
          LOG(INFO) << "Warning: non-default weights detected during kvstore pull. "
+                   << "This call has been ignored. "
                    << "Please make sure to use row_sparse_pull with row_ids.";
          this->warnings_printed_.insert(key);
        }
diff --git a/tests/nightly/dist_sync_kvstore.py b/tests/nightly/dist_sync_kvstore.py
index af1ecfc5036f..b29b8725065f 100644
--- a/tests/nightly/dist_sync_kvstore.py
+++ b/tests/nightly/dist_sync_kvstore.py
@@ -57,18 +57,18 @@ def test_sync_push_pull():
     kv, my_rank, nworker = init_kv()
     def check_default_keys(kv, my_rank, nworker):
         nrepeat = 3
+        # check pull after each push inside the loop, because consecutive
+        # pushes to the same key offer no ordering guarantees
         for i in range(nrepeat):
             kv.push('3', mx.nd.ones(shape)*(my_rank+1))
             kv.push('99', mx.nd.ones(big_shape)*(my_rank+1))
-
-            num = (nworker + 1) * nworker * rate / 2 * nrepeat + 1
-            val = mx.nd.zeros(shape)
-            kv.pull('3', out=val)
-            check_diff_to_scalar(val, num)
-
-            val2 = mx.nd.zeros(big_shape)
-            kv.pull('99', out=val2)
-            check_diff_to_scalar(val2, num)
+            num = (nworker + 1) * nworker * rate / 2 * (i + 1) + 1
+            val = mx.nd.zeros(shape)
+            kv.pull('3', out=val)
+            check_diff_to_scalar(val, num)
+            val2 = mx.nd.zeros(big_shape)
+            kv.pull('99', out=val2)
+            check_diff_to_scalar(val2, num)
 
     def check_row_sparse_keys(kv, my_rank, nworker):
         nrepeat = 3
@@ -79,23 +79,23 @@ def check_row_sparse_keys(kv, my_rank, nworker):
         # push
         for i in range(nrepeat):
             kv.push('9', v.tostype('row_sparse'))
-        # select a random subset of rows this worker is interested in
-        num_rows = shape[0]
-        row_ids_np = np.random.randint(num_rows, size=num_rows)
-        row_ids = mx.nd.array(row_ids_np, dtype='int64')
-        # perform pull
-        val = mx.nd.zeros(shape, stype='row_sparse')
-        kv.row_sparse_pull('9', out=val, row_ids=row_ids)
-        # prepare updated values
-        updated_val = mx.nd.ones(shape)
-        for rank in range(nworker):
-            row = rank % shape[0]
-            updated_val[row] += (rank + 1) * rate * nrepeat
-        # verify subset of updated values
-        expected = mx.nd.zeros(shape)
-        for row in row_ids_np:
-            expected[row] = updated_val[row]
-        check_diff_to_scalar(val, expected)
+            # select a random subset of rows this worker is interested in
+            num_rows = shape[0]
+            row_ids_np = np.random.randint(num_rows, size=num_rows)
+            row_ids = mx.nd.array(row_ids_np, dtype='int64')
+            # perform pull
+            val = mx.nd.zeros(shape, stype='row_sparse')
+            kv.row_sparse_pull('9', out=val, row_ids=row_ids)
+            # prepare updated values
+            updated_val = mx.nd.ones(shape)
+            for rank in range(nworker):
+                row = rank % shape[0]
+                updated_val[row] += (rank + 1) * rate * (i+1)
+            # verify subset of updated values
+            expected = mx.nd.zeros(shape)
+            for row in row_ids_np:
+                expected[row] = updated_val[row]
+            check_diff_to_scalar(val, expected)
 
     def check_row_sparse_keys_with_zeros(kv, my_rank, nworker):
         nrepeat = 3
@@ -107,17 +107,17 @@ def check_row_sparse_keys_with_zeros(kv, my_rank, nworker):
             kv.push('11', v.tostype('row_sparse'))
             kv.push('100', big_v.tostype('row_sparse'))
-
-        # pull a subset of rows this worker is interested in
-        all_row_ids = np.arange(shape[0])
-        val = mx.nd.ones(shape).tostype('row_sparse')
-        big_val = mx.nd.ones(big_shape).tostype('row_sparse')
-        kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64'))
-        big_num_rows = shape[0]
-        big_all_row_ids = np.arange(big_shape[0])
-        kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64'))
-        # verify results
-        check_diff_to_scalar(val, mx.nd.ones(shape))
-        check_diff_to_scalar(big_val, mx.nd.ones(big_shape))
+            # pull a subset of rows this worker is interested in
+            all_row_ids = np.arange(shape[0])
+            val = mx.nd.ones(shape).tostype('row_sparse')
+            big_val = mx.nd.ones(big_shape).tostype('row_sparse')
+            kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64'))
+            big_num_rows = shape[0]
+            big_all_row_ids = np.arange(big_shape[0])
+            kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64'))
+            # verify results
+            check_diff_to_scalar(val, mx.nd.ones(shape))
+            check_diff_to_scalar(big_val, mx.nd.ones(big_shape))
 
     def check_big_row_sparse_keys(kv, my_rank, nworker):
         mx.random.seed(123)
@@ -145,26 +145,26 @@ def check_big_row_sparse_keys(kv, my_rank, nworker):
 
         for i in range(nrepeat):
             kv.push('100', v.tostype('row_sparse'))
-        # select a random subset of rows this worker is interested in
-        mx.random.seed(my_rank)
-        rnd.seed(my_rank)
-        num_rows = big_shape[0]
-        row_ids_np = np.random.randint(num_rows, size=num_rows)
-        row_ids = mx.nd.array(row_ids_np, dtype='int64')
-        # perform pull
-        val = mx.nd.zeros(big_shape, stype='row_sparse')
-        kv.row_sparse_pull('100', out=val, row_ids=row_ids)
-        # prepare expected result
-        updated_val = mx.nd.ones(big_shape)
-        # apply updates from each worker
-        for rank in range(nworker):
-            for row in update_rows[rank]:
-                updated_val[row] += (rank + 1) * rate * nrepeat
-
-        expected = mx.nd.zeros(big_shape)
-        for row in row_ids_np:
-            expected[row] = updated_val[row]
-        check_diff_to_scalar(val, expected, rank=my_rank)
+            # select a random subset of rows this worker is interested in
+            mx.random.seed(my_rank)
+            rnd.seed(my_rank)
+            num_rows = big_shape[0]
+            row_ids_np = np.random.randint(num_rows, size=num_rows)
+            row_ids = mx.nd.array(row_ids_np, dtype='int64')
+            # perform pull
+            val = mx.nd.zeros(big_shape, stype='row_sparse')
+            kv.row_sparse_pull('100', out=val, row_ids=row_ids)
+            # prepare expected result
+            updated_val = mx.nd.ones(big_shape)
+            # apply updates from each worker
+            for rank in range(nworker):
+                for row in update_rows[rank]:
+                    updated_val[row] += (rank + 1) * rate * (i+1)
+
+            expected = mx.nd.zeros(big_shape)
+            for row in row_ids_np:
+                expected[row] = updated_val[row]
+            check_diff_to_scalar(val, expected, rank=my_rank)
 
     check_default_keys(kv, my_rank, nworker)
     check_row_sparse_keys(kv, my_rank, nworker)
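A minimal sketch of the usage pattern the updated docstrings describe, assuming a local kvstore and illustrative key names and shapes (none of these come from the patch itself): pull after each push rather than relying on consecutive pushes being serialized, and use `row_sparse_pull` with explicit `row_ids` for RowSparseNDArray values.

```python
import mxnet as mx

# Hypothetical shape and key names, for illustration only.
shape = (4, 4)
kv = mx.kv.create('local')
kv.init('w', mx.nd.ones(shape))

for _ in range(3):
    kv.push('w', mx.nd.ones(shape))
    # Pull after each push: consecutive pushes to the same key carry no
    # ordering guarantee, but a pull waits for the last push to finish.
    out = mx.nd.zeros(shape)
    kv.pull('w', out=out)

# For RowSparseNDArray values a plain pull is ignored; use row_sparse_pull
# with explicit row_ids instead.
kv.init('rsp', mx.nd.ones(shape).tostype('row_sparse'))
row_ids = mx.nd.array([0, 2], dtype='int64')
rsp_out = mx.nd.zeros(shape, stype='row_sparse')
kv.row_sparse_pull('rsp', out=rsp_out, row_ids=row_ids)
```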