This repository was archived by the owner on Nov 17, 2023. It is now read-only.
refactor kvstore on model.py #263
Merged
Changes from all commits (19 commits):
- 148479b  [IO] add back num_parts and part_index for imgrec (mli)
- 7fbdc3d  [python] support dist kvstore (mli)
- 2006810  [IO] multiple part support in mnist (mli)
- e3353ab  [kvstore] add test_mlp.py (mli)
- f9fcf8d  [doc] multi node (mli)
- a76453d  [doc] multi-node (mli)
- facfcde  [doc] mn (mli)
- 5b95f34  [doc] multip (mli)
- 8151433  [doc] multi (mli)
- 829f143  [doc] update (mli)
- 591c99e  [doc] update (mli)
- d6590b2  [doc] update (mli)
- 4c47c27  [doc] update (mli)
- 76c8724  [kvstore] refactor the kvstore types (mli)
- d022489  [kvstore] lint (mli)
- 782cd27  [doc] for multipe machines (mli)
- 52b7cf1  [kvstore] bug fix (mli)
- 709ad2b  [python] add cpu_pinned on context (mli)
- 0ccab70  [kvstore] update (mli)
@@ -0,0 +1,94 @@
# Multi-devices and multi-machines

## Introduction

MXNet uses a two-level *parameter server* for data synchronization.

<img src="https://raw.githubusercontent.com/dmlc/dmlc.github.io/master/img/mxnet/multi-node/ps_arch.png" width="400"/>
- On the first layer, data are synchronized over multiple devices within a
  single worker machine. A device could be a GPU card, a CPU, or another
  computational unit. We often use a sequential consistency model, also known
  as BSP, on this level.

- On the second layer, data are synchronized over multiple workers via server
  machines. We can either use a sequential consistency model for guaranteed
  convergence, or a (partially) asynchronous model for better system
  performance.
## KVStore

MXNet implements the two-level parameter server in the *KVStore* class. We
currently provide the following three types. Given a batch size *b*:

| kvstore type | #devices | #workers | #ex per device | #ex per update | max delay |
| :--- | --- | --- | --- | --- | --- |
| `local` | *k* | 1 | *b / k* | *b* | *0* |
| `dist_sync` | *k* | *n* | *b / k* | *b × n* | *0* |
| `dist_async` | *k* | *n* | *b / k* | *b* | inf |

where the number of devices *k* used on a worker may vary across workers, and:
- **number of examples per update**: the number of examples used to calculate
  the averaged gradients for each update. Often, the larger it is, the slower
  the convergence.
- **number of examples per device**: the number of examples batched to one
  device each time. Often, the larger it is, the better the system performance.
- **max delay**: the maximal staleness of a weight a worker can see. Given a
  worker, a delay *d* for weight *w* means that when this worker uses *w* (to
  calculate the gradient), *w* may have already been updated *d* times
  elsewhere. A larger delay often improves system performance, but may slow
  down convergence.
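For concreteness, here is a minimal sketch of how a kvstore is created and used from the Python API; the key, shape, and values are arbitrary and only for illustration:

```python
import mxnet as mx

# pick 'local', 'dist_sync', or 'dist_async' as described above
kv = mx.kvstore.create('local')

shape = (2, 3)
kv.init(3, mx.nd.ones(shape))        # a key must be initialized before push/pull

kv.push(3, mx.nd.ones(shape) * 2)    # e.g. push a gradient
out = mx.nd.zeros(shape)
kv.pull(3, out=out)                  # pull the aggregated value back
print(out.asnumpy())
```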
## Multiple devices on a single machine

KVStore `local` synchronizes data over multiple devices on a single machine.
It gives the same results (e.g. model accuracy) as the single-device case. But
compared to the latter, with *k* devices each device only processes *1 / k* of
the examples each time (and consumes only *1 / k* of the device memory). We
often increase the batch size *b* for better system performance.

When using `local`, the system automatically chooses one of the following
three types. They differ in where the gradients are averaged over all devices
and where the weights are updated.
| kvstore type | average gradient | perform update |
| :--- | :--- | --- |
| `local_update_cpu` | CPU | CPU |
| `local_allreduce_cpu` | CPU | all devices |
| `local_allreduce_device` | a device | all devices |

They produce (almost) the same results, but may vary in speed.
- `local_update_cpu`: gradients are first copied to main memory and averaged on
  the CPU, and the weights are then updated on the CPU. It is suitable when the
  average weight size is not large but there are a large number of weights, for
  example the Google Inception network.

- `local_allreduce_cpu` is similar to `local_update_cpu`, except that the
  averaged gradients are copied back to the devices and the weights are then
  updated on the devices. It is faster than `local_update_cpu` when the weights
  are large, so the devices can accelerate the computation (though the update
  workload is duplicated *k* times). An example is AlexNet on ImageNet.

- `local_allreduce_device` is similar to `local_allreduce_cpu`, except that the
  gradients are averaged on a chosen device. It may take advantage of
  device-to-device communication and thereby accelerate the averaging step. It
  is faster than `local_allreduce_cpu` when the gradients are huge, but it
  requires more device memory.
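In the training interface this PR touches, the kvstore type is typically passed when fitting a model. The sketch below is only an approximation of the Python API of that period; the symbol and parameter names (`SoftmaxOutput`, `num_epoch`, the `kvstore` argument to `fit`) are assumptions and may differ from the actual code in this revision:

```python
import numpy as np
import mxnet as mx

# toy data, purely for illustration
X = np.random.rand(100, 20).astype('float32')
y = np.random.randint(0, 10, size=(100,))
train_iter = mx.io.NDArrayIter(X, y, batch_size=10)

# a tiny network
data = mx.symbol.Variable('data')
fc = mx.symbol.FullyConnected(data=data, num_hidden=10)
net = mx.symbol.SoftmaxOutput(data=fc, name='softmax')

model = mx.model.FeedForward(symbol=net, ctx=[mx.cpu(0), mx.cpu(1)],
                             num_epoch=2, learning_rate=0.1)
# 'local' lets the system pick one of the three implementations above
model.fit(X=train_iter, kvstore='local')
```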
## Multiple machines

Both `dist_sync` and `dist_async` can handle the multiple-machine case, but
they differ in both semantics and performance.

- `dist_sync`: the gradients are first averaged on the servers and then sent
  back to the workers for updating the weights. It is similar to `local` with
  `update_on_kvstore=false` if we treat a machine as a device. It guarantees
  almost identical convergence to the single-machine, single-device case if we
  reduce the batch size to *b / n*. However, it requires synchronization
  between all workers, which may harm system performance.

- `dist_async`: the gradients are sent to the servers, and the weights are
  updated there. The weights a worker holds may therefore be stale. This loose
  data consistency model reduces the machine synchronization cost and can
  therefore improve system performance, but it may slow down convergence.
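Several commits in this PR ([IO] num_parts/part_index for imgrec, [python] support dist kvstore) suggest that each worker reads only its own shard of the data in the distributed setting. A rough sketch of what that could look like; the record file path is hypothetical, and the iterator arguments are inferred from the commit messages rather than taken from this diff:

```python
import mxnet as mx

kv = mx.kvstore.create('dist_sync')

# each worker reads only its shard of the image record file
train_iter = mx.io.ImageRecordIter(
    path_imgrec='train.rec',        # hypothetical path
    data_shape=(3, 224, 224),
    batch_size=128,
    num_parts=kv.num_workers,       # total number of shards
    part_index=kv.rank)             # this worker's shard
```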
@@ -6,7 +6,7 @@
import pickle
from .ndarray import NDArray
from .base import _LIB
-from .base import check_call, c_array, c_str, string_types, mx_uint
+from .base import check_call, c_array, c_str, string_types, mx_uint, py_str
from .base import NDArrayHandle, KVStoreHandle
from . import optimizer as opt
@@ -68,7 +68,7 @@ def init(self, key, value):

        For each key, one must init it before push and pull.

-        Only worker 0's (get_rank() == 0) data are used.
+        Only worker 0's (rank == 0) data are used.

        This function returns after data have been initialized successfully
|
|
@@ -95,7 +95,7 @@ def init(self, key, value): | |
| >>> keys = [5, 7, 9] | ||
| >>> kv.init(keys, [mx.nd.ones(shape)]*len(keys)) | ||
| """ | ||
| if (self.get_rank() == 0): | ||
| if (self.rank == 0): | ||
| ckeys, cvals = _ctype_key_value(key, value) | ||
| check_call(_LIB.MXKVStoreInit( | ||
| self.handle, mx_uint(len(ckeys)), ckeys, cvals)) | ||
|
|
@@ -169,6 +169,9 @@ def push(self, key, value, priority=0):
            self.handle, mx_uint(len(ckeys)), ckeys, cvals,
            ctypes.c_int(priority)))

+        # self._wait(key)
+        # self._barrier()
+
    def pull(self, key, out=None, priority=0):
        """ Pull a single value or a sequence of values from the store.

Member review comment on the added `# self._wait(key)` line: remove commented lines
@@ -261,9 +264,23 @@ def set_optimizer(self, optimizer):
                raise
            self._send_command_to_servers(0, optim_str)
        else:
-            self._set_updater(opt.optimizer_clossure(optimizer))
+            self._set_updater(opt.get_updater(optimizer))

+    @property
+    def type(self):
+        """Get the type of this kvstore
+
+        Returns
+        -------
+        type : str
+            the string type
+        """
+        kv_type = ctypes.c_char_p()
+        check_call(_LIB.MXKVStoreGetType(self.handle, ctypes.byref(kv_type)))
+        return py_str(kv_type.value)

-    def get_rank(self):
+    @property
+    def rank(self):
        """Get the rank of this worker node

        Returns
@@ -275,7 +292,8 @@ def get_rank(self):
        check_call(_LIB.MXKVStoreGetRank(self.handle, ctypes.byref(rank)))
        return rank.value

-    def get_num_workers(self):
+    @property
+    def num_workers(self):
        """Get the number of worker nodes

        Returns
@@ -329,17 +347,17 @@ def _barrier(self):
        pulling, we can place a barrier to guarantee that the initialization is
        finished.

        The following codes run on n machines in parallel

        >>> if kv.get_rank() == 0:
        ... kv.init(keys, values);
        ... kv.barrier()
        ... kv.pull(keys, out = values);

        But note that, this function only blocks the main thread of workers
        until all of them have reached this point. It doesn't guarantee that all
        operations issued before are actually finished, such as \ref Push and
        \ref Pull. In that case, we need to call \ref Wait or \ref WaitAll

        The following codes implement a BSP model

        >>> kv.push(keys, values)
        ... kv._wait(keys)
        ... kv._barrier()
        ... kv.pull(keys, out = values);
        """
        check_call(_LIB.MXKVStoreBarrier(self.handle))
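Taken together, the diff replaces the old getter methods with properties and routes updates through `opt.get_updater`. A short usage sketch of the new interface (not part of the diff itself; the optimizer choice is arbitrary):

```python
import mxnet as mx

kv = mx.kvstore.create('dist_sync')

# the former get_rank()/get_num_workers() calls are now properties
print(kv.type)          # e.g. 'dist_sync'
print(kv.rank)          # this worker's id, 0 <= rank < num_workers
print(kv.num_workers)   # total number of workers in the job

# with a distributed store the optimizer is sent to the servers;
# otherwise set_optimizer falls back to a local updater via opt.get_updater
kv.set_optimizer(mx.optimizer.SGD(learning_rate=0.1))
```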
@tqchen please double check if it is ok to have cpu_pinned
Let us not expose this for now
must be here, create state may create pinned mem array. That bug cost me an hour to find
I see what you mean. Keep it here then