diff --git a/.travis.yml b/.travis.yml index 0eed43fc3f5e..e91997b6ba0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -146,18 +146,33 @@ script: - python -m pytest python/ray/rllib/test/test_evaluators.py deploy: - provider: s3 - access_key_id: AKIAJ2L7XDUSZVTXI5QA - secret_access_key: - secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= - bucket: ray-wheels - acl: public_read - region: us-west-2 - local_dir: .whl - upload-dir: $TRAVIS_COMMIT - skip_cleanup: true - only: - - master - on: - repo: ray-project/ray - condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 + - provider: s3 + access_key_id: AKIAJ2L7XDUSZVTXI5QA + secret_access_key: + secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= + bucket: ray-wheels + acl: public_read + region: us-west-2 + local_dir: .whl + upload-dir: $TRAVIS_COMMIT + skip_cleanup: true + only: + - master + on: + repo: ray-project/ray + condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 + - provider: s3 + access_key_id: AKIAJ2L7XDUSZVTXI5QA + secret_access_key: + secure: MZbzbQvfn9QI2H19Ai0EZju5BERhCMA8/piHU29syvtmoDqd/QdMW0DTHhLAqlaCrGeMGCx0y6sB9DjX46ZKndQ/cgSQDesfNC300NTZZlWyYr7K86yhj+hgIpYXs+G28g1hmQOUzCWL8kAgfeMle9GvKkZ7DkhdRszg8bPyIXdKtjQGO5RRrrjQBgIzjvOiWFOD9lDzula5j8uV4tsiXT8nQjuiOIwmAxB2r7zXHc/Vsr9wBAeQ9Fq6aomEGuuVscoMhZqWc0SHOOz0dIDdlJFF+W4Effw6l9u0Fe262g0WfsnS3PqF7a6eBC0qkf3yH8joAlvquVxWp+dr7dBzy0gGZysD/pqF/NBiB3GZ9TMreK39DJ9zC83p2r0awP1hduhkCJI2QOsNX7fna6e2edVt7rxOEe19So83eDNBbJ6bfV7YbkEMqUJxNHWC6MIDCrCbFf8QlT3fnPsb0IHMa9aJRe/TvgI+aR+nKjRhvVymXddCBAy5hYb/I66omx4BGbl7+9HPo/w/c3m+vCJIu6IQZFVAmsoP6pft9aYVXgkz20C4I/4tF0YlDuH617PT3DeCjf+MG4Mgh9JiXJ2Jt8U6NH1tlXiS/F6OjPGFB7UrFw1o2e0KhX+l/qJEslf5Xc35vmbELf1Fy7QNVttZ2H5OXYrWhsV8EOmpN+KcVQI= + bucket: ray-wheels + acl: public_read + region: us-west-2 + local_dir: .whl + upload-dir: latest + skip_cleanup: true + only: + - master + on: + repo: ray-project/ray + condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1 diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 4dd867a706a7..d3fb4c98f424 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,6 +3,18 @@ from __future__ import print_function import pandas as pd +from pandas.api.types import is_scalar +from pandas.util._validators import validate_bool_kwarg +from pandas.core.index import _ensure_index_from_sequences +from pandas._libs import lib +from pandas.core.dtypes.cast import maybe_upcast_putmask +from pandas.compat import lzip +from pandas.core.dtypes.common import ( + is_bool_dtype, + is_numeric_dtype, + is_timedelta64_dtype) + +import warnings import numpy as np import ray import itertools @@ -23,13 +35,12 @@ def __init__(self, df, columns, index=None): assert(len(df) > 0) self._df = df - self._lengths = [_deploy_func.remote(_get_lengths, d) - for d in self._df] + self._compute_lengths() self.columns = columns # this _index object is a pd.DataFrame # and we use that DataFrame's Index to index the rows. - self._index = self._default_index() + self._index = _default_index.remote(self) if index is not None: self.index = index @@ -56,21 +67,33 @@ def _set_index(self, new_index): """ self._index.index = new_index - def _default_index(self): - """Create a default index, which is a RangeIndex + index = property(_get_index, _set_index) + + def _get__index(self): + """Get the _index for this DataFrame. Returns: - The pd.RangeIndex object that represents this DataFrame. + The default index. """ - dest_indices = {"partition": - [i for i in range(len(self._lengths)) - for j in range(self._lengths[i])], - "index_within_partition": - [j for i in range(len(self._lengths)) - for j in range(self._lengths[i])]} - return pd.DataFrame(dest_indices) + if isinstance(self._index_cache, ray.local_scheduler.ObjectID): + self._index_cache = ray.get(self._index_cache) + return self._index_cache - index = property(_get_index, _set_index) + def _set__index(self, new__index): + """Set the _index for this DataFrame. + + Args: + new__index: The new default index to set. + """ + self._index_cache = new__index + + _index = property(_get__index, _set__index) + + def _compute_lengths(self): + """Updates the stored lengths of DataFrame partions + """ + self._lengths = [_deploy_func.remote(_get_lengths, d) + for d in self._df] def _get_lengths(self): """Gets the lengths for each partition and caches it if it wasn't. @@ -192,6 +215,21 @@ def _map_partitions(self, func, index=None): return DataFrame(new_df, self.columns, index=index) + def _update_inplace(self, df=None, columns=None, index=None): + """Updates the current DataFrame inplace + """ + assert(len(df) > 0) + + if df: + self._df = df + if columns: + self.columns = columns + if index: + self.index = index + + self._compute_lengths() + self._index = self._default_index() + def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -576,7 +614,6 @@ def count(self, axis=0, level=None, numeric_only=False): level=level, numeric_only=numeric_only), index=temp_index)._df)) - # collapsed_df.index = self.columns return collapsed_df def cov(self, min_periods=None): @@ -623,7 +660,37 @@ def eq(self, other, axis='columns', level=None): raise NotImplementedError("Not Yet implemented.") def equals(self, other): - raise NotImplementedError("Not Yet implemented.") + """ + Checks if other DataFrame is elementwise equal to the current one + + Returns: + Boolean: True if equal, otherwise False + """ + def helper(df, index, other_series): + return df.iloc[index['index_within_partition']] \ + .equals(other_series) + + results = [] + other_partition = None + other_df = None + for i, idx in other._index.iterrows(): + if idx['partition'] != other_partition: + other_df = ray.get(other._df[idx['partition']]) + other_partition = idx['partition'] + # TODO: group series here into full df partitions to reduce + # the number of remote calls to helper + other_series = other_df.iloc[idx['index_within_partition']] + curr_index = self._index.iloc[i] + curr_df = self._df[int(curr_index['partition'])] + results.append(_deploy_func.remote(helper, + curr_df, + curr_index, + other_series)) + + for r in results: + if not ray.get(r): + return False + return True def eval(self, expr, inplace=False, **kwargs): raise NotImplementedError("Not Yet implemented.") @@ -812,7 +879,52 @@ def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, raise NotImplementedError("Not Yet implemented.") def insert(self, loc, column, value, allow_duplicates=False): - raise NotImplementedError("Not Yet implemented.") + """Insert column into DataFrame at specified location. + + Args: + loc (int): Insertion index. Must verify 0 <= loc <= len(columns). + column (hashable object): Label of the inserted column. + value (int, Series, or array-like): The values to insert. + allow_duplicates (bool): Whether to allow duplicate column names. + """ + try: + len(value) + except TypeError: + value = [value for _ in range(len(self.index))] + + if len(value) != len(self.index): + raise ValueError( + "Column length provided does not match DataFrame length.") + if loc < 0 or loc > len(self.columns): + raise ValueError( + "Location provided must be higher than 0 and lower than the " + "number of columns.") + if not allow_duplicates and column in self.columns: + raise ValueError( + "Column {} already exists in DataFrame.".format(column)) + + cumulative = np.cumsum(self._lengths) + partitions = [value[cumulative[i-1]:cumulative[i]] + for i in range(len(cumulative)) + if i != 0] + + partitions.insert(0, value[:cumulative[0]]) + + # Because insert is always inplace, we have to create this temp fn. + def _insert(_df, _loc, _column, _part, _allow_duplicates): + _df.insert(_loc, _column, _part, _allow_duplicates) + return _df + + self._df = \ + [_deploy_func.remote(_insert, + self._df[i], + loc, + column, + partitions[i], + allow_duplicates) + for i in range(len(self._df))] + + self.columns = self.columns.insert(loc, column) def interpolate(self, method='linear', axis=0, limit=None, inplace=False, limit_direction='forward', downcast=None, **kwargs): @@ -1079,7 +1191,6 @@ def pop(self, item): A Series containing the popped values. Also modifies this DataFrame. """ - popped = to_pandas(self._map_partitions( lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df @@ -1102,7 +1213,18 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, raise NotImplementedError("Not Yet implemented.") def query(self, expr, inplace=False, **kwargs): - raise NotImplementedError("Not Yet implemented.") + """Queries the Dataframe with a boolean expression + + Returns: + A new DataFrame if inplace=False + """ + new_dfs = [_deploy_func.remote(lambda df: df.query(expr, **kwargs), + part) for part in self._df] + + if inplace: + self._update_inplace(new_dfs) + else: + return DataFrame(new_dfs, self.columns) def radd(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") @@ -1148,7 +1270,103 @@ def resample(self, rule, how=None, axis=0, fill_method=None, closed=None, def reset_index(self, level=None, drop=False, inplace=False, col_level=0, col_fill=''): - raise NotImplementedError("Not Yet implemented.") + """Reset this index to default and create column from current index. + + Args: + level: Only remove the given levels from the index. Removes all + levels by default + drop: Do not try to insert index into dataframe columns. This + resets the index to the default integer index. + inplace: Modify the DataFrame in place (do not create a new object) + col_level : If the columns have multiple levels, determines which + level the labels are inserted into. By default it is inserted + into the first level. + col_fill: If the columns have multiple levels, determines how the + other levels are named. If None then the index name is + repeated. + + Returns: + A new DataFrame if inplace is False, None otherwise. + """ + inplace = validate_bool_kwarg(inplace, 'inplace') + if inplace: + new_obj = self + else: + new_obj = self.copy() + + def _maybe_casted_values(index, labels=None): + if isinstance(index, pd.PeriodIndex): + values = index.asobject.values + elif isinstance(index, pd.DatetimeIndex) and index.tz is not None: + values = index + else: + values = index.values + if values.dtype == np.object_: + values = lib.maybe_convert_objects(values) + + # if we have the labels, extract the values with a mask + if labels is not None: + mask = labels == -1 + + # we can have situations where the whole mask is -1, + # meaning there is nothing found in labels, so make all nan's + if mask.all(): + values = np.empty(len(mask)) + values.fill(np.nan) + else: + values = values.take(labels) + if mask.any(): + values, changed = maybe_upcast_putmask( + values, mask, np.nan) + return values + + new_index = ray.get(_default_index.remote(new_obj)).index + if level is not None: + if not isinstance(level, (tuple, list)): + level = [level] + level = [self.index._get_level_number(lev) for lev in level] + if isinstance(self.index, pd.MultiIndex): + if len(level) < self.index.nlevels: + new_index = self.index.droplevel(level) + + if not drop: + if isinstance(self.index, pd.MultiIndex): + names = [n if n is not None else ('level_%d' % i) + for (i, n) in enumerate(self.index.names)] + to_insert = lzip(self.index.levels, self.index.labels) + else: + default = 'index' if 'index' not in self else 'level_0' + names = ([default] if self.index.name is None + else [self.index.name]) + to_insert = ((self.index, None),) + + multi_col = isinstance(self.columns, pd.MultiIndex) + for i, (lev, lab) in reversed(list(enumerate(to_insert))): + if not (level is None or i in level): + continue + name = names[i] + if multi_col: + col_name = (list(name) if isinstance(name, tuple) + else [name]) + if col_fill is None: + if len(col_name) not in (1, self.columns.nlevels): + raise ValueError("col_fill=None is incompatible " + "with incomplete column name " + "{}".format(name)) + col_fill = col_name[0] + + lev_num = self.columns._get_level_number(col_level) + name_lst = [col_fill] * lev_num + col_name + missing = self.columns.nlevels - len(name_lst) + name_lst += [col_fill] * missing + name = tuple(name_lst) + # to ndarray and maybe infer different dtype + level_values = _maybe_casted_values(lev, lab) + new_obj.insert(0, name, level_values) + + new_obj.index = new_index + if not inplace: + return new_obj def rfloordiv(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") @@ -1192,11 +1410,116 @@ def sem(self, axis=None, skipna=None, level=None, ddof=1, raise NotImplementedError("Not Yet implemented.") def set_axis(self, labels, axis=0, inplace=None): - raise NotImplementedError("Not Yet implemented.") + """Assign desired index to given axis. + + Args: + labels (pd.Index or list-like): The Index to assign. + axis (string or int): The axis to reassign. + inplace (bool): Whether to make these modifications inplace. + + Returns: + If inplace is False, returns a new DataFrame, otherwise None. + """ + if is_scalar(labels): + warnings.warn( + 'set_axis now takes "labels" as first argument, and ' + '"axis" as named parameter. The old form, with "axis" as ' + 'first parameter and \"labels\" as second, is still supported ' + 'but will be deprecated in a future version of pandas.', + FutureWarning, stacklevel=2) + labels, axis = axis, labels + + if inplace is None: + warnings.warn( + 'set_axis currently defaults to operating inplace.\nThis ' + 'will change in a future version of pandas, use ' + 'inplace=True to avoid this warning.', + FutureWarning, stacklevel=2) + inplace = True + if inplace: + setattr(self, self._index._get_axis_name(axis), labels) + else: + obj = self.copy() + obj.set_axis(labels, axis=axis, inplace=True) + return obj def set_index(self, keys, drop=True, append=False, inplace=False, verify_integrity=False): - raise NotImplementedError("Not Yet implemented.") + """Set the DataFrame index using one or more existing columns. + + Args: + keys: column label or list of column labels / arrays. + drop (boolean): Delete columns to be used as the new index. + append (boolean): Whether to append columns to existing index. + inplace (boolean): Modify the DataFrame in place. + verify_integrity (boolean): Check the new index for duplicates. + Otherwise defer the check until necessary. Setting to False + will improve the performance of this method + + Returns: + If inplace is set to false returns a new DataFrame, otherwise None. + """ + inplace = validate_bool_kwarg(inplace, 'inplace') + if not isinstance(keys, list): + keys = [keys] + + if inplace: + frame = self + else: + frame = self.copy() + + arrays = [] + names = [] + if append: + names = [x for x in self.index.names] + if isinstance(self.index, pd.MultiIndex): + for i in range(self.index.nlevels): + arrays.append(self.index._get_level_values(i)) + else: + arrays.append(self.index) + + to_remove = [] + for col in keys: + if isinstance(col, pd.MultiIndex): + # append all but the last column so we don't have to modify + # the end of this loop + for n in range(col.nlevels - 1): + arrays.append(col._get_level_values(n)) + + level = col._get_level_values(col.nlevels - 1) + names.extend(col.names) + elif isinstance(col, pd.Series): + level = col._values + names.append(col.name) + elif isinstance(col, pd.Index): + level = col + names.append(col.name) + elif isinstance(col, (list, np.ndarray, pd.Index)): + level = col + names.append(None) + else: + level = frame[col]._values + names.append(col) + if drop: + to_remove.append(col) + arrays.append(level) + + index = _ensure_index_from_sequences(arrays, names) + + if verify_integrity and not index.is_unique: + duplicates = index.get_duplicates() + raise ValueError('Index has duplicate keys: %s' % duplicates) + + for c in to_remove: + del frame[c] + + # clear up memory usage + index._cleanup() + + frame.index = index + + if not inplace: + return frame def set_value(self, index, col, value, takeable=False): raise NotImplementedError("Not Yet implemented.") @@ -1450,10 +1773,15 @@ def __hash__(self): raise NotImplementedError("Not Yet implemented.") def __iter__(self): - raise NotImplementedError("Not Yet implemented.") + """Iterate over the columns + + Returns: + An Iterator over the columns of the dataframe. + """ + return iter(self.columns) def __contains__(self, key): - raise NotImplementedError("Not Yet implemented.") + return key in self.columns def __nonzero__(self): raise NotImplementedError("Not Yet implemented.") @@ -1462,7 +1790,12 @@ def __bool__(self): raise NotImplementedError("Not Yet implemented.") def __abs__(self): - raise NotImplementedError("Not Yet implemented.") + """Creates a modified DataFrame by elementwise taking the absolute value + + Returns: + A modified DataFrame + """ + return self.abs() def __round__(self, decimals=0): raise NotImplementedError("Not Yet implemented.") @@ -1541,10 +1874,20 @@ def __ge__(self, other): raise NotImplementedError("Not Yet implemented.") def __eq__(self, other): - raise NotImplementedError("Not Yet implemented.") + """Computes the equality of this DataFrame with another + + Returns: + True, if the DataFrames are equal. False otherwise. + """ + return self.equals(other) def __ne__(self, other): - raise NotImplementedError("Not Yet implemented.") + """Checks that this DataFrame is not equal to another + + Returns: + True, if the DataFrames are not equal. False otherwise. + """ + return not self.equals(other) def __add__(self, other): raise NotImplementedError("Not Yet implemented.") @@ -1571,7 +1914,19 @@ def __isub__(self, other): raise NotImplementedError("Not Yet implemented.") def __neg__(self): - raise NotImplementedError("Not Yet implemented.") + """Computes an element wise negative DataFrame + + Returns: + A modified DataFrame where every element is the negation of before + """ + for t in self.dtypes: + if not (is_bool_dtype(t) + or is_numeric_dtype(t) + or is_timedelta64_dtype(t)): + raise TypeError("Unary negative expects numeric dtype, not {}" + .format(t)) + + return self._map_partitions(lambda df: df.__neg__()) def __floordiv__(self, other): raise NotImplementedError("Not Yet implemented.") @@ -1603,8 +1958,15 @@ def iat(axis=None): def __rsub__(other, axis=None, level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") - def loc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def loc(self): + """Purely label-location based indexer for selection by label. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _Loc_Indexer + return _Loc_Indexer(self) @property def is_copy(self): @@ -1622,8 +1984,15 @@ def at(axis=None): def ix(axis=None): raise NotImplementedError("Not Yet implemented.") - def iloc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def iloc(self): + """Purely integer-location based indexing for selection by position. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _iLoc_Indexer + return _iLoc_Indexer(self) def _get_lengths(df): @@ -1753,7 +2122,23 @@ def to_pandas(df): Returns: A new pandas DataFrame. """ - joined_df = pd.concat(ray.get(df._df)) - joined_df.index = df.index - joined_df.columns = df.columns - return joined_df + pd_df = pd.concat(ray.get(df._df)) + pd_df.index = df.index + pd_df.columns = df.columns + return pd_df + + +@ray.remote +def _default_index(df): + """Create a default index, which is a RangeIndex + + Returns: + The pd.RangeIndex object that represents this DataFrame. + """ + dest_indices = {"partition": + [i for i in range(len(df._lengths)) + for j in range(df._lengths[i])], + "index_within_partition": + [j for i in range(len(df._lengths)) + for j in range(df._lengths[i])]} + return pd.DataFrame(dest_indices) diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py new file mode 100644 index 000000000000..aa94fd565c5f --- /dev/null +++ b/python/ray/dataframe/indexing.py @@ -0,0 +1,103 @@ +import pandas as pd +import ray +from .dataframe import _deploy_func + + +class _Location_Indexer_Base(): + """Base class for location indexer like loc and iloc + This class abstract away commonly used method + """ + + def __init__(self, ray_df): + self.df = ray_df + + def __getitem__(self, key): + if not isinstance(key, tuple): + # The one argument case is equivalent to full slice in 2nd dim. + return self.locate_2d(key, slice(None)) + else: + return self.locate_2d(*key) + + def _get_lookup_dict(self, ray_partition_idx): + if ray_partition_idx.ndim == 1: # Single row matched + position = (ray_partition_idx['partition'], + ray_partition_idx['index_within_partition']) + rows_to_lookup = {position[0]: [position[1]]} + if ray_partition_idx.ndim == 2: # Multiple rows matched + # We copy ray_partition_idx because it allows us to + # do groupby. This might not be the most efficient method. + # And have room to optimize. + ray_partition_idx = ray_partition_idx.copy() + rows_to_lookup = ray_partition_idx.groupby('partition').aggregate( + lambda x: list(x)).to_dict()['index_within_partition'] + return rows_to_lookup + + def locate_2d(self, row_label, col_label): + pass + + def _map_partition(self, lookup_dict, col_lst, indexer='loc'): + """Apply retrieval function to a lookup_dict + in the form of {partition_id: [idx]}. + + Returns: + retrieved_rows_remote: a list of object ids for pd_df + """ + assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" + + if indexer == 'loc': + + def retrieve_func(df, idx_lst, col_label): + return df.loc[idx_lst, col_label] + elif indexer == 'iloc': + + def retrieve_func(df, idx_lst, col_idx): + return df.iloc[idx_lst, col_idx] + + retrieved_rows_remote = [ + _deploy_func.remote(retrieve_func, self.df._df[partition], + idx_to_lookup, col_lst) + for partition, idx_to_lookup in lookup_dict.items() + ] + return retrieved_rows_remote + + +class _Loc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.loc[] functionality""" + + def locate_2d(self, row_label, col_label): + index_loc = self.df._index.loc[row_label] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_label, indexer='loc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_label, int) or isinstance(row_label, str): + return joined_df.squeeze(axis=0) + else: + return joined_df + + +class _iLoc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.iloc[] functionality""" + + def locate_2d(self, row_idx, col_idx): + index_loc = self.df._index.iloc[row_idx] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_idx, indexer='iloc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_idx, int) or isinstance(row_idx, str): + return joined_df.squeeze(axis=0) + else: + return joined_df diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index d49f71fb354c..31087066a818 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -160,6 +160,9 @@ def test_int_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -185,10 +188,14 @@ def test_int_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -216,6 +223,32 @@ def test_int_dataframe(): test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + labels.append('e') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test___contains__(ray_df, key, True) + test___contains__(ray_df, "Not Exists", False) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_float_dataframe(): @@ -233,6 +266,9 @@ def test_float_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -258,10 +294,14 @@ def test_float_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -288,13 +328,36 @@ def test_float_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + labels.append('e') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_mixed_dtype_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, 4], - 'col2': [4, 5, 6, 7], - 'col3': [8.0, 9.4, 10.1, 11.3], - 'col4': ['a', 'b', 'c', 'd']}) + 'col1': [1, 2, 3, 4], + 'col2': [4, 5, 6, 7], + 'col3': [8.0, 9.4, 10.1, 11.3], + 'col4': ['a', 'b', 'c', 'd']}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -303,6 +366,9 @@ def test_mixed_dtype_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -327,14 +393,21 @@ def test_mixed_dtype_dataframe(): with pytest.raises(TypeError): test_abs(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + + with pytest.raises(TypeError): + test___neg__(ray_df, pandas_df) + + test___iter__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -365,13 +438,35 @@ def test_mixed_dtype_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_nan_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, np.nan], - 'col2': [4, 5, np.nan, 7], - 'col3': [8, np.nan, 10, 11], - 'col4': [np.nan, 13, 14, 15]}) + 'col1': [1, 2, 3, np.nan], + 'col2': [4, 5, np.nan, 7], + 'col3': [8, np.nan, 10, 11], + 'col4': [np.nan, 13, 14, 15]}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -380,6 +475,9 @@ def test_nan_dataframe(): lambda x: x, lambda x: False] + query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2', + '(col2 > col1) and (col1 < col3)'] + keys = ['col1', 'col2', 'col3', @@ -405,10 +503,14 @@ def test_nan_dataframe(): test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) test_round(ray_df, pandas_df) + test_query(ray_df, pandas_df, query_funcs) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) test___getitem__(ray_df, pandas_df) + test___neg__(ray_df, pandas_df) + test___iter__(ray_df, pandas_df) + test___abs__(ray_df, pandas_df) test___delitem__(ray_df, pandas_df) test___copy__(ray_df, pandas_df) test___deepcopy__(ray_df, pandas_df) @@ -435,6 +537,28 @@ def test_nan_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + + labels = ['a', 'b', 'c', 'd'] + test_set_axis(ray_df, pandas_df, labels, 0) + test_set_axis(ray_df, pandas_df, labels, 'rows') + test_set_axis(ray_df, pandas_df, labels, 1) + test_set_axis(ray_df, pandas_df, labels, 'columns') + + for key in keys: + test_set_index(ray_df, pandas_df, key) + test_set_index(ray_df, pandas_df, key, inplace=True) + + test_reset_index(ray_df, pandas_df) + test_reset_index(ray_df, pandas_df, inplace=True) + + for key in keys: + test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key]) + test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key]) + test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key]) + def test_add(): ray_df = create_test_dataframe() @@ -747,10 +871,19 @@ def test_eq(): def test_equals(): - ray_df = create_test_dataframe() + pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3], + 'col2': [2, 3, 4, 1]}) + ray_df1 = rdf.from_pandas(pandas_df1, 2) + ray_df2 = rdf.from_pandas(pandas_df1, 3) - with pytest.raises(NotImplementedError): - ray_df.equals(None) + assert ray_df1.equals(ray_df2) + + pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3], + 'col2': [2, 3, 5, 1]}) + ray_df3 = rdf.from_pandas(pandas_df2, 4) + + assert not ray_df3.equals(ray_df1) + assert not ray_df3.equals(ray_df2) def test_eval(): @@ -902,11 +1035,13 @@ def test_info(): ray_df.info() -def test_insert(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_insert(ray_df, pandas_df, loc, column, value): + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() - with pytest.raises(NotImplementedError): - ray_df.insert(None, None, None) + ray_df_cp.insert(loc, column, value) + pd_df_cp.insert(loc, column, value) def test_interpolate(): @@ -1223,11 +1358,12 @@ def test_quantile(): ray_df.quantile() -def test_query(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_query(ray_df, pandas_df, funcs): - with pytest.raises(NotImplementedError): - ray_df.query(None) + for f in funcs: + pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f) + assert pandas_df_new.equals(rdf.to_pandas(ray_df_new)) def test_radd(): @@ -1307,11 +1443,19 @@ def test_resample(): ray_df.resample(None) -def test_reset_index(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.reset_index() +@pytest.fixture +def test_reset_index(ray_df, pandas_df, inplace=False): + if not inplace: + print(rdf.to_pandas(ray_df.reset_index(inplace=inplace)).index) + print(pandas_df.reset_index(inplace=inplace)) + assert rdf.to_pandas(ray_df.reset_index(inplace=inplace)).equals( + pandas_df.reset_index(inplace=inplace)) + else: + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() + ray_df_cp.reset_index(inplace=inplace) + pd_df_cp.reset_index(inplace=inplace) + assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) def test_rfloordiv(): @@ -1397,18 +1541,23 @@ def test_sem(): ray_df.sem() -def test_set_axis(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.set_axis(None) - +@pytest.fixture +def test_set_axis(ray_df, pandas_df, label, axis): + assert rdf.to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals( + pandas_df.set_axis(label, axis, inplace=False)) -def test_set_index(): - ray_df = create_test_dataframe() - with pytest.raises(NotImplementedError): - ray_df.set_index(None) +@pytest.fixture +def test_set_index(ray_df, pandas_df, keys, inplace=False): + if not inplace: + assert rdf.to_pandas(ray_df.set_index(keys)).equals( + pandas_df.set_index(keys)) + else: + ray_df_cp = ray_df.copy() + pd_df_cp = pandas_df.copy() + ray_df_cp.set_index(keys, inplace=inplace) + pd_df_cp.set_index(keys, inplace=inplace) + assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) def test_set_value(): @@ -1789,11 +1938,10 @@ def test___unicode__(): ray_df.__unicode__() -def test___neg__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__neg__() +@pytest.fixture +def test___neg__(ray_df, pd_df): + ray_df_neg = ray_df.__neg__() + assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg)) def test___invert__(): @@ -1810,18 +1958,22 @@ def test___hash__(): ray_df.__hash__() -def test___iter__(): - ray_df = create_test_dataframe() +@pytest.fixture +def test___iter__(ray_df, pd_df): + ray_iterator = ray_df.__iter__() - with pytest.raises(NotImplementedError): - ray_df.__iter__() + # Check that ray_iterator implements the iterator interface + assert hasattr(ray_iterator, '__iter__') + assert hasattr(ray_iterator, 'next') or hasattr(ray_iterator, '__next__') + pd_iterator = pd_df.__iter__() + assert list(ray_iterator) == list(pd_iterator) -def test___contains__(): - ray_df = create_test_dataframe() - with pytest.raises(NotImplementedError): - ray_df.__contains__(None) +@pytest.fixture +def test___contains__(ray_df, key, result): + assert result == ray_df.__contains__(key) + assert result == (key in ray_df) def test___nonzero__(): @@ -1838,11 +1990,9 @@ def test___bool__(): ray_df.__bool__() -def test___abs__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__abs__() +@pytest.fixture +def test___abs__(ray_df, pandas_df): + assert(ray_df_equals_pandas(abs(ray_df), abs(pandas_df))) def test___round__(): @@ -1936,11 +2086,20 @@ def test___rsub__(): ray_df.__rsub__(None, None, None) -def test_loc(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_loc(ray_df, pd_df): + # Singleton + assert ray_df.loc[0].equals(pd_df.loc[0]) + assert ray_df.loc[0, 'col1'] == pd_df.loc[0, 'col1'] - with pytest.raises(NotImplementedError): - ray_df.loc() + # List + assert ray_df.loc[[1, 2]].equals(pd_df.loc[[1, 2]]) + assert ray_df.loc[[1, 2], ['col1']].equals(pd_df.loc[[1, 2], ['col1']]) + + # Slice + assert ray_df.loc[1:, 'col1'].equals(pd_df.loc[1:, 'col1']) + assert ray_df.loc[1:2, 'col1'].equals(pd_df.loc[1:2, 'col1']) + assert ray_df.loc[1:2, 'col1':'col2'].equals(pd_df.loc[1:2, 'col1':'col2']) def test_is_copy(): @@ -1978,8 +2137,17 @@ def test_ix(): ray_df.ix() -def test_iloc(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.iloc() +@pytest.fixture +def test_iloc(ray_df, pd_df): + # Singleton + assert ray_df.iloc[0].equals(pd_df.iloc[0]) + assert ray_df.iloc[0, 1] == pd_df.iloc[0, 1] + + # List + assert ray_df.iloc[[1, 2]].equals(pd_df.iloc[[1, 2]]) + assert ray_df.iloc[[1, 2], [1, 0]].equals(pd_df.iloc[[1, 2], [1, 0]]) + + # Slice + assert ray_df.iloc[1:, 0].equals(pd_df.iloc[1:, 0]) + assert ray_df.iloc[1:2, 0].equals(pd_df.iloc[1:2, 0]) + assert ray_df.iloc[1:2, 0:2].equals(pd_df.iloc[1:2, 0:2]) diff --git a/python/ray/rllib/dqn/dqn_replay_evaluator.py b/python/ray/rllib/dqn/dqn_replay_evaluator.py index 56bbe6d48409..effd0bf01249 100644 --- a/python/ray/rllib/dqn/dqn_replay_evaluator.py +++ b/python/ray/rllib/dqn/dqn_replay_evaluator.py @@ -28,7 +28,7 @@ def __init__(self, registry, env_creator, config, logdir): if self.config["num_workers"] > 1: remote_cls = ray.remote(num_cpus=1)(DQNEvaluator) self.workers = [ - remote_cls.remote(env_creator, config, logdir) + remote_cls.remote(registry, env_creator, config, logdir) for _ in range(self.config["num_workers"])] else: self.workers = [] @@ -146,3 +146,9 @@ def restore(self, data): w.restore.remote(d) self.beta_schedule = data[2] self.replay_buffer = data[3] + + def set_global_timestep(self, global_timestep): + self.global_timestep = global_timestep + if self.workers: + ray.get([worker.set_global_timestep.remote(global_timestep) + for worker in self.workers]) diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index 8228764ecbf8..5811b2e606de 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -41,6 +41,8 @@ "device_count": {"CPU": 4}, "log_device_placement": False, "allow_soft_placement": True, + "intra_op_parallelism_threads": 1, + "inter_op_parallelism_threads": 2, }, # Batch size for policy evaluations for rollouts "rollout_batchsize": 1, diff --git a/python/ray/tune/cluster_info.py b/python/ray/tune/cluster_info.py new file mode 100644 index 000000000000..23632d660ab7 --- /dev/null +++ b/python/ray/tune/cluster_info.py @@ -0,0 +1,23 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import getpass +import os + + +def get_ssh_user(): + """Returns ssh username for connecting to cluster workers.""" + + return getpass.getuser() + + +# TODO(ekl) this currently only works for clusters launched with +# ray create_or_update +def get_ssh_key(): + """Returns ssh key to connecting to cluster workers.""" + + path = os.path.expanduser("~/ray_bootstrap_key.pem") + if os.path.exists(path): + return path + return None diff --git a/python/ray/tune/log_sync.py b/python/ray/tune/log_sync.py index 6f404e4d3679..c41100521f2f 100644 --- a/python/ray/tune/log_sync.py +++ b/python/ray/tune/log_sync.py @@ -4,9 +4,12 @@ import distutils.spawn import os +import pipes import subprocess import time +import ray +from ray.tune.cluster_info import get_ssh_key, get_ssh_user from ray.tune.error import TuneError from ray.tune.result import DEFAULT_RESULTS_DIR @@ -15,20 +18,21 @@ _syncers = {} -def get_syncer(local_dir, remote_dir): - if not remote_dir.startswith("s3://"): - raise TuneError("Upload uri must start with s3://") +def get_syncer(local_dir, remote_dir=None): + if remote_dir: + if not remote_dir.startswith("s3://"): + raise TuneError("Upload uri must start with s3://") - if not distutils.spawn.find_executable("aws"): - raise TuneError("Upload uri requires awscli tool to be installed") + if not distutils.spawn.find_executable("aws"): + raise TuneError("Upload uri requires awscli tool to be installed") - if local_dir.startswith(DEFAULT_RESULTS_DIR + "/"): - rel_path = os.path.relpath(local_dir, DEFAULT_RESULTS_DIR) - remote_dir = os.path.join(remote_dir, rel_path) + if local_dir.startswith(DEFAULT_RESULTS_DIR + "/"): + rel_path = os.path.relpath(local_dir, DEFAULT_RESULTS_DIR) + remote_dir = os.path.join(remote_dir, rel_path) key = (local_dir, remote_dir) if key not in _syncers: - _syncers[key] = _S3LogSyncer(local_dir, remote_dir) + _syncers[key] = _LogSyncer(local_dir, remote_dir) return _syncers[key] @@ -38,23 +42,64 @@ def wait_for_log_sync(): syncer.wait() -class _S3LogSyncer(object): - def __init__(self, local_dir, remote_dir): +class _LogSyncer(object): + """Log syncer for tune. + + This syncs files from workers to the local node, and optionally also from + the local node to a remote directory (e.g. S3).""" + + def __init__(self, local_dir, remote_dir=None): self.local_dir = local_dir self.remote_dir = remote_dir self.last_sync_time = 0 self.sync_process = None - print("Created S3LogSyncer for {} -> {}".format(local_dir, remote_dir)) + self.local_ip = ray.services.get_node_ip_address() + self.worker_ip = None + print("Created LogSyncer for {} -> {}".format(local_dir, remote_dir)) + + def set_worker_ip(self, worker_ip): + """Set the worker ip to sync logs from.""" + + self.worker_ip = worker_ip def sync_if_needed(self): if time.time() - self.last_sync_time > 300: self.sync_now() def sync_now(self, force=False): - print( - "Syncing files from {} -> {}".format( - self.local_dir, self.remote_dir)) self.last_sync_time = time.time() + if not self.worker_ip: + print( + "Worker ip unknown, skipping log sync for {}".format( + self.local_dir)) + return + + if self.worker_ip == self.local_ip: + worker_to_local_sync_cmd = None # don't need to rsync + else: + ssh_key = get_ssh_key() + ssh_user = get_ssh_user() + if ssh_key is None or ssh_user is None: + print( + "Error: log sync requires cluster to be setup with " + "`ray create_or_update`.") + return + if not distutils.spawn.find_executable("rsync"): + print("Error: log sync requires rsync to be installed.") + return + worker_to_local_sync_cmd = ( + ("""rsync -avz -e "ssh -i '{}' -o ConnectTimeout=120s """ + """-o StrictHostKeyChecking=no" '{}@{}:{}/' '{}/'""").format( + ssh_key, ssh_user, self.worker_ip, + pipes.quote(self.local_dir), pipes.quote(self.local_dir))) + + if self.remote_dir: + local_to_remote_sync_cmd = ( + "aws s3 sync '{}' '{}'".format( + pipes.quote(self.local_dir), pipes.quote(self.remote_dir))) + else: + local_to_remote_sync_cmd = None + if self.sync_process: self.sync_process.poll() if self.sync_process.returncode is None: @@ -63,8 +108,17 @@ def sync_now(self, force=False): else: print("Warning: last sync is still in progress, skipping") return - self.sync_process = subprocess.Popen( - ["aws", "s3", "sync", self.local_dir, self.remote_dir]) + + if worker_to_local_sync_cmd or local_to_remote_sync_cmd: + final_cmd = "" + if worker_to_local_sync_cmd: + final_cmd += worker_to_local_sync_cmd + if local_to_remote_sync_cmd: + if final_cmd: + final_cmd += " && " + final_cmd += local_to_remote_sync_cmd + print("Running log sync: {}".format(final_cmd)) + self.sync_process = subprocess.Popen(final_cmd, shell=True) def wait(self): if self.sync_process: diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 4c3308498d07..952049b338ab 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -24,10 +24,6 @@ class Logger(object): multiple formats (TensorBoard, rllab/viskit, plain json) at once. """ - _attrs_to_log = [ - "time_this_iter_s", "mean_loss", "mean_accuracy", - "episode_reward_mean", "episode_len_mean"] - def __init__(self, config, logdir, upload_uri=None): self.config = config self.logdir = logdir @@ -47,6 +43,11 @@ def close(self): pass + def flush(self): + """Flushes all disk writes to storage.""" + + pass + class UnifiedLogger(Logger): """Unified result logger for TensorBoard, rllab/viskit, plain json. @@ -60,22 +61,22 @@ def _init(self): print("TF not installed - cannot log with {}...".format(cls)) continue self._loggers.append(cls(self.config, self.logdir, self.uri)) - if self.uri: - self._log_syncer = get_syncer(self.logdir, self.uri) - else: - self._log_syncer = None + self._log_syncer = get_syncer(self.logdir, self.uri) def on_result(self, result): for logger in self._loggers: logger.on_result(result) - if self._log_syncer: - self._log_syncer.sync_if_needed() + self._log_syncer.set_worker_ip(result.node_ip) + self._log_syncer.sync_if_needed() def close(self): for logger in self._loggers: logger.close() - if self._log_syncer: - self._log_syncer.sync_now(force=True) + self._log_syncer.sync_now(force=True) + + def flush(self): + self._log_syncer.sync_now(force=True) + self._log_syncer.wait() class NoopLogger(Logger): @@ -103,17 +104,30 @@ def close(self): self.local_out.close() +def to_tf_values(result, path): + values = [] + for attr, value in result.items(): + if value is not None: + if type(value) in [int, float]: + values.append(tf.Summary.Value( + tag="/".join(path + [attr]), + simple_value=value)) + elif type(value) is dict: + values.extend(to_tf_values(value, path + [attr])) + return values + + class _TFLogger(Logger): def _init(self): self._file_writer = tf.summary.FileWriter(self.logdir) def on_result(self, result): - values = [] - for attr in Logger._attrs_to_log: - if getattr(result, attr) is not None: - values.append(tf.Summary.Value( - tag="ray/tune/{}".format(attr), - simple_value=getattr(result, attr))) + tmp = result._asdict() + for k in [ + "config", "pid", "timestamp", "time_total_s", + "timesteps_total"]: + del tmp[k] # not useful to tf log these + values = to_tf_values(tmp, ["ray", "tune"]) train_stats = tf.Summary(value=values) self._file_writer.add_summary(train_stats, result.timesteps_total) diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 365d3147e82f..5aad51cea570 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -85,6 +85,9 @@ # (Auto-filled) The hostname of the machine hosting the training process. "hostname", + # (Auto-filled) The node ip of the machine hosting the training process. + "node_ip", + # (Auto=filled) The current hyperparameter configuration. "config", ]) diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index cebaf3d3445b..2d06bc92e538 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -13,6 +13,7 @@ import time import uuid +import ray from ray.tune import TuneError from ray.tune.logger import UnifiedLogger from ray.tune.result import DEFAULT_RESULTS_DIR @@ -87,6 +88,7 @@ def __init__(self, config=None, registry=None, logger_creator=None): self._timesteps_total = 0 self._setup() self._initialize_ok = True + self._local_ip = ray.services.get_node_ip_address() def train(self): """Runs one logical iteration of training. @@ -136,6 +138,7 @@ def train(self): neg_mean_loss=neg_loss, pid=os.getpid(), hostname=os.uname()[1], + node_ip=self._local_ip, config=self.config) self._result_logger.on_result(result) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 9345621709bb..61b16e450204 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -252,6 +252,7 @@ def _try_recover(self, trial, error_msg): try: print("Attempting to recover trial state from last checkpoint") trial.stop(error=True, error_msg=error_msg, stop_logger=False) + trial.result_logger.flush() # make sure checkpoint is synced trial.start() self._running[trial.train_remote()] = trial except Exception: diff --git a/src/common/thirdparty/build-redis.sh b/src/common/thirdparty/build-redis.sh index 7d5f88957df8..ddce13513c78 100755 --- a/src/common/thirdparty/build-redis.sh +++ b/src/common/thirdparty/build-redis.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +set -x + # Cause the script to exit if a single command fails. set -e @@ -9,7 +11,8 @@ if [ ! -f redis/src/redis-server ]; then # relevant bit about redis/utils/whatisdoing.sh is that it is one of the last # files in the tarball. if [ ! -f redis/utils/whatisdoing.sh ]; then - mkdir -p "./redis" && wget -O- "https://github.com/antirez/redis/archive/$redis_vname.tar.gz" | tar xvz --strip-components=1 -C "./redis" + mkdir -p "./redis" + curl -sL "https://github.com/antirez/redis/archive/$redis_vname.tar.gz" | tar xz --strip-components=1 -C "./redis" fi cd redis make diff --git a/src/thirdparty/build_flatbuffers.sh b/src/thirdparty/build_flatbuffers.sh index 121cd9e8d73a..918d955e57ba 100755 --- a/src/thirdparty/build_flatbuffers.sh +++ b/src/thirdparty/build_flatbuffers.sh @@ -12,7 +12,7 @@ FLATBUFFERS_VERSION=1.7.1 # Download and compile flatbuffers if it isn't already present. if [ ! -d $TP_DIR/flatbuffers ]; then echo "building flatbuffers" - wget https://github.com/google/flatbuffers/archive/v$FLATBUFFERS_VERSION.tar.gz -O flatbuffers-$FLATBUFFERS_VERSION.tar.gz + curl -sL https://github.com/google/flatbuffers/archive/v$FLATBUFFERS_VERSION.tar.gz -o flatbuffers-$FLATBUFFERS_VERSION.tar.gz tar xf flatbuffers-$FLATBUFFERS_VERSION.tar.gz rm -rf flatbuffers-$FLATBUFFERS_VERSION.tar.gz diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 0165215a3e23..295969a5d22a 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -153,6 +153,13 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ # --stop '{"training_iteration": 2}' \ # --config '{"num_workers": 2, "use_lstm": false, "use_pytorch": true, "model": {"grayscale": true, "zero_mean": false, "dim": 80, "channel_major": true}}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env CartPole-v0 \ + --run DQN \ + --stop '{"training_iteration": 2}' \ + --config '{"num_workers": 2}' + docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \