From ba1ce85f58faa4e4265dc4d102b7770236fb163a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 25 Feb 2018 20:32:33 -0800 Subject: [PATCH 01/13] Download Redis and flatbuffers differently. (#1602) * Download Redis differently. * Get flatbuffers with curl --- src/common/thirdparty/build-redis.sh | 5 ++++- src/thirdparty/build_flatbuffers.sh | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/common/thirdparty/build-redis.sh b/src/common/thirdparty/build-redis.sh index 7d5f88957df8..ddce13513c78 100755 --- a/src/common/thirdparty/build-redis.sh +++ b/src/common/thirdparty/build-redis.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +set -x + # Cause the script to exit if a single command fails. set -e @@ -9,7 +11,8 @@ if [ ! -f redis/src/redis-server ]; then # relevant bit about redis/utils/whatisdoing.sh is that it is one of the last # files in the tarball. if [ ! -f redis/utils/whatisdoing.sh ]; then - mkdir -p "./redis" && wget -O- "https://github.com/antirez/redis/archive/$redis_vname.tar.gz" | tar xvz --strip-components=1 -C "./redis" + mkdir -p "./redis" + curl -sL "https://github.com/antirez/redis/archive/$redis_vname.tar.gz" | tar xz --strip-components=1 -C "./redis" fi cd redis make diff --git a/src/thirdparty/build_flatbuffers.sh b/src/thirdparty/build_flatbuffers.sh index 121cd9e8d73a..918d955e57ba 100755 --- a/src/thirdparty/build_flatbuffers.sh +++ b/src/thirdparty/build_flatbuffers.sh @@ -12,7 +12,7 @@ FLATBUFFERS_VERSION=1.7.1 # Download and compile flatbuffers if it isn't already present. if [ ! -d $TP_DIR/flatbuffers ]; then echo "building flatbuffers" - wget https://github.com/google/flatbuffers/archive/v$FLATBUFFERS_VERSION.tar.gz -O flatbuffers-$FLATBUFFERS_VERSION.tar.gz + curl -sL https://github.com/google/flatbuffers/archive/v$FLATBUFFERS_VERSION.tar.gz -o flatbuffers-$FLATBUFFERS_VERSION.tar.gz tar xf flatbuffers-$FLATBUFFERS_VERSION.tar.gz rm -rf flatbuffers-$FLATBUFFERS_VERSION.tar.gz From c2ad800cbf3a418e68a6081c8d3ddd0c0df098ff Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 25 Feb 2018 22:30:11 -0800 Subject: [PATCH 02/13] [rllib] Registry fix for DQN Replay Evaluators (#1593) --- python/ray/rllib/dqn/dqn_replay_evaluator.py | 8 +++++++- test/jenkins_tests/run_multi_node_tests.sh | 7 +++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/python/ray/rllib/dqn/dqn_replay_evaluator.py b/python/ray/rllib/dqn/dqn_replay_evaluator.py index 56bbe6d48409..effd0bf01249 100644 --- a/python/ray/rllib/dqn/dqn_replay_evaluator.py +++ b/python/ray/rllib/dqn/dqn_replay_evaluator.py @@ -28,7 +28,7 @@ def __init__(self, registry, env_creator, config, logdir): if self.config["num_workers"] > 1: remote_cls = ray.remote(num_cpus=1)(DQNEvaluator) self.workers = [ - remote_cls.remote(env_creator, config, logdir) + remote_cls.remote(registry, env_creator, config, logdir) for _ in range(self.config["num_workers"])] else: self.workers = [] @@ -146,3 +146,9 @@ def restore(self, data): w.restore.remote(d) self.beta_schedule = data[2] self.replay_buffer = data[3] + + def set_global_timestep(self, global_timestep): + self.global_timestep = global_timestep + if self.workers: + ray.get([worker.set_global_timestep.remote(global_timestep) + for worker in self.workers]) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 0165215a3e23..295969a5d22a 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -153,6 +153,13 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ # --stop '{"training_iteration": 2}' \ # --config '{"num_workers": 2, "use_lstm": false, "use_pytorch": true, "model": {"grayscale": true, "zero_mean": false, "dim": 80, "channel_major": true}}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env CartPole-v0 \ + --run DQN \ + --stop '{"training_iteration": 2}' \ + --config '{"num_workers": 2}' + docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ From d78b65bd51f44cd6758a426f39896ed623dfcb19 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Feb 2018 00:36:21 -0800 Subject: [PATCH 03/13] Implement loc and iloc --- python/ray/dataframe/dataframe.py | 22 +++- python/ray/dataframe/indexing.py | 113 ++++++++++++++++++++ python/ray/dataframe/test/test_dataframe.py | 64 ++++++++--- 3 files changed, 178 insertions(+), 21 deletions(-) create mode 100644 python/ray/dataframe/indexing.py diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 4dd867a706a7..5f89a9e5931e 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1603,8 +1603,15 @@ def iat(axis=None): def __rsub__(other, axis=None, level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") - def loc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def loc(self): + """Purely label-location based indexer for selection by label. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _Loc_Indexer + return _Loc_Indexer(self) @property def is_copy(self): @@ -1622,8 +1629,15 @@ def at(axis=None): def ix(axis=None): raise NotImplementedError("Not Yet implemented.") - def iloc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def iloc(self): + """Purely integer-location based indexing for selection by position. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _iLoc_Indexer + return _iLoc_Indexer(self) def _get_lengths(df): diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py new file mode 100644 index 000000000000..c045a0bb3ca4 --- /dev/null +++ b/python/ray/dataframe/indexing.py @@ -0,0 +1,113 @@ +import pandas as pd +import ray +from .dataframe import _deploy_func + + +def _row_tuples_to_dict(row_tuples): + """A convenient helper function convert: + [(partition, idx_in_partition)] to {partition: [idx_in_partition]} + """ + d = {} + for partition, idx_in_partition in row_tuples: + if partition in d: + d[partition].append(idx_in_partition) + else: + d[partition] = [idx_in_partition] + return d + + +class _Location_Indexer_Base(): + """Base class for location indexer like loc and iloc + This class abstract away commonly used method + """ + + def __init__(self, ray_df): + self.df = ray_df + + def __getitem__(self, key): + if not isinstance(key, tuple): + # The one argument case is equivalent to full slice in 2nd dim. + return self.locate_2d(key, slice(None)) + else: + return self.locate_2d(*key) + + def _get_lookup_dict(self, ray_partition_idx): + if ray_partition_idx.ndim == 1: # Single row matched + position = (ray_partition_idx['partition'], + ray_partition_idx['index_within_partition']) + rows_to_lookup = [position] + + if ray_partition_idx.ndim == 2: # Multiple rows matched + rows_to_lookup = [(row['partition'], row['index_within_partition']) + for _, row in ray_partition_idx.iterrows()] + + lookup_dict = _row_tuples_to_dict(rows_to_lookup) + return lookup_dict + + def locate_2d(self, row_label, col_label): + pass + + def _map_partition(self, lookup_dict, col_lst, indexer='loc'): + """Apply retrieval function to a lookup_dict + in the form of {partition_id: [idx]}. + + Returns: + retrieved_rows_remote: a list of object ids for pd_df + """ + assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" + + if indexer == 'loc': + def retrieve_func( + df, idx_lst, col_label): return df.loc[idx_lst, col_label] + elif indexer == 'iloc': + def retrieve_func( + df, idx_lst, col_idx): return df.iloc[idx_lst, col_idx] + + retrieved_rows_remote = [] + for partition, idx_to_lookup in lookup_dict.items(): + part_remote = _deploy_func.remote( + retrieve_func, self.df._df[partition], idx_to_lookup, col_lst) + retrieved_rows_remote.append(part_remote) + return retrieved_rows_remote + + +class _Loc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.loc[] functionality""" + + def locate_2d(self, row_label, col_label): + index_loc = self.df._index.loc[row_label] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_label, indexer='loc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_label, int) or isinstance(row_label, str): + return joined_df.squeeze(axis=0) + else: + return joined_df + + +class _iLoc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.iloc[] functionality""" + + def locate_2d(self, row_idx, col_idx): + index_loc = self.df._index.iloc[row_idx] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_idx, indexer='iloc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_idx, int) or isinstance(row_idx, str): + return joined_df.squeeze(axis=0) + else: + return joined_df diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index d49f71fb354c..f04e93579757 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -216,6 +216,9 @@ def test_int_dataframe(): test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + def test_float_dataframe(): @@ -288,13 +291,16 @@ def test_float_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + def test_mixed_dtype_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, 4], - 'col2': [4, 5, 6, 7], - 'col3': [8.0, 9.4, 10.1, 11.3], - 'col4': ['a', 'b', 'c', 'd']}) + 'col1': [1, 2, 3, 4], + 'col2': [4, 5, 6, 7], + 'col3': [8.0, 9.4, 10.1, 11.3], + 'col4': ['a', 'b', 'c', 'd']}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -365,13 +371,16 @@ def test_mixed_dtype_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + def test_nan_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, np.nan], - 'col2': [4, 5, np.nan, 7], - 'col3': [8, np.nan, 10, 11], - 'col4': [np.nan, 13, 14, 15]}) + 'col1': [1, 2, 3, np.nan], + 'col2': [4, 5, np.nan, 7], + 'col3': [8, np.nan, 10, 11], + 'col4': [np.nan, 13, 14, 15]}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -435,6 +444,9 @@ def test_nan_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + def test_add(): ray_df = create_test_dataframe() @@ -1936,11 +1948,20 @@ def test___rsub__(): ray_df.__rsub__(None, None, None) -def test_loc(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_loc(ray_df, pd_df): + # Singleton + assert ray_df.loc[0].equals(pd_df.loc[0]) + assert ray_df.loc[0, 'col1'] == pd_df.loc[0, 'col1'] - with pytest.raises(NotImplementedError): - ray_df.loc() + # List + assert ray_df.loc[[1, 2]].equals(pd_df.loc[[1, 2]]) + assert ray_df.loc[[1, 2], ['col1']].equals(pd_df.loc[[1, 2], ['col1']]) + + # Slice + assert ray_df.loc[1:, 'col1'].equals(pd_df.loc[1:, 'col1']) + assert ray_df.loc[1:2, 'col1'].equals(pd_df.loc[1:2, 'col1']) + assert ray_df.loc[1:2, 'col1':'col2'].equals(pd_df.loc[1:2, 'col1':'col2']) def test_is_copy(): @@ -1978,8 +1999,17 @@ def test_ix(): ray_df.ix() -def test_iloc(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.iloc() +@pytest.fixture +def test_iloc(ray_df, pd_df): + # Singleton + assert ray_df.iloc[0].equals(pd_df.iloc[0]) + assert ray_df.iloc[0, 1] == pd_df.iloc[0, 1] + + # List + assert ray_df.iloc[[1, 2]].equals(pd_df.iloc[[1, 2]]) + assert ray_df.iloc[[1, 2], [1, 0]].equals(pd_df.iloc[[1, 2], [1, 0]]) + + # Slice + assert ray_df.iloc[1:, 0].equals(pd_df.iloc[1:, 0]) + assert ray_df.iloc[1:2, 0].equals(pd_df.iloc[1:2, 0]) + assert ray_df.iloc[1:2, 0:2].equals(pd_df.iloc[1:2, 0:2]) From 1fa59f1887aac67868f58bd9f2d2bdf28dc22a03 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Mon, 26 Feb 2018 08:58:15 -0800 Subject: [PATCH 04/13] [DataFrame] Adding insert, set_axis, set_index, reset_index and tests (#1603) --- python/ray/dataframe/dataframe.py | 271 +++++++++++++++++++- python/ray/dataframe/test/test_dataframe.py | 143 +++++++++-- 2 files changed, 384 insertions(+), 30 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a300fd8e9495..0db5a4e4031b 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,6 +3,14 @@ from __future__ import print_function import pandas as pd +from pandas.api.types import is_scalar +from pandas.util._validators import validate_bool_kwarg +from pandas.core.index import _ensure_index_from_sequences +from pandas._libs import lib +from pandas.core.dtypes.cast import maybe_upcast_putmask +from pandas.compat import lzip + +import warnings import numpy as np import ray import itertools @@ -792,7 +800,52 @@ def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, raise NotImplementedError("Not Yet implemented.") def insert(self, loc, column, value, allow_duplicates=False): - raise NotImplementedError("Not Yet implemented.") + """Insert column into DataFrame at specified location. + + Args: + loc (int): Insertion index. Must verify 0 <= loc <= len(columns). + column (hashable object): Label of the inserted column. + value (int, Series, or array-like): The values to insert. + allow_duplicates (bool): Whether to allow duplicate column names. + """ + try: + len(value) + except TypeError: + value = [value for _ in range(len(self.index))] + + if len(value) != len(self.index): + raise ValueError( + "Column length provided does not match DataFrame length.") + if loc < 0 or loc > len(self.columns): + raise ValueError( + "Location provided must be higher than 0 and lower than the " + "number of columns.") + if not allow_duplicates and column in self.columns: + raise ValueError( + "Column {} already exists in DataFrame.".format(column)) + + cumulative = np.cumsum(self._lengths) + partitions = [value[cumulative[i-1]:cumulative[i]] + for i in range(len(cumulative)) + if i != 0] + + partitions.insert(0, value[:cumulative[0]]) + + # Because insert is always inplace, we have to create this temp fn. + def _insert(_df, _loc, _column, _part, _allow_duplicates): + _df.insert(_loc, _column, _part, _allow_duplicates) + return _df + + self._df = \ + [_deploy_func.remote(_insert, + self._df[i], + loc, + column, + partitions[i], + allow_duplicates) + for i in range(len(self._df))] + + self.columns = self.columns.insert(loc, column) def interpolate(self, method='linear', axis=0, limit=None, inplace=False, limit_direction='forward', downcast=None, **kwargs): @@ -1047,6 +1100,8 @@ def pop(self, item): popped = to_pandas(self._map_partitions( lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df + self.columns = [col for col in self.columns if col != item] + return popped def pow(self, other, axis='columns', level=None, fill_value=None): @@ -1111,7 +1166,103 @@ def resample(self, rule, how=None, axis=0, fill_method=None, closed=None, def reset_index(self, level=None, drop=False, inplace=False, col_level=0, col_fill=''): - raise NotImplementedError("Not Yet implemented.") + """Reset this index to default and create column from current index. + + Args: + level: Only remove the given levels from the index. Removes all + levels by default + drop: Do not try to insert index into dataframe columns. This + resets the index to the default integer index. + inplace: Modify the DataFrame in place (do not create a new object) + col_level : If the columns have multiple levels, determines which + level the labels are inserted into. By default it is inserted + into the first level. + col_fill: If the columns have multiple levels, determines how the + other levels are named. If None then the index name is + repeated. + + Returns: + A new DataFrame if inplace is False, None otherwise. + """ + inplace = validate_bool_kwarg(inplace, 'inplace') + if inplace: + new_obj = self + else: + new_obj = self.copy() + + def _maybe_casted_values(index, labels=None): + if isinstance(index, pd.PeriodIndex): + values = index.asobject.values + elif isinstance(index, pd.DatetimeIndex) and index.tz is not None: + values = index + else: + values = index.values + if values.dtype == np.object_: + values = lib.maybe_convert_objects(values) + + # if we have the labels, extract the values with a mask + if labels is not None: + mask = labels == -1 + + # we can have situations where the whole mask is -1, + # meaning there is nothing found in labels, so make all nan's + if mask.all(): + values = np.empty(len(mask)) + values.fill(np.nan) + else: + values = values.take(labels) + if mask.any(): + values, changed = maybe_upcast_putmask( + values, mask, np.nan) + return values + + new_index = new_obj._default_index().index + if level is not None: + if not isinstance(level, (tuple, list)): + level = [level] + level = [self.index._get_level_number(lev) for lev in level] + if isinstance(self.index, pd.MultiIndex): + if len(level) < self.index.nlevels: + new_index = self.index.droplevel(level) + + if not drop: + if isinstance(self.index, pd.MultiIndex): + names = [n if n is not None else ('level_%d' % i) + for (i, n) in enumerate(self.index.names)] + to_insert = lzip(self.index.levels, self.index.labels) + else: + default = 'index' if 'index' not in self else 'level_0' + names = ([default] if self.index.name is None + else [self.index.name]) + to_insert = ((self.index, None),) + + multi_col = isinstance(self.columns, pd.MultiIndex) + for i, (lev, lab) in reversed(list(enumerate(to_insert))): + if not (level is None or i in level): + continue + name = names[i] + if multi_col: + col_name = (list(name) if isinstance(name, tuple) + else [name]) + if col_fill is None: + if len(col_name) not in (1, self.columns.nlevels): + raise ValueError("col_fill=None is incompatible " + "with incomplete column name " + "{}".format(name)) + col_fill = col_name[0] + + lev_num = self.columns._get_level_number(col_level) + name_lst = [col_fill] * lev_num + col_name + missing = self.columns.nlevels - len(name_lst) + name_lst += [col_fill] * missing + name = tuple(name_lst) + # to ndarray and maybe infer different dtype + level_values = _maybe_casted_values(lev, lab) + new_obj.insert(0, name, level_values) + + new_obj.index = new_index + if not inplace: + return new_obj def rfloordiv(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") @@ -1155,11 +1306,116 @@ def sem(self, axis=None, skipna=None, level=None, ddof=1, raise NotImplementedError("Not Yet implemented.") def set_axis(self, labels, axis=0, inplace=None): - raise NotImplementedError("Not Yet implemented.") + """Assign desired index to given axis. + + Args: + labels (pd.Index or list-like): The Index to assign. + axis (string or int): The axis to reassign. + inplace (bool): Whether to make these modifications inplace. + + Returns: + If inplace is False, returns a new DataFrame, otherwise None. + """ + if is_scalar(labels): + warnings.warn( + 'set_axis now takes "labels" as first argument, and ' + '"axis" as named parameter. The old form, with "axis" as ' + 'first parameter and \"labels\" as second, is still supported ' + 'but will be deprecated in a future version of pandas.', + FutureWarning, stacklevel=2) + labels, axis = axis, labels + + if inplace is None: + warnings.warn( + 'set_axis currently defaults to operating inplace.\nThis ' + 'will change in a future version of pandas, use ' + 'inplace=True to avoid this warning.', + FutureWarning, stacklevel=2) + inplace = True + if inplace: + setattr(self, self._index._get_axis_name(axis), labels) + else: + obj = self.copy() + obj.set_axis(labels, axis=axis, inplace=True) + return obj def set_index(self, keys, drop=True, append=False, inplace=False, verify_integrity=False): - raise NotImplementedError("Not Yet implemented.") + """Set the DataFrame index using one or more existing columns. + + Args: + keys: column label or list of column labels / arrays. + drop (boolean): Delete columns to be used as the new index. + append (boolean): Whether to append columns to existing index. + inplace (boolean): Modify the DataFrame in place. + verify_integrity (boolean): Check the new index for duplicates. + Otherwise defer the check until necessary. Setting to False + will improve the performance of this method + + Returns: + If inplace is set to false returns a new DataFrame, otherwise None. + """ + inplace = validate_bool_kwarg(inplace, 'inplace') + if not isinstance(keys, list): + keys = [keys] + + if inplace: + frame = self + else: + frame = self.copy() + + arrays = [] + names = [] + if append: + names = [x for x in self.index.names] + if isinstance(self.index, pd.MultiIndex): + for i in range(self.index.nlevels): + arrays.append(self.index._get_level_values(i)) + else: + arrays.append(self.index) + + to_remove = [] + for col in keys: + if isinstance(col, pd.MultiIndex): + # append all but the last column so we don't have to modify + # the end of this loop + for n in range(col.nlevels - 1): + arrays.append(col._get_level_values(n)) + + level = col._get_level_values(col.nlevels - 1) + names.extend(col.names) + elif isinstance(col, pd.Series): + level = col._values + names.append(col.name) + elif isinstance(col, pd.Index): + level = col + names.append(col.name) + elif isinstance(col, (list, np.ndarray, pd.Index)): + level = col + names.append(None) + else: + level = frame[col]._values + names.append(col) + if drop: + to_remove.append(col) + arrays.append(level) + + index = _ensure_index_from_sequences(arrays, names) + + if verify_integrity and not index.is_unique: + duplicates = index.get_duplicates() + raise ValueError('Index has duplicate keys: %s' % duplicates) + + for c in to_remove: + del frame[c] + + # clear up memory usage + index._cleanup() + + frame.index = index + + if not inplace: + return frame def set_value(self, index, col, value, takeable=False): raise NotImplementedError("Not Yet implemented.") @@ -1416,7 +1672,7 @@ def __iter__(self): raise NotImplementedError("Not Yet implemented.") def __contains__(self, key): - raise NotImplementedError("Not Yet implemented.") + return key in self.columns def __nonzero__(self): raise NotImplementedError("Not Yet implemented.") @@ -1715,4 +1971,7 @@ def to_pandas(df): Returns: A new pandas DataFrame. """ - return pd.concat(ray.get(df._df)) + pd_df = pd.concat(ray.get(df._df)) + pd_df.index = df.index + pd_df.columns = df.columns + return pd_df diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index d49f71fb354c..7dd63059ba99 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -216,6 +216,29 @@ def test_int_dataframe(): test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + labels.append('e') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test___contains__(ray_df, key, True) + test___contains__(ray_df, "Not Exists", False) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_float_dataframe(): @@ -288,6 +311,26 @@ def test_float_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + labels.append('e') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_mixed_dtype_dataframe(): pandas_df = pd.DataFrame({ @@ -365,6 +408,25 @@ def test_mixed_dtype_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_nan_dataframe(): pandas_df = pd.DataFrame({ @@ -435,6 +497,25 @@ def test_nan_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_add(): ray_df = create_test_dataframe() @@ -902,11 +983,13 @@ def test_info(): ray_df.info() -def test_insert(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_insert(ray_df, pandas_df, loc, column, value): + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() - with pytest.raises(NotImplementedError): - ray_df.insert(None, None, None) + ray_df_cp.insert(loc, column, value) + pd_df_cp.insert(loc, column, value) def test_interpolate(): @@ -1307,11 +1390,19 @@ def test_resample(): ray_df.resample(None) -def test_reset_index(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.reset_index() +@pytest.fixture +def test_reset_index(ray_df, pandas_df, inplace=False): + if not inplace: + print(rdf.to_pandas(ray_df.reset_index(inplace=inplace)).index) + print(pandas_df.reset_index(inplace=inplace)) + assert rdf.to_pandas(ray_df.reset_index(inplace=inplace)).equals( + pandas_df.reset_index(inplace=inplace)) + else: + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() + ray_df_cp.reset_index(inplace=inplace) + pd_df_cp.reset_index(inplace=inplace) + assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) def test_rfloordiv(): @@ -1397,18 +1488,23 @@ def test_sem(): ray_df.sem() -def test_set_axis(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.set_axis(None) - +@pytest.fixture +def test_set_axis(ray_df, pandas_df, label, axis): + assert rdf.to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals( + pandas_df.set_axis(label, axis, inplace=False)) -def test_set_index(): - ray_df = create_test_dataframe() - with pytest.raises(NotImplementedError): - ray_df.set_index(None) +@pytest.fixture +def test_set_index(ray_df, pandas_df, keys, inplace=False): + if not inplace: + assert rdf.to_pandas(ray_df.set_index(keys)).equals( + pandas_df.set_index(keys)) + else: + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() + ray_df_cp.set_index(keys, inplace=inplace) + pd_df_cp.set_index(keys, inplace=inplace) + assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) def test_set_value(): @@ -1817,11 +1913,10 @@ def test___iter__(): ray_df.__iter__() -def test___contains__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__contains__(None) +@pytest.fixture +def test___contains__(ray_df, key, result): + assert result == ray_df.__contains__(key) + assert result == (key in ray_df) def test___nonzero__(): From aefefcb0cd4db71c6dcf2c12d5e4f57055a23e33 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Feb 2018 10:26:38 -0800 Subject: [PATCH 05/13] Upload wheels to 'latest' folder (#1606) --- .travis.yml | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0eed43fc3f5e..e91997b6ba0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -146,18 +146,33 @@ script: - python -m pytest python/ray/rllib/test/test_evaluators.py deploy: - provider: s3 - access_key_id: AKIAJ2L7XDUSZVTXI5QA - secret_access_key: - secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= - bucket: ray-wheels - acl: public_read - region: us-west-2 - local_dir: .whl - upload-dir: $TRAVIS_COMMIT - skip_cleanup: true - only: - - master - on: - repo: ray-project/ray - condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 + - provider: s3 + access_key_id: AKIAJ2L7XDUSZVTXI5QA + secret_access_key: + secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= + bucket: ray-wheels + acl: public_read + region: us-west-2 + local_dir: .whl + upload-dir: $TRAVIS_COMMIT + skip_cleanup: true + only: + - master + on: + repo: ray-project/ray + condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 + - provider: s3 + access_key_id: AKIAJ2L7XDUSZVTXI5QA + secret_access_key: + secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= + bucket: ray-wheels + acl: public_read + region: us-west-2 + local_dir: .whl + upload-dir: latest + skip_cleanup: true + only: + - master + on: + repo: ray-project/ray + condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 From 87e107edd8191e4f6b325ff71b679eca353e05c3 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 26 Feb 2018 11:35:51 -0800 Subject: [PATCH 06/13] [tune] Sync logs from workers and improve tensorboard reporting (#1567) --- python/ray/tune/cluster_info.py | 23 +++++++++ python/ray/tune/log_sync.py | 88 ++++++++++++++++++++++++++------- python/ray/tune/logger.py | 50 ++++++++++++------- python/ray/tune/result.py | 3 ++ python/ray/tune/trainable.py | 3 ++ python/ray/tune/trial_runner.py | 1 + 6 files changed, 133 insertions(+), 35 deletions(-) create mode 100644 python/ray/tune/cluster_info.py diff --git a/python/ray/tune/cluster_info.py b/python/ray/tune/cluster_info.py new file mode 100644 index 000000000000..23632d660ab7 --- /dev/null +++ b/python/ray/tune/cluster_info.py @@ -0,0 +1,23 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import getpass +import os + + +def get_ssh_user(): + """Returns ssh username for connecting to cluster workers.""" + + return getpass.getuser() + + +# TODO(ekl) this currently only works for clusters launched with +# ray create_or_update +def get_ssh_key(): + """Returns ssh key to connecting to cluster workers.""" + + path = os.path.expanduser("~/ray_bootstrap_key.pem") + if os.path.exists(path): + return path + return None diff --git a/python/ray/tune/log_sync.py b/python/ray/tune/log_sync.py index 6f404e4d3679..c41100521f2f 100644 --- a/python/ray/tune/log_sync.py +++ b/python/ray/tune/log_sync.py @@ -4,9 +4,12 @@ import distutils.spawn import os +import pipes import subprocess import time +import ray +from ray.tune.cluster_info import get_ssh_key, get_ssh_user from ray.tune.error import TuneError from ray.tune.result import DEFAULT_RESULTS_DIR @@ -15,20 +18,21 @@ _syncers = {} -def get_syncer(local_dir, remote_dir): - if not remote_dir.startswith("s3://"): - raise TuneError("Upload uri must start with s3://") +def get_syncer(local_dir, remote_dir=None): + if remote_dir: + if not remote_dir.startswith("s3://"): + raise TuneError("Upload uri must start with s3://") - if not distutils.spawn.find_executable("aws"): - raise TuneError("Upload uri requires awscli tool to be installed") + if not distutils.spawn.find_executable("aws"): + raise TuneError("Upload uri requires awscli tool to be installed") - if local_dir.startswith(DEFAULT_RESULTS_DIR + "/"): - rel_path = os.path.relpath(local_dir, DEFAULT_RESULTS_DIR) - remote_dir = os.path.join(remote_dir, rel_path) + if local_dir.startswith(DEFAULT_RESULTS_DIR + "/"): + rel_path = os.path.relpath(local_dir, DEFAULT_RESULTS_DIR) + remote_dir = os.path.join(remote_dir, rel_path) key = (local_dir, remote_dir) if key not in _syncers: - _syncers[key] = _S3LogSyncer(local_dir, remote_dir) + _syncers[key] = _LogSyncer(local_dir, remote_dir) return _syncers[key] @@ -38,23 +42,64 @@ def wait_for_log_sync(): syncer.wait() -class _S3LogSyncer(object): - def __init__(self, local_dir, remote_dir): +class _LogSyncer(object): + """Log syncer for tune. + + This syncs files from workers to the local node, and optionally also from + the local node to a remote directory (e.g. S3).""" + + def __init__(self, local_dir, remote_dir=None): self.local_dir = local_dir self.remote_dir = remote_dir self.last_sync_time = 0 self.sync_process = None - print("Created S3LogSyncer for {} -> {}".format(local_dir, remote_dir)) + self.local_ip = ray.services.get_node_ip_address() + self.worker_ip = None + print("Created LogSyncer for {} -> {}".format(local_dir, remote_dir)) + + def set_worker_ip(self, worker_ip): + """Set the worker ip to sync logs from.""" + + self.worker_ip = worker_ip def sync_if_needed(self): if time.time() - self.last_sync_time > 300: self.sync_now() def sync_now(self, force=False): - print( - "Syncing files from {} -> {}".format( - self.local_dir, self.remote_dir)) self.last_sync_time = time.time() + if not self.worker_ip: + print( + "Worker ip unknown, skipping log sync for {}".format( + self.local_dir)) + return + + if self.worker_ip == self.local_ip: + worker_to_local_sync_cmd = None # don't need to rsync + else: + ssh_key = get_ssh_key() + ssh_user = get_ssh_user() + if ssh_key is None or ssh_user is None: + print( + "Error: log sync requires cluster to be setup with " + "`ray create_or_update`.") + return + if not distutils.spawn.find_executable("rsync"): + print("Error: log sync requires rsync to be installed.") + return + worker_to_local_sync_cmd = ( + ("""rsync -avz -e "ssh -i '{}' -o ConnectTimeout=120s """ + """-o StrictHostKeyChecking=no" '{}@{}:{}/' '{}/'""").format( + ssh_key, ssh_user, self.worker_ip, + pipes.quote(self.local_dir), pipes.quote(self.local_dir))) + + if self.remote_dir: + local_to_remote_sync_cmd = ( + "aws s3 sync '{}' '{}'".format( + pipes.quote(self.local_dir), pipes.quote(self.remote_dir))) + else: + local_to_remote_sync_cmd = None + if self.sync_process: self.sync_process.poll() if self.sync_process.returncode is None: @@ -63,8 +108,17 @@ def sync_now(self, force=False): else: print("Warning: last sync is still in progress, skipping") return - self.sync_process = subprocess.Popen( - ["aws", "s3", "sync", self.local_dir, self.remote_dir]) + + if worker_to_local_sync_cmd or local_to_remote_sync_cmd: + final_cmd = "" + if worker_to_local_sync_cmd: + final_cmd += worker_to_local_sync_cmd + if local_to_remote_sync_cmd: + if final_cmd: + final_cmd += " && " + final_cmd += local_to_remote_sync_cmd + print("Running log sync: {}".format(final_cmd)) + self.sync_process = subprocess.Popen(final_cmd, shell=True) def wait(self): if self.sync_process: diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 4c3308498d07..952049b338ab 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -24,10 +24,6 @@ class Logger(object): multiple formats (TensorBoard, rllab/viskit, plain json) at once. """ - _attrs_to_log = [ - "time_this_iter_s", "mean_loss", "mean_accuracy", - "episode_reward_mean", "episode_len_mean"] - def __init__(self, config, logdir, upload_uri=None): self.config = config self.logdir = logdir @@ -47,6 +43,11 @@ def close(self): pass + def flush(self): + """Flushes all disk writes to storage.""" + + pass + class UnifiedLogger(Logger): """Unified result logger for TensorBoard, rllab/viskit, plain json. @@ -60,22 +61,22 @@ def _init(self): print("TF not installed - cannot log with {}...".format(cls)) continue self._loggers.append(cls(self.config, self.logdir, self.uri)) - if self.uri: - self._log_syncer = get_syncer(self.logdir, self.uri) - else: - self._log_syncer = None + self._log_syncer = get_syncer(self.logdir, self.uri) def on_result(self, result): for logger in self._loggers: logger.on_result(result) - if self._log_syncer: - self._log_syncer.sync_if_needed() + self._log_syncer.set_worker_ip(result.node_ip) + self._log_syncer.sync_if_needed() def close(self): for logger in self._loggers: logger.close() - if self._log_syncer: - self._log_syncer.sync_now(force=True) + self._log_syncer.sync_now(force=True) + + def flush(self): + self._log_syncer.sync_now(force=True) + self._log_syncer.wait() class NoopLogger(Logger): @@ -103,17 +104,30 @@ def close(self): self.local_out.close() +def to_tf_values(result, path): + values = [] + for attr, value in result.items(): + if value is not None: + if type(value) in [int, float]: + values.append(tf.Summary.Value( + tag="/".join(path + [attr]), + simple_value=value)) + elif type(value) is dict: + values.extend(to_tf_values(value, path + [attr])) + return values + + class _TFLogger(Logger): def _init(self): self._file_writer = tf.summary.FileWriter(self.logdir) def on_result(self, result): - values = [] - for attr in Logger._attrs_to_log: - if getattr(result, attr) is not None: - values.append(tf.Summary.Value( - tag="ray/tune/{}".format(attr), - simple_value=getattr(result, attr))) + tmp = result._asdict() + for k in [ + "config", "pid", "timestamp", "time_total_s", + "timesteps_total"]: + del tmp[k] # not useful to tf log these + values = to_tf_values(tmp, ["ray", "tune"]) train_stats = tf.Summary(value=values) self._file_writer.add_summary(train_stats, result.timesteps_total) diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 365d3147e82f..5aad51cea570 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -85,6 +85,9 @@ # (Auto-filled) The hostname of the machine hosting the training process. "hostname", + # (Auto-filled) The node ip of the machine hosting the training process. + "node_ip", + # (Auto=filled) The current hyperparameter configuration. "config", ]) diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index cebaf3d3445b..2d06bc92e538 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -13,6 +13,7 @@ import time import uuid +import ray from ray.tune import TuneError from ray.tune.logger import UnifiedLogger from ray.tune.result import DEFAULT_RESULTS_DIR @@ -87,6 +88,7 @@ def __init__(self, config=None, registry=None, logger_creator=None): self._timesteps_total = 0 self._setup() self._initialize_ok = True + self._local_ip = ray.services.get_node_ip_address() def train(self): """Runs one logical iteration of training. @@ -136,6 +138,7 @@ def train(self): neg_mean_loss=neg_loss, pid=os.getpid(), hostname=os.uname()[1], + node_ip=self._local_ip, config=self.config) self._result_logger.on_result(result) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 9345621709bb..61b16e450204 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -252,6 +252,7 @@ def _try_recover(self, trial, error_msg): try: print("Attempting to recover trial state from last checkpoint") trial.stop(error=True, error_msg=error_msg, stop_logger=False) + trial.result_logger.flush() # make sure checkpoint is synced trial.start() self._running[trial.train_remote()] = trial except Exception: From d78a22f94c7413a093b0a2a20087131f1e6e85ad Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Feb 2018 18:26:38 -0800 Subject: [PATCH 07/13] [DataFrame] Implement IO for ray_df (#1599) * Add parquet-cpp to gitignore * Add read_csv and read_parquet * Gitignore pytest_cache * Fix flake8 * Add io to __init__ * Changing Index. Currently running tests, but so far untested. * Removing issue of reassigning DF in from_pandas * Fixing lint * Fix bug * Fix bug * Fix bug * Better performance * Fixing index issue with sum * Address comments * Update io with index * Updating performance and implementation. Adding tests * Fixing off-by-1 * Fix lint * Address Comments * Make pop compatible with new to_pandas * Format Code * Cleanup some index issue * Bug fix: assigned reset_index back * Remove unused debug line --- .gitignore | 4 + python/ray/dataframe/__init__.py | 28 ++- python/ray/dataframe/dataframe.py | 59 ++++-- python/ray/dataframe/io.py | 262 +++++++++++++++++++++++++++ python/ray/dataframe/test/test_io.py | 91 ++++++++++ 5 files changed, 427 insertions(+), 17 deletions(-) create mode 100644 python/ray/dataframe/io.py create mode 100644 python/ray/dataframe/test/test_io.py diff --git a/.gitignore b/.gitignore index efdeba1bb258..924bbe4c7684 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ /src/thirdparty/boost_1_60_0/ /src/thirdparty/catapult/ /src/thirdparty/flatbuffers/ +/src/thirdparty/parquet-cpp # Files generated by flatc should be ignored /src/common/format/*.py @@ -137,3 +138,6 @@ build /site/Gemfile.lock /site/.sass-cache /site/_site + +# Pytest Cache +**/.pytest_cache diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 8f315be6ad07..6ba12b91ab32 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -2,9 +2,27 @@ from __future__ import division from __future__ import print_function -from .dataframe import DataFrame -from .dataframe import from_pandas -from .dataframe import to_pandas -from .series import Series +DEFAULT_NPARTITIONS = 10 -__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"] + +def set_npartition_default(n): + global DEFAULT_NPARTITIONS + DEFAULT_NPARTITIONS = n + + +def get_npartitions(): + return DEFAULT_NPARTITIONS + + +# We import these file after above two function +# because they depend on npartitions. +from .dataframe import DataFrame # noqa: 402 +from .dataframe import from_pandas # noqa: 402 +from .dataframe import to_pandas # noqa: 402 +from .series import Series # noqa: 402 +from .io import (read_csv, read_parquet) # noqa: 402 + +__all__ = [ + "DataFrame", "from_pandas", "to_pandas", "Series", "read_csv", + "read_parquet" +] diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 0db5a4e4031b..be1d3b9f40ed 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -373,16 +373,29 @@ def transpose(self, *args, **kwargs): temp_index = [idx for _ in range(len(self._df)) for idx in self.columns] - temp_columns = self.index local_transpose = self._map_partitions( lambda df: df.transpose(*args, **kwargs), index=temp_index) local_transpose.columns = temp_columns # Sum will collapse the NAs from the groupby - return local_transpose.reduce_by_index( + df = local_transpose.reduce_by_index( lambda df: df.apply(lambda x: x), axis=1) + # Reassign the columns within partition to self.index. + # We have to use _depoly_func instead of _map_partition due to + # new_labels argument + def _reassign_columns(df, new_labels): + df.columns = new_labels + return df + df._df = [ + _deploy_func.remote( + _reassign_columns, + part, + self.index) for part in df._df] + + return df + T = property(transpose) def dropna(self, axis, how, thresh=None, subset=[], inplace=False): @@ -563,9 +576,15 @@ def count(self, axis=0, level=None, numeric_only=False): for _ in range(len(self._df)) for idx in self.columns] - return sum(ray.get(self._map_partitions(lambda df: df.count( - axis=axis, level=level, numeric_only=numeric_only - ), index=temp_index)._df)) + collapsed_df = sum( + ray.get( + self._map_partitions( + lambda df: df.count( + axis=axis, + level=level, + numeric_only=numeric_only), + index=temp_index)._df)) + return collapsed_df def cov(self, min_periods=None): raise NotImplementedError("Not Yet implemented.") @@ -865,7 +884,9 @@ def iterrows(self): iters = ray.get([ _deploy_func.remote( lambda df: list(df.iterrows()), part) for part in self._df]) - return itertools.chain.from_iterable(iters) + iters = itertools.chain.from_iterable(iters) + series = map(lambda idx_series_tuple: idx_series_tuple[1], iters) + return zip(self.index, series) def items(self): """Iterator over (column name, Series) pairs. @@ -884,6 +905,7 @@ def items(self): def concat_iters(iterables): for partitions in zip(*iterables): series = pd.concat([_series for _, _series in partitions]) + series.index = self.index yield (series.name, series) return concat_iters(iters) @@ -919,7 +941,20 @@ def itertuples(self, index=True, name='Pandas'): _deploy_func.remote( lambda df: list(df.itertuples(index=index, name=name)), part) for part in self._df]) - return itertools.chain.from_iterable(iters) + iters = itertools.chain.from_iterable(iters) + + def _replace_index(row_tuple, idx): + # We need to use try-except here because + # isinstance(row_tuple, namedtuple) won't work. + try: + row_tuple = row_tuple._replace(Index=idx) + except AttributeError: # Tuple not namedtuple + row_tuple = (idx,) + row_tuple[1:] + return row_tuple + + if index: + iters = itertools.starmap(_replace_index, zip(iters, self.index)) + return iters def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): @@ -1100,8 +1135,7 @@ def pop(self, item): popped = to_pandas(self._map_partitions( lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df - self.columns = [col for col in self.columns if col != item] - + self.columns = self.columns.drop(item) return popped def pow(self, other, axis='columns', level=None, fill_value=None): @@ -1949,13 +1983,14 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): while len(temp_df) > chunksize: t_df = temp_df[:chunksize] lengths.append(len(t_df)) - # reindex here because we want a pd.RangeIndex within the partitions. - # It is smaller and sometimes faster. - t_df.reindex() + # reset_index here because we want a pd.RangeIndex + # within the partitions. It is smaller and sometimes faster. + t_df = t_df.reset_index(drop=True) top = ray.put(t_df) dataframes.append(top) temp_df = temp_df[chunksize:] else: + temp_df = temp_df.reset_index(drop=True) dataframes.append(ray.put(temp_df)) lengths.append(len(temp_df)) diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py new file mode 100644 index 000000000000..7fa49ebb242b --- /dev/null +++ b/python/ray/dataframe/io.py @@ -0,0 +1,262 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from itertools import chain +from io import BytesIO +import os +import re + +from pyarrow.parquet import ParquetFile +import pandas as pd + +from .dataframe import ray, DataFrame +from . import get_npartitions + + +# Parquet +def read_parquet(path, engine='auto', columns=None, **kwargs): + """Load a parquet object from the file path, returning a DataFrame. + Ray DataFrame only supports pyarrow engine for now. + + Args: + path: The filepath of the parquet file. + We only support local files for now. + engine: Ray only support pyarrow reader. + This argument doesn't do anything for now. + kwargs: Pass into parquet's read_row_group function. + """ + pf = ParquetFile(path) + + n_rows = pf.metadata.num_rows + chunksize = n_rows // get_npartitions() + n_row_groups = pf.metadata.num_row_groups + + idx_regex = re.compile('__index_level_\d+__') + columns = [ + name for name in pf.metadata.schema.names if not idx_regex.match(name) + ] + + df_from_row_groups = [ + _read_parquet_row_group.remote(path, columns, i, kwargs) + for i in range(n_row_groups) + ] + splited_dfs = ray.get( + [_split_df.remote(df, chunksize) for df in df_from_row_groups]) + df_remotes = list(chain.from_iterable(splited_dfs)) + + return DataFrame(df_remotes, columns) + + +@ray.remote +def _read_parquet_row_group(path, columns, row_group_id, kwargs={}): + """Read a parquet row_group given file_path. + """ + pf = ParquetFile(path) + df = pf.read_row_group(row_group_id, columns=columns, **kwargs).to_pandas() + return df + + +@ray.remote +def _split_df(pd_df, chunksize): + """Split a pd_df into partitions. + + Returns: + remote_df_ids ([ObjectID]) + """ + dataframes = [] + + while len(pd_df) > chunksize: + t_df = pd_df[:chunksize] + t_df.reset_index(drop=True) + top = ray.put(t_df) + dataframes.append(top) + pd_df = pd_df[chunksize:] + else: + pd_df = pd_df.reset_index(drop=True) + dataframes.append(ray.put(pd_df)) + + return dataframes + + +# CSV +def _compute_offset(fn, npartitions): + """ + Calculate the currect bytes offsets for a csv file. + Return a list of (start, end) tuple where the end == \n or EOF. + """ + total_bytes = os.path.getsize(fn) + chunksize = total_bytes // npartitions + if chunksize == 0: + chunksize = 1 + + bio = open(fn, 'rb') + + offsets = [] + start = 0 + while start <= total_bytes: + bio.seek(chunksize, 1) # Move forward {chunksize} bytes + extend_line = bio.readline() # Move after the next \n + total_offset = chunksize + len(extend_line) + # The position of the \n we just crossed. + new_line_cursor = start + total_offset - 1 + offsets.append((start, new_line_cursor)) + start = new_line_cursor + 1 + + bio.close() + return offsets + + +def _get_firstline(file_path): + bio = open(file_path, 'rb') + first = bio.readline() + bio.close() + return first + + +def _infer_column(first_line): + return pd.read_csv(BytesIO(first_line)).columns + + +@ray.remote +def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}): + bio = open(fn, 'rb') + bio.seek(start) + to_read = header + bio.read(end - start) + bio.close() + return pd.read_csv(BytesIO(to_read), **kwargs) + + +def read_csv(filepath, + sep=',', + delimiter=None, + header='infer', + names=None, + index_col=None, + usecols=None, + squeeze=False, + prefix=None, + mangle_dupe_cols=True, + dtype=None, + engine=None, + converters=None, + true_values=None, + false_values=None, + skipinitialspace=False, + skiprows=None, + nrows=None, + na_values=None, + keep_default_na=True, + na_filter=True, + verbose=False, + skip_blank_lines=True, + parse_dates=False, + infer_datetime_format=False, + keep_date_col=False, + date_parser=None, + dayfirst=False, + iterator=False, + chunksize=None, + compression='infer', + thousands=None, + decimal=b'.', + lineterminator=None, + quotechar='"', + quoting=0, + escapechar=None, + comment=None, + encoding=None, + dialect=None, + tupleize_cols=None, + error_bad_lines=True, + warn_bad_lines=True, + skipfooter=0, + skip_footer=0, + doublequote=True, + delim_whitespace=False, + as_recarray=None, + compact_ints=None, + use_unsigned=None, + low_memory=True, + buffer_lines=None, + memory_map=False, + float_precision=None): + """Read csv file from local disk. + + Args: + filepath: + The filepath of the csv file. + We only support local files for now. + kwargs: Keyword arguments in pandas::from_csv + """ + kwargs = dict( + sep=sep, + delimiter=delimiter, + header=header, + names=names, + index_col=index_col, + usecols=usecols, + squeeze=squeeze, + prefix=prefix, + mangle_dupe_cols=mangle_dupe_cols, + dtype=dtype, + engine=engine, + converters=converters, + true_values=true_values, + false_values=false_values, + skipinitialspace=skipinitialspace, + skiprows=skiprows, + nrows=nrows, + na_values=na_values, + keep_default_na=keep_default_na, + na_filter=na_filter, + verbose=verbose, + skip_blank_lines=skip_blank_lines, + parse_dates=parse_dates, + infer_datetime_format=infer_datetime_format, + keep_date_col=keep_date_col, + date_parser=date_parser, + dayfirst=dayfirst, + iterator=iterator, + chunksize=chunksize, + compression=compression, + thousands=thousands, + decimal=decimal, + lineterminator=lineterminator, + quotechar=quotechar, + quoting=quoting, + escapechar=escapechar, + comment=comment, + encoding=encoding, + dialect=dialect, + tupleize_cols=tupleize_cols, + error_bad_lines=error_bad_lines, + warn_bad_lines=warn_bad_lines, + skipfooter=skipfooter, + skip_footer=skip_footer, + doublequote=doublequote, + delim_whitespace=delim_whitespace, + as_recarray=as_recarray, + compact_ints=compact_ints, + use_unsigned=use_unsigned, + low_memory=low_memory, + buffer_lines=buffer_lines, + memory_map=memory_map, + float_precision=float_precision) + + offsets = _compute_offset(filepath, get_npartitions()) + + first_line = _get_firstline(filepath) + columns = _infer_column(first_line) + + df_obj_ids = [] + for start, end in offsets: + if start != 0: + df = _read_csv_with_offset.remote( + filepath, start, end, header=first_line, kwargs=kwargs) + else: + df = _read_csv_with_offset.remote( + filepath, start, end, kwargs=kwargs) + df_obj_ids.append(df) + + return DataFrame(df_obj_ids, columns) diff --git a/python/ray/dataframe/test/test_io.py b/python/ray/dataframe/test/test_io.py new file mode 100644 index 000000000000..64ea2c0ff41f --- /dev/null +++ b/python/ray/dataframe/test/test_io.py @@ -0,0 +1,91 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import numpy as np +import pandas as pd +import ray +import ray.dataframe as rdf +import ray.dataframe.io as io +import os + +TEST_PARQUET_FILENAME = 'test.parquet' +TEST_CSV_FILENAME = 'test.csv' +SMALL_ROW_SIZE = 2000 +LARGE_ROW_SIZE = 7e6 + + +@pytest.fixture +def ray_df_equals_pandas(ray_df, pandas_df): + return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) + + +@pytest.fixture +def setup_parquet_file(row_size, force=False): + if os.path.exists(TEST_PARQUET_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_parquet(TEST_PARQUET_FILENAME) + + +@pytest.fixture +def teardown_parquet_file(): + if os.path.exists(TEST_PARQUET_FILENAME): + os.remove(TEST_PARQUET_FILENAME) + + +@pytest.fixture +def setup_csv_file(row_size, force=False): + if os.path.exists(TEST_CSV_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_csv(TEST_CSV_FILENAME) + + +@pytest.fixture +def teardown_csv_file(): + if os.path.exists(TEST_CSV_FILENAME): + os.remove(TEST_CSV_FILENAME) + + +def test_from_parquet_small(): + ray.init() + + setup_parquet_file(SMALL_ROW_SIZE) + + pd_df = pd.read_parquet(TEST_PARQUET_FILENAME) + ray_df = io.read_parquet(TEST_PARQUET_FILENAME) + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_parquet_file() + + +def test_from_parquet_large(): + setup_parquet_file(LARGE_ROW_SIZE) + + pd_df = pd.read_parquet(TEST_PARQUET_FILENAME) + ray_df = io.read_parquet(TEST_PARQUET_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_parquet_file() + + +def test_from_csv(): + setup_csv_file(SMALL_ROW_SIZE) + + pd_df = pd.read_csv(TEST_CSV_FILENAME) + ray_df = io.read_csv(TEST_CSV_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_csv_file() From 48bd7b147dcb968ca161329392ef46c039f2a80a Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Mon, 26 Feb 2018 18:31:00 -0800 Subject: [PATCH 08/13] [DataFrame] Added Implementations for equals, query, and some other operations (#1610) * Implemented Dataframe __abs__ and __iter__ * implemented __neg__ * implemented query * Implemented equals * Implemented __eq__ and __ne__ operators * Added method level comments * resolved flake8 comments * resolving devin's comments --- python/ray/dataframe/dataframe.py | 115 ++++++++++++++++++-- python/ray/dataframe/test/test_dataframe.py | 85 +++++++++++---- 2 files changed, 170 insertions(+), 30 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index be1d3b9f40ed..113d415103f4 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -9,6 +9,10 @@ from pandas._libs import lib from pandas.core.dtypes.cast import maybe_upcast_putmask from pandas.compat import lzip +from pandas.core.dtypes.common import ( + is_bool_dtype, + is_numeric_dtype, + is_timedelta64_dtype) import warnings import numpy as np @@ -31,8 +35,7 @@ def __init__(self, df, columns, index=None): assert(len(df) > 0) self._df = df - self._lengths = [_deploy_func.remote(_get_lengths, d) - for d in self._df] + self._compute_lengths() self.columns = columns # this _index object is a pd.DataFrame @@ -80,6 +83,12 @@ def _default_index(self): index = property(_get_index, _set_index) + def _compute_lengths(self): + """Updates the stored lengths of DataFrame partions + """ + self._lengths = [_deploy_func.remote(_get_lengths, d) + for d in self._df] + def _get_lengths(self): """Gets the lengths for each partition and caches it if it wasn't. @@ -200,6 +209,21 @@ def _map_partitions(self, func, index=None): return DataFrame(new_df, self.columns, index=index) + def _update_inplace(self, df=None, columns=None, index=None): + """Updates the current DataFrame inplace + """ + assert(len(df) > 0) + + if df: + self._df = df + if columns: + self.columns = columns + if index: + self.index = index + + self._compute_lengths() + self._index = self._default_index() + def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -630,7 +654,37 @@ def eq(self, other, axis='columns', level=None): raise NotImplementedError("Not Yet implemented.") def equals(self, other): - raise NotImplementedError("Not Yet implemented.") + """ + Checks if other DataFrame is elementwise equal to the current one + + Returns: + Boolean: True if equal, otherwise False + """ + def helper(df, index, other_series): + return df.iloc[index['index_within_partition']] \ + .equals(other_series) + + results = [] + other_partition = None + other_df = None + for i, idx in other._index.iterrows(): + if idx['partition'] != other_partition: + other_df = ray.get(other._df[idx['partition']]) + other_partition = idx['partition'] + # TODO: group series here into full df partitions to reduce + # the number of remote calls to helper + other_series = other_df.iloc[idx['index_within_partition']] + curr_index = self._index.iloc[i] + curr_df = self._df[int(curr_index['partition'])] + results.append(_deploy_func.remote(helper, + curr_df, + curr_index, + other_series)) + + for r in results: + if not ray.get(r): + return False + return True def eval(self, expr, inplace=False, **kwargs): raise NotImplementedError("Not Yet implemented.") @@ -1154,7 +1208,18 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, raise NotImplementedError("Not Yet implemented.") def query(self, expr, inplace=False, **kwargs): - raise NotImplementedError("Not Yet implemented.") + """Queries the Dataframe with a boolean expression + + Returns: + A new DataFrame if inplace=False + """ + new_dfs = [_deploy_func.remote(lambda df: df.query(expr, **kwargs), + part) for part in self._df] + + if inplace: + self._update_inplace(new_dfs) + else: + return DataFrame(new_dfs, self.columns) def radd(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") @@ -1703,7 +1768,12 @@ def __hash__(self): raise NotImplementedError("Not Yet implemented.") def __iter__(self): - raise NotImplementedError("Not Yet implemented.") + """Iterate over the columns + + Returns: + An Iterator over the columns of the dataframe. + """ + return iter(self.columns) def __contains__(self, key): return key in self.columns @@ -1715,7 +1785,12 @@ def __bool__(self): raise NotImplementedError("Not Yet implemented.") def __abs__(self): - raise NotImplementedError("Not Yet implemented.") + """Creates a modified DataFrame by elementwise taking the absolute value + + Returns: + A modified DataFrame + """ + return self.abs() def __round__(self, decimals=0): raise NotImplementedError("Not Yet implemented.") @@ -1794,10 +1869,20 @@ def __ge__(self, other): raise NotImplementedError("Not Yet implemented.") def __eq__(self, other): - raise NotImplementedError("Not Yet implemented.") + """Computes the equality of this DataFrame with another + + Returns: + True, if the DataFrames are equal. False otherwise. + """ + return self.equals(other) def __ne__(self, other): - raise NotImplementedError("Not Yet implemented.") + """Checks that this DataFrame is not equal to another + + Returns: + True, if the DataFrames are not equal. False otherwise. + """ + return not self.equals(other) def __add__(self, other): raise NotImplementedError("Not Yet implemented.") @@ -1824,7 +1909,19 @@ def __isub__(self, other): raise NotImplementedError("Not Yet implemented.") def __neg__(self): - raise NotImplementedError("Not Yet implemented.") + """Computes an element wise negative DataFrame + + Returns: + A modified DataFrame where every element is the negation of before + """ + for t in self.dtypes: + if not (is_bool_dtype(t) + or is_numeric_dtype(t) + or is_timedelta64_dtype(t)): + raise TypeError("Unary negative expects numeric dtype, not {}" + .format(t)) + + return self._map_partitions(lambda df: df.__neg__()) def __floordiv__(self, other): raise NotImplementedError("Not Yet implemented.") diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 7dd63059ba99..741259e3bd89 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -160,6 +160,9 @@ def test_int_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -185,10 +188,14 @@ def test_int_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -256,6 +263,9 @@ def test_float_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -281,10 +291,14 @@ def test_float_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -346,6 +360,9 @@ def test_mixed_dtype_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -370,14 +387,21 @@ def test_mixed_dtype_dataframe(): with pytest.raises(TypeError): test_abs(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + + with pytest.raises(TypeError): + test___neg__(ray_df, pandas_df) + + test___iter__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -442,6 +466,9 @@ def test_nan_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -467,10 +494,14 @@ def test_nan_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -828,10 +859,19 @@ def test_eq(): def test_equals(): - ray_df = create_test_dataframe() + pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3], + 'col2': [2, 3, 4, 1]}) + ray_df1 = rdf.from_pandas(pandas_df1, 2) + ray_df2 = rdf.from_pandas(pandas_df1, 3) - with pytest.raises(NotImplementedError): - ray_df.equals(None) + assert ray_df1.equals(ray_df2) + + pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3], + 'col2': [2, 3, 5, 1]}) + ray_df3 = rdf.from_pandas(pandas_df2, 4) + + assert not ray_df3.equals(ray_df1) + assert not ray_df3.equals(ray_df2) def test_eval(): @@ -1306,11 +1346,12 @@ def test_quantile(): ray_df.quantile() -def test_query(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_query(ray_df, pandas_df, funcs): - with pytest.raises(NotImplementedError): - ray_df.query(None) + for f in funcs: + pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f) + assert pandas_df_new.equals(rdf.to_pandas(ray_df_new)) def test_radd(): @@ -1885,11 +1926,10 @@ def test___unicode__(): ray_df.__unicode__() -def test___neg__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__neg__() +@pytest.fixture +def test___neg__(ray_df, pd_df): + ray_df_neg = ray_df.__neg__() + assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg)) def test___invert__(): @@ -1906,11 +1946,16 @@ def test___hash__(): ray_df.__hash__() -def test___iter__(): - ray_df = create_test_dataframe() +@pytest.fixture +def test___iter__(ray_df, pd_df): + ray_iterator = ray_df.__iter__() - with pytest.raises(NotImplementedError): - ray_df.__iter__() + # Check that ray_iterator implements the iterator interface + assert hasattr(ray_iterator, '__iter__') + assert hasattr(ray_iterator, 'next') or hasattr(ray_iterator, '__next__') + + pd_iterator = pd_df.__iter__() + assert list(ray_iterator) == list(pd_iterator) @pytest.fixture @@ -1933,11 +1978,9 @@ def test___bool__(): ray_df.__bool__() -def test___abs__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__abs__() +@pytest.fixture +def test___abs__(ray_df, pandas_df): + assert(ray_df_equals_pandas(abs(ray_df), abs(pandas_df))) def test___round__(): From 009d19313bd2cd5c21f69cc73dd05b5ed25ee6c3 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Feb 2018 19:13:38 -0800 Subject: [PATCH 09/13] Revert whitespace --- python/ray/dataframe/dataframe.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index bac5300ed010..93193561bb61 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -608,7 +608,6 @@ def count(self, axis=0, level=None, numeric_only=False): level=level, numeric_only=numeric_only), index=temp_index)._df)) - return collapsed_df def cov(self, min_periods=None): @@ -1190,7 +1189,6 @@ def pop(self, item): lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df self.columns = self.columns.drop(item) - return popped def pow(self, other, axis='columns', level=None, fill_value=None): From 1d0402ed0f64e8332ff7aeb57b686e8287cb6fea Mon Sep 17 00:00:00 2001 From: simon-mo Date: Mon, 26 Feb 2018 19:38:08 -0800 Subject: [PATCH 10/13] Format code --- python/ray/dataframe/test/test_dataframe.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 96abaf4b2fd2..31087066a818 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -223,7 +223,6 @@ def test_int_dataframe(): test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) - test_loc(ray_df, pandas_df) test_iloc(ray_df, pandas_df) @@ -329,7 +328,6 @@ def test_float_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) - test_loc(ray_df, pandas_df) test_iloc(ray_df, pandas_df) @@ -874,14 +872,14 @@ def test_eq(): def test_equals(): pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3], - 'col2': [2, 3, 4, 1]}) + 'col2': [2, 3, 4, 1]}) ray_df1 = rdf.from_pandas(pandas_df1, 2) ray_df2 = rdf.from_pandas(pandas_df1, 3) assert ray_df1.equals(ray_df2) pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3], - 'col2': [2, 3, 5, 1]}) + 'col2': [2, 3, 5, 1]}) ray_df3 = rdf.from_pandas(pandas_df2, 4) assert not ray_df3.equals(ray_df1) From f43328f332c4a92d53d48bc218d7d8d0239e3dc1 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Mon, 26 Feb 2018 21:12:04 -0800 Subject: [PATCH 11/13] moved _default_index to remote fn (#1617) --- python/ray/dataframe/dataframe.py | 48 ++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 113d415103f4..7ecf89a84046 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -40,7 +40,7 @@ def __init__(self, df, columns, index=None): # this _index object is a pd.DataFrame # and we use that DataFrame's Index to index the rows. - self._index = self._default_index() + self._index = _default_index.remote(self) if index is not None: self.index = index @@ -67,21 +67,27 @@ def _set_index(self, new_index): """ self._index.index = new_index - def _default_index(self): - """Create a default index, which is a RangeIndex + index = property(_get_index, _set_index) + + def _get__index(self): + """Get the _index for this DataFrame. Returns: - The pd.RangeIndex object that represents this DataFrame. + The default index. """ - dest_indices = {"partition": - [i for i in range(len(self._lengths)) - for j in range(self._lengths[i])], - "index_within_partition": - [j for i in range(len(self._lengths)) - for j in range(self._lengths[i])]} - return pd.DataFrame(dest_indices) + if isinstance(self._index_cache, ray.local_scheduler.ObjectID): + self._index_cache = ray.get(self._index_cache) + return self._index_cache - index = property(_get_index, _set_index) + def _set__index(self, new__index): + """Set the _index for this DataFrame. + + Args: + new__index: The new default index to set. + """ + self._index_cache = new__index + + _index = property(_get__index, _set__index) def _compute_lengths(self): """Updates the stored lengths of DataFrame partions @@ -1315,7 +1321,7 @@ def _maybe_casted_values(index, labels=None): values, mask, np.nan) return values - new_index = new_obj._default_index().index + new_index = ray.get(_default_index.remote(new_obj)).index if level is not None: if not isinstance(level, (tuple, list)): level = [level] @@ -2107,3 +2113,19 @@ def to_pandas(df): pd_df.index = df.index pd_df.columns = df.columns return pd_df + + +@ray.remote +def _default_index(df): + """Create a default index, which is a RangeIndex + + Returns: + The pd.RangeIndex object that represents this DataFrame. + """ + dest_indices = {"partition": + [i for i in range(len(df._lengths)) + for j in range(df._lengths[i])], + "index_within_partition": + [j for i in range(len(df._lengths)) + for j in range(df._lengths[i])]} + return pd.DataFrame(dest_indices) From b79597dc0063c714d7fa05abb5726e13368b6541 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Feb 2018 22:22:05 -0800 Subject: [PATCH 12/13] [rllib] PPO Thread Limit (#1568) --- python/ray/rllib/ppo/ppo.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index 8228764ecbf8..5811b2e606de 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -41,6 +41,8 @@ "device_count": {"CPU": 4}, "log_device_placement": False, "allow_soft_placement": True, + "intra_op_parallelism_threads": 1, + "inter_op_parallelism_threads": 2, }, # Batch size for policy evaluations for rollouts "rollout_batchsize": 1, From ee7ac5560f35a8da09ee1d6c3e0df64353afddb0 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Feb 2018 23:57:25 -0800 Subject: [PATCH 13/13] Address comments --- python/ray/dataframe/indexing.py | 50 +++++++++++++------------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py index c045a0bb3ca4..aa94fd565c5f 100644 --- a/python/ray/dataframe/indexing.py +++ b/python/ray/dataframe/indexing.py @@ -3,19 +3,6 @@ from .dataframe import _deploy_func -def _row_tuples_to_dict(row_tuples): - """A convenient helper function convert: - [(partition, idx_in_partition)] to {partition: [idx_in_partition]} - """ - d = {} - for partition, idx_in_partition in row_tuples: - if partition in d: - d[partition].append(idx_in_partition) - else: - d[partition] = [idx_in_partition] - return d - - class _Location_Indexer_Base(): """Base class for location indexer like loc and iloc This class abstract away commonly used method @@ -35,14 +22,15 @@ def _get_lookup_dict(self, ray_partition_idx): if ray_partition_idx.ndim == 1: # Single row matched position = (ray_partition_idx['partition'], ray_partition_idx['index_within_partition']) - rows_to_lookup = [position] - + rows_to_lookup = {position[0]: [position[1]]} if ray_partition_idx.ndim == 2: # Multiple rows matched - rows_to_lookup = [(row['partition'], row['index_within_partition']) - for _, row in ray_partition_idx.iterrows()] - - lookup_dict = _row_tuples_to_dict(rows_to_lookup) - return lookup_dict + # We copy ray_partition_idx because it allows us to + # do groupby. This might not be the most efficient method. + # And have room to optimize. + ray_partition_idx = ray_partition_idx.copy() + rows_to_lookup = ray_partition_idx.groupby('partition').aggregate( + lambda x: list(x)).to_dict()['index_within_partition'] + return rows_to_lookup def locate_2d(self, row_label, col_label): pass @@ -57,17 +45,19 @@ def _map_partition(self, lookup_dict, col_lst, indexer='loc'): assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" if indexer == 'loc': - def retrieve_func( - df, idx_lst, col_label): return df.loc[idx_lst, col_label] + + def retrieve_func(df, idx_lst, col_label): + return df.loc[idx_lst, col_label] elif indexer == 'iloc': - def retrieve_func( - df, idx_lst, col_idx): return df.iloc[idx_lst, col_idx] - - retrieved_rows_remote = [] - for partition, idx_to_lookup in lookup_dict.items(): - part_remote = _deploy_func.remote( - retrieve_func, self.df._df[partition], idx_to_lookup, col_lst) - retrieved_rows_remote.append(part_remote) + + def retrieve_func(df, idx_lst, col_idx): + return df.iloc[idx_lst, col_idx] + + retrieved_rows_remote = [ + _deploy_func.remote(retrieve_func, self.df._df[partition], + idx_to_lookup, col_lst) + for partition, idx_to_lookup in lookup_dict.items() + ] return retrieved_rows_remote