diff --git a/.gitignore b/.gitignore index f2f7edc348..7768d0dea0 100644 --- a/.gitignore +++ b/.gitignore @@ -74,3 +74,5 @@ zarr/version.py #doesnotexist #test_sync* data/* + +venv/* diff --git a/docs/api/concurrency.rst b/docs/api/concurrency.rst new file mode 100644 index 0000000000..2c378e9569 --- /dev/null +++ b/docs/api/concurrency.rst @@ -0,0 +1,15 @@ +Concurrency +=========== + +Zarr supports concurrent reads and writes to distinct blocks through the use of an `Executor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor>`__ object. +The read and write routines of a :class:`zarr.core.Array` accept an optional ``executor`` keyword argument which controls whether and how concurrent execution is performed. +By default, or if ``executor=None``, all blocks will be read and written serially. + +.. warning:: + + Not all executors can be used with all stores safely. + For example, a ``ThreadPoolExecutor`` may only be used if the underlying store is in fact thread-safe. + +For stores where the data is already in memory or can be read very quickly, serial execution will likely be the fastest type of execution. +A concurrent executor is particularly useful when there is a high I/O cost to retrieving a block, for example, with a store that reads data from some cloud object storage like Amazon S3. +In the case of some cloud object storage, a concurrent executor allows Zarr to submit all of the web requests at once, instead of executing many web requests serially. diff --git a/zarr/core.py b/zarr/core.py index 6edcbb475f..eb78b9417b 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -571,7 +571,7 @@ def __getitem__(self, selection): fields, selection = pop_fields(selection) return self.get_basic_selection(selection, fields=fields) - def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): + def get_basic_selection(self, selection=Ellipsis, out=None, fields=None, executor=None): """Retrieve data for an item or region of the array. 
Parameters @@ -584,6 +584,9 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -695,7 +698,7 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None): fields=fields) else: return self._get_basic_selection_nd(selection=selection, out=out, - fields=fields) + fields=fields, executor=executor) def _get_basic_selection_zd(self, selection, out=None, fields=None): # special case basic selection for zero-dimensional array @@ -731,15 +734,20 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): return out - def _get_basic_selection_nd(self, selection, out=None, fields=None): + def _get_basic_selection_nd(self, selection, out=None, fields=None, executor=None): # implementation of basic selection for array with at least one dimension # setup indexer indexer = BasicIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def get_orthogonal_selection(self, selection, out=None, fields=None): + def get_orthogonal_selection(self, selection, out=None, fields=None, executor=None): """Retrieve data by making a selection for each dimension of the array. For example, if an array has 2 dimensions, allows selecting specific rows and/or columns. The selection for each dimension can be either an integer (indexing a @@ -756,6 +764,9 @@ def get_orthogonal_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. 
+ executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -848,9 +859,14 @@ def get_orthogonal_selection(self, selection, out=None, fields=None): # setup indexer indexer = OrthogonalIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def get_coordinate_selection(self, selection, out=None, fields=None): + def get_coordinate_selection(self, selection, out=None, fields=None, executor=None): """Retrieve a selection of individual items, by providing the indices (coordinates) for each selected item. @@ -863,6 +879,9 @@ def get_coordinate_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -923,14 +942,19 @@ def get_coordinate_selection(self, selection, out=None, fields=None): if out is not None: out = out.reshape(-1) - out = self._get_selection(indexer=indexer, out=out, fields=fields) + out = self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) # restore shape out = out.reshape(indexer.sel_shape) return out - def get_mask_selection(self, selection, out=None, fields=None): + def get_mask_selection(self, selection, out=None, fields=None, executor=None): """Retrieve a selection of individual items, by providing a Boolean array of the same shape as the array against which the selection is being made, where True values indicate a selected item. 
@@ -945,6 +969,9 @@ def get_mask_selection(self, selection, out=None, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to extract data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Returns ------- @@ -997,9 +1024,14 @@ def get_mask_selection(self, selection, out=None, fields=None): # setup indexer indexer = MaskIndexer(selection, self) - return self._get_selection(indexer=indexer, out=out, fields=fields) + return self._get_selection( + indexer=indexer, + out=out, + fields=fields, + executor=executor, + ) - def _get_selection(self, indexer, out=None, fields=None): + def _get_selection(self, indexer, out=None, fields=None, executor=None): # We iterate over all chunks which overlap the selection and thus contain data # that needs to be extracted. Each chunk is processed in turn, extracting the @@ -1020,12 +1052,25 @@ def _get_selection(self, indexer, out=None, fields=None): else: check_array_shape('out', out, out_shape) - # iterate over chunks - for chunk_coords, chunk_selection, out_selection in indexer: + def f(item): + chunk_coords, chunk_selection, out_selection = item + return self._chunk_getitem( + chunk_coords, + chunk_selection, + out, + out_selection, + drop_axes=indexer.drop_axes, + fields=fields, + ) + + if executor is None: + map_ = map + else: + map_ = executor.map - # load chunk selection into output array - self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection, - drop_axes=indexer.drop_axes, fields=fields) + # iterate over chunks + for _ in map_(f, indexer): + pass if out.shape: return out @@ -1114,7 +1159,7 @@ def __setitem__(self, selection, value): fields, selection = pop_fields(selection) self.set_basic_selection(selection, value, fields=fields) - def set_basic_selection(self, selection, value, fields=None): + def 
set_basic_selection(self, selection, value, fields=None, executor=None): """Modify data for an item or region of the array. Parameters @@ -1127,6 +1172,9 @@ def set_basic_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Examples -------- @@ -1207,9 +1255,14 @@ def set_basic_selection(self, selection, value, fields=None): if self._shape == (): return self._set_basic_selection_zd(selection, value, fields=fields) else: - return self._set_basic_selection_nd(selection, value, fields=fields) - - def set_orthogonal_selection(self, selection, value, fields=None): + return self._set_basic_selection_nd( + selection, + value, + fields=fields, + executor=executor, + ) + + def set_orthogonal_selection(self, selection, value, fields=None, executor=None): """Modify data via a selection for each dimension of the array. Parameters @@ -1222,6 +1275,9 @@ def set_orthogonal_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. 
Examples -------- @@ -1297,9 +1353,9 @@ def set_orthogonal_selection(self, selection, value, fields=None): # setup indexer indexer = OrthogonalIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def set_coordinate_selection(self, selection, value, fields=None): + def set_coordinate_selection(self, selection, value, fields=None, executor=None): """Modify a selection of individual items, by providing the indices (coordinates) for each item to be modified. @@ -1312,6 +1368,9 @@ def set_coordinate_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. Examples -------- @@ -1375,9 +1434,9 @@ def set_coordinate_selection(self, selection, value, fields=None): if hasattr(value, 'shape') and len(value.shape) > 1: value = value.reshape(-1) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def set_mask_selection(self, selection, value, fields=None): + def set_mask_selection(self, selection, value, fields=None, executor=None): """Modify a selection of individual items, by providing a Boolean array of the same shape as the array against which the selection is being made, where True values indicate a selected item. @@ -1392,6 +1451,9 @@ def set_mask_selection(self, selection, value, fields=None): fields : str or sequence of str, optional For arrays with a structured dtype, one or more fields can be specified to set data for. + executor : concurrent.futures.Executor or None, optional + An executor for submitting tasks to run concurrently. If not + provided, work will be executed serially. 
Examples -------- @@ -1450,7 +1512,7 @@ def set_mask_selection(self, selection, value, fields=None): # setup indexer indexer = MaskIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) def _set_basic_selection_zd(self, selection, value, fields=None): # special case __setitem__ for zero-dimensional array @@ -1492,15 +1554,15 @@ def _set_basic_selection_zd(self, selection, value, fields=None): cdata = self._encode_chunk(chunk) self.chunk_store[ckey] = cdata - def _set_basic_selection_nd(self, selection, value, fields=None): + def _set_basic_selection_nd(self, selection, value, fields=None, executor=None): # implementation of __setitem__ for array with at least one dimension # setup indexer indexer = BasicIndexer(selection, self) - self._set_selection(indexer, value, fields=fields) + self._set_selection(indexer, value, fields=fields, executor=executor) - def _set_selection(self, indexer, value, fields=None): + def _set_selection(self, indexer, value, fields=None, executor=None): # We iterate over all chunks which overlap the selection and thus contain data # that needs to be replaced. 
Each chunk is processed in turn, extracting the @@ -1528,8 +1590,8 @@ def _set_selection(self, indexer, value, fields=None): value = np.asanyarray(value) check_array_shape('value', value, sel_shape) - # iterate over chunks in range - for chunk_coords, chunk_selection, out_selection in indexer: + def f(item): + chunk_coords, chunk_selection, out_selection = item # extract data to store if sel_shape == (): @@ -1549,6 +1611,15 @@ def _set_selection(self, indexer, value, fields=None): # put data self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields) + if executor is None: + map_ = map + else: + map_ = executor.map + + # iterate over chunks in range + for _ in map_(f, indexer): + pass + def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes=None, fields=None): """Obtain part or whole of a chunk.