From 4e1670ef14f19c7171b15b3d79f460183725e193 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 3 Feb 2020 21:08:24 -0500 Subject: [PATCH 1/2] DEV: ignore venv/* --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index f2f7edc348..7768d0dea0 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,5 @@ zarr/version.py #doesnotexist #test_sync* data/* + +venv/* From 76f2560d3a996e893ea7a95cf0cad9e666a397ba Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 3 Feb 2020 21:09:08 -0500 Subject: [PATCH 2/2] ENH: add support for concurrent block read and write This commit adds support for optional concurrent executors to allow concurrently reading or writing blocks to the underlying store. For stores where the IO is expensive, for example an S3Map, allowing concurrent reads can be a massive performance improvement. The API is designed around Executor objects to allow safe composition. An executor may be threaded to all of the places where concurrency is desired, both inside of Zarr itself and in the user's code, to ensure that a shared thread pool is used and no more than the desired number of threads are launched at the same time. Executors would allow users to safely read or write blocks concurrently from different Zarr Array objects at the same time, without worrying about accidentally spawning too many threads. --- docs/api/concurrency.rst | 15 +++++ zarr/core.py | 131 ++++++++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 30 deletions(-) create mode 100644 docs/api/concurrency.rst diff --git a/docs/api/concurrency.rst b/docs/api/concurrency.rst new file mode 100644 index 0000000000..2c378e9569 --- /dev/null +++ b/docs/api/concurrency.rst @@ -0,0 +1,15 @@ +Concurrency +=========== + +Zarr supports concurrent reads and writes to distinct blocks through the use of an `Executor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor>`__ object.
+The read and write routines of a :class:`zarr.core.Array` accept an optional ``executor`` keyword argument which controls whether and how concurrent execution should be performed. +By default, or if ``executor=None``, all blocks will be read and written serially. + +.. warning:: + + Not all executors can be used with all stores safely. + For example, a ``ThreadPoolExecutor`` may only be used if the underlying store is in fact thread-safe. + +For stores where the data is already in memory or can be read very quickly, serial execution will likely be the fastest type of execution. +A concurrent executor is particularly useful when there is a high IO cost to retrieving a block, for example, with a store that reads data from some cloud object storage like Amazon S3. +In the case of some cloud object storage, a concurrent executor allows Zarr to submit all of the web requests at once, instead of executing many web requests serially. diff --git a/zarr/core.py b/zarr/core.py index 6edcbb475f..eb78b9417b 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -571,7 +571,7 @@ def __getitem__(self, selection): fields, selection = pop_fields(selection) return self.get_basic_selection(selection, fields=fields) - def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): + def get_basic_selection(self, selection=Ellipsis, out=None, fields=None, executor=None): """Retrieve data for an item or region of the array. Parameters @@ -584,6 +584,9 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially.
Returns ------- @@ -695,7 +698,7 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): fields=fields) else: return self._get_basic_selection_nd(selection=selection, out=out, - fields=fields) + fields=fields, executor=executor) def _get_basic_selection_zd(self, selection, out=None, fields=None): # special case basic selection for zero-dimensional array @@ -731,15 +734,20 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): return out - def _get_basic_selection_nd(self, selection, out=None, fields=None): + def _get_basic_selection_nd(self, selection, out=None, fields=None, executor=None): # implementation of basic selection for array with at least one dimension # setup indexer indexer = BasicIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def get_orthogonal_selection(self, selection, out=None, fields=None): + def get_orthogonal_selection(self, selection, out=None, fields=None, executor=None): """Retrieve data by making a selection for each dimension of the array. For example, if an array has 2 dimensions, allows selecting specific rows and/or columns. The selection for each dimension can be either an integer (indexing a @@ -756,6 +764,9 @@ def get_orthogonal_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. 
Returns ------- @@ -848,9 +859,14 @@ def get_orthogonal_selection(self, selection, out=None, fields=None): # setup indexer indexer = OrthogonalIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def get_coordinate_selection(self, selection, out=None, fields=None): + def get_coordinate_selection(self, selection, out=None, fields=None, executor=None): """Retrieve a selection of individual items, by providing the indices (coordinates) for each selected item. @@ -863,6 +879,9 @@ def get_coordinate_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -923,14 +942,19 @@ def get_coordinate_selection(self, selection, out=None, fields=None): if out is not None: out = out.reshape(-1) - out = self._get_selection(indexer=indexer, out=out, fields=fields) + out = self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) # restore shape out = out.reshape(indexer.sel_shape) return out - def get_mask_selection(self, selection, out=None, fields=None): + def get_mask_selection(self, selection, out=None, fields=None, executor=None): """Retrieve a selection of individual items, by providing a Boolean array of the same shape as the array against which the selection is being made, where True values indicate a selected item. @@ -945,6 +969,9 @@ def get_mask_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. 
+ executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -997,9 +1024,14 @@ def get_mask_selection(self, selection, out=None, fields=None): # setup indexer indexer = MaskIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def _get_selection(self, indexer, out=None, fields=None): + def _get_selection(self, indexer, out=None, fields=None, executor=None): # We iterate over all chunks which overlap the selection and thus contain data # that needs to be extracted. Each chunk is processed in turn, extracting the @@ -1020,12 +1052,25 @@ def _get_selection(self, indexer, out=None, fields=None): else: check_array_shape('out', out, out_shape) - # iterate over chunks - for chunk_coords, chunk_selection, out_selection in indexer: + def f(item): + chunk_coords, chunk_selection, out_selection = item + return self._chunk_getitem( + chunk_coords, + chunk_selection, + out, + out_selection, + drop_axes=indexer.drop_axes, + fields=fields, + ) + + if executor is None: + map_ = map + else: + map_ = executor.map - # load chunk selection into output array - self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection, - drop_axes=indexer.drop_axes, fields=fields) + # iterate over chunks + for _ in map_(f, indexer): + pass if out.shape: return out @@ -1114,7 +1159,7 @@ def __setitem__(self, selection, value): fields, selection = pop_fields(selection) self.set_basic_selection(selection, value, fields=fields) - def set_basic_selection(self, selection, value, fields=None): + def set_basic_selection(self, selection, value, fields=None, executor=None): """Modify data for an item or region of the array. 
Parameters @@ -1127,6 +1172,9 @@ def set_basic_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Examples -------- @@ -1207,9 +1255,14 @@ def set_basic_selection(self, selection, value, fields=None): if self._shape == (): return self._set_basic_selection_zd(selection, value, fields=fields) else: - return self._set_basic_selection_nd(selection, value, fields=fields) - - def set_orthogonal_selection(self, selection, value, fields=None): + return self._set_basic_selection_nd( + selection, + value, + fields=fields, + executor=executor, + ) + + def set_orthogonal_selection(self, selection, value, fields=None, executor=None): """Modify data via a selection for each dimension of the array. Parameters @@ -1222,6 +1275,9 @@ def set_orthogonal_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Examples -------- @@ -1297,9 +1353,9 @@ def set_orthogonal_selection(self, selection, value, fields=None): # setup indexer indexer = OrthogonalIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def set_coordinate_selection(self, selection, value, fields=None): + def set_coordinate_selection(self, selection, value, fields=None, executor=None): """Modify a selection of individual items, by providing the indices (coordinates) for each item to be modified. 
@@ -1312,6 +1368,9 @@ def set_coordinate_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Examples -------- @@ -1375,9 +1434,9 @@ def set_coordinate_selection(self, selection, value, fields=None): if hasattr(value, 'shape') and len(value.shape) > 1: value = value.reshape(-1) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def set_mask_selection(self, selection, value, fields=None): + def set_mask_selection(self, selection, value, fields=None, executor=None): """Modify a selection of individual items, by providing a Boolean array of the same shape as the array against which the selection is being made, where True values indicate a selected item. @@ -1392,6 +1451,9 @@ def set_mask_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. 
Examples -------- @@ -1450,7 +1512,7 @@ def set_mask_selection(self, selection, value, fields=None): # setup indexer indexer = MaskIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) def _set_basic_selection_zd(self, selection, value, fields=None): # special case __setitem__ for zero-dimensional array @@ -1492,15 +1554,15 @@ def _set_basic_selection_zd(self, selection, value, fields=None): cdata = self._encode_chunk(chunk) self.chunk_store[ckey] = cdata - def _set_basic_selection_nd(self, selection, value, fields=None): + def _set_basic_selection_nd(self, selection, value, fields=None, executor=None): # implementation of __setitem__ for array with at least one dimension # setup indexer indexer = BasicIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def _set_selection(self, indexer, value, fields=None): + def _set_selection(self, indexer, value, fields=None, executor=None): # We iterate over all chunks which overlap the selection and thus contain data # that needs to be replaced. 
Each chunk is processed in turn, extracting the @@ -1528,8 +1590,8 @@ def _set_selection(self, indexer, value, fields=None): value = np.asanyarray(value) check_array_shape('value', value, sel_shape) - # iterate over chunks in range - for chunk_coords, chunk_selection, out_selection in indexer: + def f(item): + chunk_coords, chunk_selection, out_selection = item # extract data to store if sel_shape == (): @@ -1549,6 +1611,15 @@ def _set_selection(self, indexer, value, fields=None): # put data self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields) + if executor is None: + map_ = map + else: + map_ = executor.map + + # iterate over chunks in range + for _ in map_(f, indexer): + pass + def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes=None, fields=None): """Obtain part or whole of a chunk.