This repository was archived by the owner on Nov 17, 2023. It is now read-only.
Merged
22 changes: 13 additions & 9 deletions python/mxnet/kvstore.py
@@ -140,10 +140,12 @@ def push(self, key, value, priority=0):
""" Pushes a single or a sequence of key-value pairs into the store.

This function returns immediately after adding an operator to the engine.
-The actual operation is executed asynchronously after all previous `push`
-for the same input key(s) are finished.
-There is no synchronization between workers. One can use ``_barrier()``
-to sync all workers.
+The actual operation is executed asynchronously. If there are consecutive
+pushes to the same key, there is no guarantee on the serialization of pushes.
+The execution of a push does not guarantee that all previous pushes are
+finished.
+There is no synchronization between workers.
+One can use ``_barrier()`` to sync all workers.

Parameters
----------
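To make the revised semantics concrete, here is a minimal usage sketch (not part of this PR; the 'local' kvstore type, key name, and shape are illustrative). The one ordering that is still guaranteed is that a pull is serialized after the last push to the same key:

    import mxnet as mx

    kv = mx.kv.create('local')   # a distributed job would use e.g. 'dist_sync'
    shape = (2, 3)
    kv.init('w', mx.nd.zeros(shape))

    # push returns immediately; the update runs asynchronously in the engine
    kv.push('w', mx.nd.ones(shape))

    # pull is ordered after the last push to 'w'; reading `out` blocks until
    # the pull itself has completed
    out = mx.nd.zeros(shape)
    kv.pull('w', out=out)
    print(out.asnumpy())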
@@ -221,12 +223,13 @@ def pull(self, key, out=None, priority=0):
Subsequent attempts to read from the `out` variable will be blocked until the
pull operation completes.

-`pull` is executed asynchronously after all previous `push` and `pull` calls
-for the same input key(s) are finished.
+`pull` is executed asynchronously after all previous `pull` calls and only
+the last `push` call for the same input key(s) are finished.

-The returned values are gauranteed to be the latest values in the store.
+The returned values are guaranteed to be the latest values in the store.

-For `RowSparseNDArray` values, please use ``row_sparse_pull`` instead.
+For `RowSparseNDArray` values, this call is ignored,
+please use ``row_sparse_pull`` instead.

Parameters
----------
@@ -287,7 +290,8 @@ def row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
from the store with specified row_ids.

`row_sparse_pull` is executed asynchronously after all previous
-`push`/`pull`/`row_sparse_pull` calls for the same input key(s) are finished.
+`pull`/`row_sparse_pull` calls and the last `push` call for the
+same input key(s) are finished.

The returned values are guaranteed to be the latest values in the store.

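A usage sketch of the row-sparse path described above (illustrative key and shape; assumes the store holds a RowSparseNDArray under that key). `row_ids` selects which rows to fetch, and a plain `pull` on such a key is ignored with a warning, as the kvstore_local.h change below makes explicit:

    import mxnet as mx

    kv = mx.kv.create('local')
    shape = (8, 4)
    kv.init('w', mx.nd.ones(shape).tostype('row_sparse'))

    # fetch only rows 0, 2 and 5; `out` must be a row_sparse NDArray
    out = mx.nd.zeros(shape, stype='row_sparse')
    row_ids = mx.nd.array([0, 2, 5], dtype='int64')
    kv.row_sparse_pull('w', out=out, row_ids=row_ids)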
2 changes: 1 addition & 1 deletion src/kvstore/comm.h
@@ -129,7 +129,7 @@ class CommCPU : public Comm {
if (buf.copy_buf.empty()) {
buf.copy_buf.resize(src.size()-1);
for (size_t j = 0; j < src.size() - 1; ++j) {
-// allocate NDArray basd on storage type
+// allocate NDArray based on storage type
buf.copy_buf[j] = NDArray(
src[0].shape(), pinned_ctx_, false, src[0].dtype());
}
46 changes: 28 additions & 18 deletions src/kvstore/kvstore_dist.h
@@ -237,7 +237,8 @@ class KVStoreDist : public KVStoreLocal {
// TODO(haibin) refactor this for loop
for (size_t i = 0; i < num_vals; i++) {
auto &row_id = target_val_rowids[i].second;
-NDArray indices = row_id.Copy(pinned_ctx_);
+NDArray indices(row_id.shape(), pinned_ctx_, false, mshadow::kInt64);
Review comment (Contributor): ?
Reply (Member): This makes sure rsp_pull(val, rowid) can accept rowid with dtype other than int64.

CopyFromTo(row_id, &indices, 0);
Unique(&indices, priority);
target_val_rowids[i].second = indices;
num_rows += indices.shape().Size();
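Per the reply above, the buffer is now explicitly allocated as int64 and the copy performs the dtype cast. A plain-NDArray sketch of that cast (illustrative values, not the kvstore code path itself):

    import mxnet as mx

    row_id = mx.nd.array([0, 2, 2, 5])   # float32, the default dtype
    # mirrors NDArray indices(row_id.shape(), pinned_ctx_, false, mshadow::kInt64)
    indices = mx.nd.zeros(row_id.shape, dtype='int64')
    row_id.copyto(indices)               # the copy casts between dtypes
    print(indices.asnumpy())             # [0 2 2 5]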
@@ -271,6 +272,10 @@
auto& send_buf = comm_buf_[key];
const auto storage_type = merged.storage_type();
if (merged.ctx().dev_mask() == cpu::kDevMask) {
+// Start of a push doesn't guarantee that the previous pushes are completed.
+// This shouldn't affect training of networks though because training involves
+// a sequence of push, pull, then push. This imposes ordering that the
+// second push happens after the first pull, and the pull happens after first push.
send_buf = merged; // avoid memory copy
} else {
if (send_buf.is_none()) {
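The comment added above relies on the shape of a typical training loop, where data dependencies alone impose the ordering. A schematic sketch (`data_iter` and `compute_grad` are hypothetical placeholders, not MXNet API):

    # schematic: why relaxed push ordering is safe for training
    for batch in data_iter:                 # hypothetical iterator
        kv.pull('w', out=weight)            # ordered after the previous push
        grad = compute_grad(weight, batch)  # hypothetical helper; reads `weight`
        kv.push('w', grad)                  # consumes grad, hence waits on the pull

Because each push consumes the result of the preceding pull, and each pull is ordered after the preceding push, two pushes from the same worker cannot be reordered in this pattern even without a push-to-push guarantee.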
@@ -340,11 +345,13 @@
<< pskv.keys << " size: " << size;
}
auto vals = new ps::SArray<real_t>(data, size, false);
-CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull,
-                                 [vals, cb]() { delete vals; cb(); });
-// copy indices to recv_buf
+// copy indices to recv_buf. this needs to be done before ZPull
+// because after pull is done, the callback function returns and locks are released.
+// at this point, later functions may access the indices variable while copy happens
mshadow::Copy(recv_buf->aux_data(kIdx).FlatTo1D<cpu, int64_t>(),
              indices_data.FlatTo1D<cpu, int64_t>());
+CHECK_NOTNULL(ps_worker_)->ZPull(pskv.keys, vals, &pskv.lens, kRowSparsePushPull,
Review comment (Contributor): ?
Reply (Member): We want to make sure Copy(indices) is done before ZPull() is completed, so that we don't broadcast with garbage indices.
+                                 [vals, cb]() { delete vals; cb(); });
};
CHECK_NOTNULL(Engine::Get())->PushAsync(
pull_from_servers,
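The review exchange above explains the motivation: the indices copy must finish before ZPull's callback fires. Reduced to a plain-Python sketch (a threading stand-in, not MXNet internals), the rule is that every write a waiter depends on must happen before completion is signaled:

    import threading

    buf = {'indices': None}
    done = threading.Event()

    def pull(indices):
        # copy BEFORE signaling: once `done` is set, a waiting reader may
        # access `buf` immediately, so a later copy would race with it
        buf['indices'] = list(indices)
        done.set()   # plays the role of ZPull's callback releasing the locks

    def broadcast():
        done.wait()
        return buf['indices']   # never observes a half-written buffer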
@@ -485,24 +492,27 @@
int64_t start_row = 0;
// partition it to all servers
for (int i = 0; i < num_servers; ++i) {
-// calculate partition ranges
-int64_t part_num_rows =
-    llround(static_cast<double>(total_num_rows) / num_servers * (i + 1)) -
-    llround(static_cast<double>(total_num_rows) / num_servers * i);
-auto end_row = start_row + part_num_rows;
-auto lb = std::lower_bound(offsets, offsets + num_rows, start_row);
-auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1);
ps::Key master_key = krs[i].begin() + key;
pskv.keys.push_back(master_key);
pskv.lens.push_back(0);
-for (auto offset = lb; offset < ub; offset++) {
-  ps::Key ps_key = krs[i].begin() + key + (*offset - start_row);
-  CHECK_LT(ps_key, krs[i].end());
-  pskv.keys.push_back(ps_key);
-  pskv.lens.push_back(unit_len);
-  pskv.size += unit_len;
+if (offsets) {
Review comment (Contributor): ?
Reply (Member): offset might be nullptr when gradients are complete zeros.

+  // calculate partition ranges
+  int64_t part_num_rows =
+      llround(static_cast<double>(total_num_rows) / num_servers * (i + 1)) -
+      llround(static_cast<double>(total_num_rows) / num_servers * i);
+  auto end_row = start_row + part_num_rows;
+  auto lb = std::lower_bound(offsets, offsets + num_rows, start_row);
+  auto ub = std::upper_bound(offsets, offsets + num_rows, end_row - 1);
+
+  for (auto offset = lb; offset < ub; offset++) {
+    ps::Key ps_key = krs[i].begin() + key + (*offset - start_row);
+    CHECK_LT(ps_key, krs[i].end());
+    pskv.keys.push_back(ps_key);
+    pskv.lens.push_back(unit_len);
+    pskv.size += unit_len;
+  }
+  start_row = end_row;
+}
-start_row = end_row;
-}
CHECK_EQ(static_cast<size_t>(pskv.size), size);
} else {
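For reference, the llround-based split in this hunk gives each server a near-equal share of rows (shard sizes differ by at most one). A standalone arithmetic sketch in pure Python (note that Python's round() rounds halves to even whereas llround rounds away from zero, so they can differ on exact .5 boundaries; illustrative only):

    # partition boundaries for total_num_rows rows across num_servers servers
    def partition_bounds(total_num_rows, num_servers):
        return [round(total_num_rows / num_servers * i)
                for i in range(num_servers + 1)]

    print(partition_bounds(10, 3))   # [0, 3, 7, 10] -> shards of 3, 4 and 3 rows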
1 change: 1 addition & 0 deletions src/kvstore/kvstore_local.h
@@ -267,6 +267,7 @@ class KVStoreLocal : public KVStore {
// invalid, print warning messages once
if (this->warnings_printed_.find(key) == this->warnings_printed_.end()) {
LOG(INFO) << "Warning: non-default weights detected during kvstore pull. "
<< "This call has been ignored. "
<< "Please make sure to use row_sparse_pull with row_ids.";
this->warnings_printed_.insert(key);
}
114 changes: 57 additions & 57 deletions tests/nightly/dist_sync_kvstore.py
@@ -57,18 +57,18 @@ def test_sync_push_pull():
kv, my_rank, nworker = init_kv()
def check_default_keys(kv, my_rank, nworker):
nrepeat = 3
+# checks pull after push in loop, because behavior during
+# consecutive pushes doesn't offer any guarantees
for i in range(nrepeat):
kv.push('3', mx.nd.ones(shape)*(my_rank+1))
kv.push('99', mx.nd.ones(big_shape)*(my_rank+1))

-num = (nworker + 1) * nworker * rate / 2 * nrepeat + 1
-val = mx.nd.zeros(shape)
-kv.pull('3', out=val)
-check_diff_to_scalar(val, num)
-
-val2 = mx.nd.zeros(big_shape)
-kv.pull('99', out=val2)
-check_diff_to_scalar(val2, num)
+    num = (nworker + 1) * nworker * rate / 2 * (i + 1) + 1
+    val = mx.nd.zeros(shape)
+    kv.pull('3', out=val)
+    check_diff_to_scalar(val, num)
+    val2 = mx.nd.zeros(big_shape)
+    kv.pull('99', out=val2)
+    check_diff_to_scalar(val2, num)
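The expected value in the rewritten check follows from each worker pushing (my_rank + 1) * ones per repeat: assuming the key was initialized to ones (the trailing + 1), after i + 1 repeats the store holds 1 + rate * (1 + 2 + ... + nworker) * (i + 1). A quick sanity check of the closed form (illustrative nworker and rate values):

    nworker, rate = 4, 2.0
    for i in range(3):
        direct = 1 + rate * sum(rank + 1 for rank in range(nworker)) * (i + 1)
        closed = (nworker + 1) * nworker * rate / 2 * (i + 1) + 1
        assert direct == closed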

def check_row_sparse_keys(kv, my_rank, nworker):
nrepeat = 3
@@ -79,23 +79,23 @@ def check_row_sparse_keys(kv, my_rank, nworker):
# push
for i in range(nrepeat):
kv.push('9', v.tostype('row_sparse'))
-# select a random subset of rows this worker is interested in
-num_rows = shape[0]
-row_ids_np = np.random.randint(num_rows, size=num_rows)
-row_ids = mx.nd.array(row_ids_np, dtype='int64')
-# perform pull
-val = mx.nd.zeros(shape, stype='row_sparse')
-kv.row_sparse_pull('9', out=val, row_ids=row_ids)
-# prepare updated values
-updated_val = mx.nd.ones(shape)
-for rank in range(nworker):
-    row = rank % shape[0]
-    updated_val[row] += (rank + 1) * rate * nrepeat
-# verify subset of updated values
-expected = mx.nd.zeros(shape)
-for row in row_ids_np:
-    expected[row] = updated_val[row]
-check_diff_to_scalar(val, expected)
+    # select a random subset of rows this worker is interested in
+    num_rows = shape[0]
+    row_ids_np = np.random.randint(num_rows, size=num_rows)
+    row_ids = mx.nd.array(row_ids_np, dtype='int64')
+    # perform pull
+    val = mx.nd.zeros(shape, stype='row_sparse')
+    kv.row_sparse_pull('9', out=val, row_ids=row_ids)
+    # prepare updated values
+    updated_val = mx.nd.ones(shape)
+    for rank in range(nworker):
+        row = rank % shape[0]
+        updated_val[row] += (rank + 1) * rate * (i+1)
+    # verify subset of updated values
+    expected = mx.nd.zeros(shape)
+    for row in row_ids_np:
+        expected[row] = updated_val[row]
+    check_diff_to_scalar(val, expected)

def check_row_sparse_keys_with_zeros(kv, my_rank, nworker):
nrepeat = 3
@@ -107,17 +107,17 @@ def check_row_sparse_keys_with_zeros(kv, my_rank, nworker):
kv.push('11', v.tostype('row_sparse'))
kv.push('100', big_v.tostype('row_sparse'))

-# pull a subset of rows this worker is interested in
-all_row_ids = np.arange(shape[0])
-val = mx.nd.ones(shape).tostype('row_sparse')
-big_val = mx.nd.ones(big_shape).tostype('row_sparse')
-kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64'))
-big_num_rows = shape[0]
-big_all_row_ids = np.arange(big_shape[0])
-kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64'))
-# verify results
-check_diff_to_scalar(val, mx.nd.ones(shape))
-check_diff_to_scalar(big_val, mx.nd.ones(big_shape))
+    # pull a subset of rows this worker is interested in
+    all_row_ids = np.arange(shape[0])
+    val = mx.nd.ones(shape).tostype('row_sparse')
+    big_val = mx.nd.ones(big_shape).tostype('row_sparse')
+    kv.row_sparse_pull('11', out=val, row_ids=mx.nd.array(all_row_ids, dtype='int64'))
+    big_num_rows = shape[0]
+    big_all_row_ids = np.arange(big_shape[0])
+    kv.row_sparse_pull('100', out=big_val, row_ids=mx.nd.array(big_all_row_ids, dtype='int64'))
+    # verify results
+    check_diff_to_scalar(val, mx.nd.ones(shape))
+    check_diff_to_scalar(big_val, mx.nd.ones(big_shape))

def check_big_row_sparse_keys(kv, my_rank, nworker):
mx.random.seed(123)
@@ -145,26 +145,26 @@ def check_big_row_sparse_keys(kv, my_rank, nworker):
for i in range(nrepeat):
kv.push('100', v.tostype('row_sparse'))

-# select a random subset of rows this worker is interested in
-mx.random.seed(my_rank)
-rnd.seed(my_rank)
-num_rows = big_shape[0]
-row_ids_np = np.random.randint(num_rows, size=num_rows)
-row_ids = mx.nd.array(row_ids_np, dtype='int64')
-# perform pull
-val = mx.nd.zeros(big_shape, stype='row_sparse')
-kv.row_sparse_pull('100', out=val, row_ids=row_ids)
-# prepare expected result
-updated_val = mx.nd.ones(big_shape)
-# apply updates from each worker
-for rank in range(nworker):
-    for row in update_rows[rank]:
-        updated_val[row] += (rank + 1) * rate * nrepeat
-
-expected = mx.nd.zeros(big_shape)
-for row in row_ids_np:
-    expected[row] = updated_val[row]
-check_diff_to_scalar(val, expected, rank=my_rank)
+    # select a random subset of rows this worker is interested in
+    mx.random.seed(my_rank)
+    rnd.seed(my_rank)
+    num_rows = big_shape[0]
+    row_ids_np = np.random.randint(num_rows, size=num_rows)
+    row_ids = mx.nd.array(row_ids_np, dtype='int64')
+    # perform pull
+    val = mx.nd.zeros(big_shape, stype='row_sparse')
+    kv.row_sparse_pull('100', out=val, row_ids=row_ids)
+    # prepare expected result
+    updated_val = mx.nd.ones(big_shape)
+    # apply updates from each worker
+    for rank in range(nworker):
+        for row in update_rows[rank]:
+            updated_val[row] += (rank + 1) * rate * (i+1)
+
+    expected = mx.nd.zeros(big_shape)
+    for row in row_ids_np:
+        expected[row] = updated_val[row]
+    check_diff_to_scalar(val, expected, rank=my_rank)

check_default_keys(kv, my_rank, nworker)
check_row_sparse_keys(kv, my_rank, nworker)