diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc
index 880fe8467ea..5a377611e39 100644
--- a/cpp/src/arrow/python/inference.cc
+++ b/cpp/src/arrow/python/inference.cc
@@ -317,7 +317,7 @@ class TypeInferrer {
     if (obj == Py_None || internal::PyFloat_IsNaN(obj)) {
       ++none_count_;
-    } else if (PyBool_Check(obj)) {
+    } else if (internal::PyBoolScalar_Check(obj)) {
       ++bool_count_;
       *keep_going = make_unions_;
     } else if (internal::PyFloatScalar_Check(obj)) {
@@ -583,5 +583,20 @@ Status InferArrowTypeAndSize(PyObject* obj, int64_t* size,
   return Status::OK();
 }
 
+ARROW_EXPORT
+bool IsPyBool(PyObject* obj) {
+  return internal::PyBoolScalar_Check(obj);
+}
+
+ARROW_EXPORT
+bool IsPyInt(PyObject* obj) {
+  return internal::PyIntScalar_Check(obj);
+}
+
+ARROW_EXPORT
+bool IsPyFloat(PyObject* obj) {
+  return internal::PyFloatScalar_Check(obj);
+}
+
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/inference.h b/cpp/src/arrow/python/inference.h
index 21cddf40532..143e0f71fdb 100644
--- a/cpp/src/arrow/python/inference.h
+++ b/cpp/src/arrow/python/inference.h
@@ -40,6 +40,15 @@ class Status;
 
 namespace py {
 
+ARROW_EXPORT
+bool IsPyBool(PyObject* obj);
+
+ARROW_EXPORT
+bool IsPyInt(PyObject* obj);
+
+ARROW_EXPORT
+bool IsPyFloat(PyObject* obj);
+
 // These three functions take a sequence input, not arbitrary iterables
 ARROW_EXPORT
 arrow::Status InferArrowType(PyObject* obj, std::shared_ptr<DataType>* out_type);
diff --git a/cpp/src/arrow/python/numpy-internal.h b/cpp/src/arrow/python/numpy-internal.h
index b214a500edf..4c02894fd62 100644
--- a/cpp/src/arrow/python/numpy-internal.h
+++ b/cpp/src/arrow/python/numpy-internal.h
@@ -165,6 +165,10 @@ inline bool PyIntScalar_Check(PyObject* obj) {
   return PyLong_Check(obj) || PyArray_IsScalar(obj, Integer);
 }
 
+inline bool PyBoolScalar_Check(PyObject* obj) {
+  return PyBool_Check(obj) || PyArray_IsScalar(obj, Bool);
+}
+
 }  // namespace internal
 }  // namespace py
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index f1d0eec3f8d..6606c64878c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -149,7 +149,8 @@ def _isfilestore(self):
     def read_parquet(self, path, columns=None, metadata=None, schema=None,
                      use_threads=True, nthreads=None,
-                     use_pandas_metadata=False):
+                     use_pandas_metadata=False, filters=None,
+                     exact_filter_evaluation=True):
         """
         Read Parquet data from path in file system. Can read from a single file
         or a directory of files
 
@@ -179,9 +180,10 @@ def read_parquet(self, path, columns=None, metadata=None, schema=None,
         from pyarrow.util import _deprecate_nthreads
         use_threads = _deprecate_nthreads(use_threads, nthreads)
         dataset = ParquetDataset(path, schema=schema, metadata=metadata,
-                                 filesystem=self)
+                                 filesystem=self, filters=filters)
         return dataset.read(columns=columns, use_threads=use_threads,
-                            use_pandas_metadata=use_pandas_metadata)
+                            use_pandas_metadata=use_pandas_metadata,
+                            exact_filter_evaluation=exact_filter_evaluation)
 
     def open(self, path, mode='rb'):
         """
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 370e14dc8c1..f75afbd7547 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1064,6 +1064,12 @@ cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
     void set_numpy_nan(object o)
 
 
+cdef extern from 'arrow/python/inference.h' namespace 'arrow::py':
+    c_bool IsPyBool(object o)
+    c_bool IsPyInt(object o)
+    c_bool IsPyFloat(object o)
+
+
 cdef extern from 'arrow/python/benchmark.h' namespace 'arrow::py::benchmark':
     void Benchmark_PandasObjectIsNull(object lst) except *
 
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 9fa97b4e6e8..d73a0ee7b69 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -17,11 +17,13 @@
 from collections import defaultdict
 from concurrent import futures
-import os
-import json
-import re
+import datetime
+import json
+import os
+import re
 
 import numpy as np
+import six
 
 import pyarrow as pa
 import pyarrow.lib as lib
@@ -36,6 +38,253 @@
                                 _get_fs_from_path)
 from pyarrow.util import _is_path_like, _stringify_path, _deprecate_nthreads
 
+
+EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()
+
+
+def _timelike_to_arrow_encoding(value, pa_type):
+    # Date32 columns are encoded as days since 1970
+    if pa.types.is_date32(pa_type):
+        if isinstance(value, datetime.date):
+            return value.toordinal() - EPOCH_ORDINAL
+    elif pa.types.is_temporal(pa_type):
+        unit = pa_type.unit
+        if unit == "ns":
+            conversion_factor = 1
+        elif unit == "us":
+            conversion_factor = 10 ** 3
+        elif unit == "ms":
+            conversion_factor = 10 ** 6
+        elif unit == "s":
+            conversion_factor = 10 ** 9
+        else:
+            raise TypeError(
+                "Unknown timestamp resolution encountered `{}`".format(unit)
+            )
+        val = np.datetime64(value, 'ns').astype("int64")
+        val = int(val // conversion_factor)
+        return val
+    else:
+        return value
+
+
+def _normalize_value(value, pa_type):
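+    """
+    Cast ``value`` to the Python type matching the Arrow column type
+    ``pa_type`` so it can be compared against the Parquet statistics
+    (illustrative doctest):
+
+    >>> _normalize_value("2018-01-01", pa.date32())
+    datetime.date(2018, 1, 1)
+    """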
+    if pa.types.is_string(pa_type):
+        if isinstance(value, six.binary_type):
+            return value.decode("utf-8")
+        elif isinstance(value, six.text_type):
+            return value
+    elif pa.types.is_binary(pa_type):
+        if isinstance(value, six.binary_type):
+            return value
+        elif isinstance(value, six.text_type):
+            return six.text_type(value).encode("utf-8")
+    elif (
+        pa.types.is_integer(pa_type)
+        and pa.types.is_integer_object(value)
+        or pa.types.is_floating(pa_type)
+        and pa.types.is_float_object(value)
+        or pa.types.is_boolean(pa_type)
+        and pa.types.is_boolean_object(value)
+        or pa.types.is_timestamp(pa_type)
+        and not isinstance(value, (six.binary_type, six.text_type))
+        and isinstance(value, (datetime.datetime, np.datetime64))
+    ):
+        return value
+    elif pa.types.is_date(pa_type):
+        if isinstance(value, six.string_types):
+            return datetime.datetime.strptime(value, "%Y-%m-%d").date()
+        elif isinstance(value, six.binary_type):
+            value = value.decode("utf-8")
+            return datetime.datetime.strptime(value, "%Y-%m-%d").date()
+        elif isinstance(value, datetime.date) and not isinstance(
+            value, datetime.datetime
+        ):
+            return value
+    raise TypeError(
+        "Unexpected type for predicate. Expected `{} ({})` but got"
+        " `{} ({})`".format(
+            pa_type, pa_type.to_pandas_dtype(), value, type(value)
+        )
+    )
+
+
+def _highest_significant_position(num):
+    """
+    >>> _highest_significant_position(1.0)
+    1
+    >>> _highest_significant_position(9.0)
+    1
+    >>> _highest_significant_position(39.0)
+    2
+    >>> _highest_significant_position(0.1)
+    -1
+    >>> _highest_significant_position(0.9)
+    -1
+    >>> _highest_significant_position(0.000123)
+    -4
+    >>> _highest_significant_position(1234567.0)
+    7
+    >>> _highest_significant_position(-0.1)
+    -1
+    >>> _highest_significant_position(-100.0)
+    3
+    """
+    abs_num = np.absolute(num)
+    log_of_abs = np.log10(abs_num)
+    position = int(np.floor(log_of_abs))
+
+    # is position left of decimal point?
+    if abs_num >= 1.0:
+        position += 1
+
+    return position
+
+
+def _epsilon(num):
+    """
+    >>> _epsilon(123456)
+    1
+    >>> _epsilon(0.123456)
+    1e-06
+    >>> _epsilon(0.123)
+    1e-06
+    >>> _epsilon(0)
+    0
+    >>> _epsilon(-0.123456)
+    1e-06
+    >>> _epsilon(-123456)
+    1
+    """
+    SIGNIFICANT_DIGITS = 6
+
+    if num == 0:
+        return 0
+
+    epsilon_position = _highest_significant_position(num) - SIGNIFICANT_DIGITS
+
+    # is position right of decimal point?
+    if epsilon_position < 0:
+        epsilon_position += 1
+
+    return 10 ** epsilon_position
+
+
+def _predicate_accepts(predicate, row_meta, arrow_schema, parquet_reader):
+    """
+    Check whether a predicate can be true for any row of a RowGroup.
+
+    The predicate value is first cast to the type used for this column in
+    the statistics and then the relevant operator is applied. The check is
+    conservative, i.e. it returns True if the predicate could hold for any
+    possible row in the RowGroup. For the `==` predicate this means checking
+    whether the predicate value lies in the (min, max) range of the RowGroup.
+    """
+    col, op, val = predicate
+    col_idx = parquet_reader.column_name_idx(col)
+    pa_type = arrow_schema[col_idx].type
+    parquet_statistics = row_meta.column(col_idx).statistics
+    min_value = parquet_statistics.min
+    max_value = parquet_statistics.max
+
+    # Transform the predicate value to the respective type used in the
+    # statistics.
+    if pa.types.is_string(pa_type):
+        # String types are always UTF-8 encoded binary strings in parquet
+        min_value = min_value.decode("utf-8")
+        max_value = max_value.decode("utf-8")
+
+    # The statistics for floats only contain the 6 most significant digits.
+    # So a suitable epsilon has to be considered below min and above max.
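+    # (Illustrative example: a true minimum of 0.1234567 may be stored as
+    # 0.123457 in the statistics, so an equality predicate on 0.1234567
+    # would wrongly skip this RowGroup unless the range is widened by
+    # _epsilon().)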
+    if isinstance(val, float):
+        min_value -= _epsilon(min_value)
+        max_value += _epsilon(max_value)
+
+    if op == "==":
+        return (min_value <= val) and (val <= max_value)
+    elif op == "!=":
+        return not ((min_value >= val) and (val >= max_value))
+    elif op == "<=":
+        return min_value <= val
+    elif op == ">=":
+        return max_value >= val
+    elif op == "<":
+        return min_value < val
+    elif op == ">":
+        return max_value > val
+    elif op == "in":
+        return not ((max_value < val[0]) or (min_value > val[1]))
+    else:
+        raise NotImplementedError("op not supported")
+
+
+# TODO: replace with arrow-builtins
+def filter_df_from_predicates(df, predicates):
+    indexer = np.broadcast_to(False, len(df))
+    for conjunction in predicates:
+        inner_indexer = np.broadcast_to(True, len(df))
+        for column, op, value in conjunction:
+            # datetime.datetime is a subclass of datetime.date, so this
+            # branch also converts datetime values
+            if isinstance(value, datetime.date):
+                value = np.datetime64(value, 'ns')
+            elif (isinstance(value, list)
+                  and isinstance(value[0], datetime.date)):
+                value = [np.datetime64(val, 'ns') for val in value]
+            ser = df[column].values
+            if op == "==":
+                inner_indexer = (ser == value) & inner_indexer
+            elif op == "!=":
+                inner_indexer = (ser != value) & inner_indexer
+            elif op == "<=":
+                inner_indexer = (ser <= value) & inner_indexer
+            elif op == ">=":
+                inner_indexer = (ser >= value) & inner_indexer
+            elif op == "<":
+                inner_indexer = (ser < value) & inner_indexer
+            elif op == ">":
+                inner_indexer = (ser > value) & inner_indexer
+            elif op == "in":
+                inner_indexer = np.isin(ser, value) & inner_indexer
+            else:
+                raise NotImplementedError("op not supported")
+        indexer = inner_indexer | indexer
+    return df[indexer]
+
+
+def _check_contains_null(val):
+    if isinstance(val, six.binary_type):
+        for byte in val:
+            if isinstance(byte, six.binary_type):
+                compare_to = chr(0)
+            else:
+                compare_to = 0
+            if byte == compare_to:
+                return True
+    return False
+
+
+def check_filters(filters):
+    """
+    Check that the passed filters are well-formed and normalize them to the
+    nested list-of-lists (DNF) representation.
+    """
+    if filters is not None:
+        if len(filters) == 0 or any(len(f) == 0 for f in filters):
+            raise ValueError("Malformed filters")
+        if isinstance(filters[0][0], six.string_types):
+            # We were handed one nesting level too few:
+            # [(,,), ..] instead of [[(,,), ..]]
+            filters = [filters]
+        for conjunction in filters:
+            for col, op, val in conjunction:
+                if (
+                    isinstance(val, list)
+                    and all(_check_contains_null(v) for v in val)
+                    or _check_contains_null(val)
+                ):
+                    raise NotImplementedError(
+                        "Null-terminated binary strings are not supported "
+                        "as filter values."
+                    )
+    return filters
+
+
 # ----------------------------------------------------------------------
 # Reading a single Parquet file
 
@@ -119,7 +368,72 @@ def read_row_group(self, i, columns=None, nthreads=None,
         return self.reader.read_row_group(i, column_indices=column_indices,
                                           use_threads=use_threads)
 
-    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
+    def _normalize_filters(self, filters, for_pushdown):
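+        """
+        Cast the filter values to the Arrow types of their columns.
+
+        If ``for_pushdown`` is True, the values are additionally converted
+        to the physical encoding used in the Parquet statistics and the
+        value list of an ``in`` predicate is collapsed to its
+        ``(min, max)`` range.
+        """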
+        schema = self.schema.to_arrow_schema()
+
+        normalized_filters = []
+        for conjunction in filters:
+            new_conjunction = []
+            for literal in conjunction:
+                col, op, val = literal
+                col_idx = self.reader.column_name_idx(col)
+                pa_type = schema[col_idx].type
+                if op == "in":
+                    values = [_normalize_value(v, pa_type) for v in val]
+                    if for_pushdown:
+                        # In the case of predicate pushdown we only compare
+                        # ranges and their overlap. Therefore we only need
+                        # the minimum and maximum of the values.
+                        normalized_value = (
+                            _timelike_to_arrow_encoding(min(values), pa_type),
+                            _timelike_to_arrow_encoding(max(values), pa_type),
+                        )
+                    else:
+                        normalized_value = values
+                else:
+                    normalized_value = _normalize_value(val, pa_type)
+                    if for_pushdown:
+                        normalized_value = _timelike_to_arrow_encoding(
+                            normalized_value, pa_type
+                        )
+                new_literal = (col, op, normalized_value)
+                new_conjunction.append(new_literal)
+            normalized_filters.append(new_conjunction)
+        return normalized_filters
+
+    def _read_row_groups_into_tables(self, columns, predicates_in):
+        """
+        For each RowGroup, check whether the predicates in DNF can match it
+        and, if so, read the RowGroup.
+        """
+        arrow_schema = self.schema.to_arrow_schema()
+
+        def all_predicates_accept(row):
+            # Check if the predicates can evaluate to True on this RowGroup.
+            # As the predicate is in DNF, only a single one of the inner
+            # lists needs to match. Once we have found a positive match,
+            # there is no need to check whether the remaining ones apply.
+            row_meta = self.metadata.row_group(row)
+            for predicate_list in predicates_in:
+                if all(
+                    _predicate_accepts(predicate, row_meta, arrow_schema,
+                                       self.reader)
+                    for predicate in predicate_list
+                ):
+                    return True
+            return False
+
+        # Iterate over the RowGroups and evaluate the list of predicates on
+        # each one of them. Only read those that could contain a row
+        # matching the predicate.
+        result = []
+        for row in range(self.num_row_groups):
+            if all_predicates_accept(row):
+                row_group = self.read_row_group(row, columns=columns)
+                result.append(row_group)
+        return result
+
+    def read(self, columns=None, use_threads=True, use_pandas_metadata=False,
+             filters=None, exact_filter_evaluation=True):
         """
         Read a Table from Parquet format
 
@@ -134,6 +448,23 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
         use_pandas_metadata : boolean, default False
             If True and file has custom pandas schema metadata, ensure that
             index columns are also loaded
+        filters : list of list of tuple[str, str, Any]
+            Optional list of filters, like ``[[('x', '>', 0), ...]]``, that
+            are used to filter the resulting DataFrame, possibly using
+            predicate pushdown, if supported by the file format.
+
+            Predicates are expressed in disjunctive normal form (DNF). This
+            means that the innermost tuples each describe a single column
+            predicate. The inner predicates of one list are combined with a
+            conjunction (AND) into a larger predicate. The outermost list
+            then combines these predicates with a disjunction (OR). This
+            allows expressing any filter that can be written in boolean
+            logic.
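+
+            For example,
+            ``[[('x', '>', 0), ('x', '<', 10)], [('y', '==', 'a')]]``
+            selects all rows where ``x`` is between 0 and 10 (exclusive),
+            or where ``y`` equals ``'a'``.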
+        exact_filter_evaluation : bool, default True
+            Evaluate the passed filters exactly on the resulting table.
+            Setting this to False only evaluates the filters at the
+            RowGroup level, so the resulting table may still contain rows
+            that do not match the passed filters.
 
         Returns
         -------
@@ -142,8 +473,24 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
         """
         column_indices = self._get_column_indices(
             columns, use_pandas_metadata=use_pandas_metadata)
-        return self.reader.read_all(column_indices=column_indices,
-                                    use_threads=use_threads)
+        if filters is not None:
+            filters = check_filters(filters)
+            filters_for_pushdown = self._normalize_filters(filters, True)
+            filters = self._normalize_filters(filters, False)
+            tables = self._read_row_groups_into_tables(
+                columns, filters_for_pushdown)
+
+            if len(tables) == 0:
+                table = self.schema.to_arrow_schema().empty_table()
+            else:
+                table = pa.concat_tables(tables)
+
+            if exact_filter_evaluation:
+                df = filter_df_from_predicates(
+                    table.to_pandas(date_as_object=True), filters)
+                table = pa.Table.from_pandas(df)
+            return table
+        else:
+            result = self.reader.read_all(column_indices=column_indices,
+                                          use_threads=use_threads)
+            return result
 
     def scan_contents(self, columns=None, batch_size=65536):
         """
@@ -417,7 +764,8 @@ def _open(self, open_file_func=None):
         return reader
 
     def read(self, columns=None, use_threads=True, partitions=None,
-             open_file_func=None, file=None, use_pandas_metadata=False):
+             open_file_func=None, file=None, use_pandas_metadata=False,
+             filters=None, exact_filter_evaluation=True):
         """
         Read this piece as a pyarrow.Table
 
@@ -447,7 +795,9 @@ def read(self, columns=None, use_threads=True, partitions=None,
 
         options = dict(columns=columns,
                        use_threads=use_threads,
-                       use_pandas_metadata=use_pandas_metadata)
+                       use_pandas_metadata=use_pandas_metadata,
+                       filters=filters,
+                       exact_filter_evaluation=exact_filter_evaluation)
 
         if self.row_group is not None:
             table = reader.read_row_group(self.row_group, **options)
@@ -624,7 +974,7 @@ def filter_accepts_partition(self, part_key, filter, level):
         elif op == 'not in':
             return p_value not in f_value
         else:
-            raise ValueError("'%s' is not a valid operator in predicates.",
+            raise ValueError("'%s' is not a valid operator in filters." %
                              filter[1])
 
 
@@ -825,8 +1175,11 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
         if validate_schema:
             self.validate_schemas()
 
-        if filters:
+        if filters is not None:
+            filters = check_filters(filters)
             self._filter(filters)
+        else:
+            self.piece_filters = None
 
     def validate_schemas(self):
         open_file = self._get_open_file_func()
@@ -858,7 +1211,8 @@ def validate_schemas(self):
                                  .format(piece, file_schema,
                                          dataset_schema))
 
-    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
+    def read(self, columns=None, use_threads=True, use_pandas_metadata=False,
+             exact_filter_evaluation=True):
         """
         Read multiple Parquet files as a single pyarrow.Table
 
@@ -878,12 +1232,16 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
         """
         open_file = self._get_open_file_func()
 
+        # TODO: Evaluate filters
+
         tables = []
         for piece in self.pieces:
             table = piece.read(columns=columns, use_threads=use_threads,
                                partitions=self.partitions,
                                open_file_func=open_file,
-                               use_pandas_metadata=use_pandas_metadata)
+                               use_pandas_metadata=use_pandas_metadata,
+                               filters=self.piece_filters,
+                               exact_filter_evaluation=exact_filter_evaluation)
             tables.append(table)
 
         all_data = lib.concat_tables(tables)
@@ -933,16 +1291,24 @@ def open_file(path, meta=None):
         return open_file
 
     def _filter(self, filters):
-        accepts_filter = self.partitions.filter_accepts_partition
-
-        def one_filter_accepts(piece, filter):
-            return all(accepts_filter(part_key, filter, level)
-                       for level, part_key in enumerate(piece.partition_keys))
-
-        def all_filters_accept(piece):
-            return all(one_filter_accepts(piece, f) for f in filters)
-
-        self.pieces = [p for p in self.pieces if all_filters_accept(p)]
+        # TODO: Handle the case where filters apply partly to partitions
+        # and partly to pieces.
+        # No-op if there are no partitions
+        if self.partitions:
+            accepts_filter = self.partitions.filter_accepts_partition
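+            # ``filters`` is in DNF: a piece is kept if every predicate of
+            # at least one conjunction accepts its partition keys.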
+
+            def one_filter_accepts(piece, filter):
+                return all(accepts_filter(part_key, filter, level)
+                           for level, part_key in
+                           enumerate(piece.partition_keys))
+
+            def all_filters_accept(piece):
+                return any(all(one_filter_accepts(piece, f)
+                               for f in conjunction)
+                           for conjunction in filters)
+
+            self.pieces = [p for p in self.pieces if all_filters_accept(p)]
+            # FIXME: filters are not pushed down to the pieces yet when the
+            # dataset is partitioned.
+            self.piece_filters = None
+        else:
+            self.piece_filters = filters
 
 
 def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
@@ -1007,17 +1373,22 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
 
 
 def read_table(source, columns=None, use_threads=True, metadata=None,
-               use_pandas_metadata=False, nthreads=None):
+               use_pandas_metadata=False, nthreads=None,
+               filters=None, exact_filter_evaluation=True):
     use_threads = _deprecate_nthreads(use_threads, nthreads)
     if _is_path_like(source):
        fs = _get_fs_from_path(source)
        return fs.read_parquet(source, columns=columns,
                               use_threads=use_threads, metadata=metadata,
-                              use_pandas_metadata=use_pandas_metadata)
+                              use_pandas_metadata=use_pandas_metadata,
+                              filters=filters,
+                              exact_filter_evaluation=exact_filter_evaluation)
     pf = ParquetFile(source, metadata=metadata)
     return pf.read(columns=columns, use_threads=use_threads,
-                   use_pandas_metadata=use_pandas_metadata)
+                   use_pandas_metadata=use_pandas_metadata,
+                   filters=filters,
+                   exact_filter_evaluation=exact_filter_evaluation)
 
 
 read_table.__doc__ = _read_table_docstring.format(
@@ -1030,10 +1401,13 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
 
 
 def read_pandas(source, columns=None, use_threads=True,
-                nthreads=None, metadata=None):
+                nthreads=None, metadata=None,
+                filters=None, exact_filter_evaluation=True):
     return read_table(source, columns=columns,
                       use_threads=use_threads,
-                      metadata=metadata, use_pandas_metadata=True)
+                      metadata=metadata, use_pandas_metadata=True,
+                      filters=filters,
+                      exact_filter_evaluation=exact_filter_evaluation)
 
 
 read_pandas.__doc__ = _read_table_docstring.format(
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index e7970bb0c40..97a1d20665d 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from functools import partial
+
 import datetime
 import decimal
 import io
@@ -2146,3 +2148,135 @@ def test_parquet_writer_context_obj_with_exception(tempdir):
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
+
+
+@pytest.mark.parametrize(
+    "df",
+    [
+        pd.DataFrame({"string": ["abc", "affe", "banane", "buchstabe"]}),
+        pd.DataFrame({"integer": np.arange(4)}),
+        pd.DataFrame({"float": [-3.141591, 0.0, 3.141593, 3.141595]}),
+        pd.DataFrame(
+            {
+                "date": [
+                    datetime.date(2011, 1, 31),
+                    datetime.date(2011, 2, 3),
+                    datetime.date(2011, 2, 4),
+                    datetime.date(2011, 3, 10),
+                ]
+            }
+        ),
+    ],
+)
+@pytest.mark.parametrize('use_post_filter', [True, False])
+def test_filter_pushdown(df, use_post_filter, tmpdir):
+    """
+    Test predicate pushdown for several types and operations.
+
+    The DataFrame parameters all need to be of the same length for this test
+    to work universally. Also the values in the DataFrames need to be sorted
+    in ascending order.
+    """
+    import pyarrow.parquet as pq
+
+    if not use_post_filter and pd.api.types.is_float_dtype(df.dtypes[0]):
+        pytest.skip("Floating point statistics are too fuzzy to filter "
+                    "exactly, even with single-row RowGroups")
+    if use_post_filter and isinstance(df.iloc[0, 0], datetime.date):
+        pytest.skip("Exact filtering on date is not supported")
+
+    if use_post_filter:
+        chunk_size = 2
+        read_func = pq.read_pandas
+    else:
+        # Don't use the post filter, but write out Parquet files with a
+        # single row per RowGroup to test the RowGroup-level filters.
+        chunk_size = 1
+        read_func = partial(pq.read_pandas, exact_filter_evaluation=False)
+
+    filename = tmpdir / "testfile.parquet"
+    table = pa.Table.from_pandas(df)
+    pq.write_table(table, filename, chunk_size=chunk_size)
+
+    # Test `<` and `>` operators
+    expected = df.iloc[[1, 2], :].copy()
+    filters = [
+        [(df.columns[0], "<", df.iloc[3, 0]),
+         (df.columns[0], ">", df.iloc[0, 0])]
+    ]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test `<=` and `>=` operators
+    expected = df.iloc[[1, 2, 3], :].copy()
+    filters = [
+        [(df.columns[0], "<=", df.iloc[3, 0]),
+         (df.columns[0], ">=", df.iloc[1, 0])]
+    ]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test `==` operator
+    expected = df.iloc[[1], :].copy()
+    filters = [[(df.columns[0], "==", df.iloc[1, 0])]]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test `in` operator
+    expected = df.iloc[[1], :].copy()
+    filters = [[(df.columns[0], "in", [df.iloc[1, 0]])]]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test `!=` operator
+    expected = df.iloc[[0, 2, 3], :].copy()
+    filters = [[(df.columns[0], "!=", df.iloc[1, 0])]]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test a filter that matches no rows
+    expected = df.iloc[[], :].copy()
+    filters = [[(df.columns[0], "<", df.iloc[0, 0])]]
+    result = read_func(filename, filters=filters).to_pandas(date_as_object=True)
+    tm.assert_frame_equal(result, expected)
+
+    # Test malformed filters 1: empty list of conjunctions
+    filters = []
+    with pytest.raises(ValueError) as exc:
+        read_func(filename, filters=filters)
+    assert str(exc.value) == "Malformed filters"
+
+    # Test malformed filters 2: empty conjunction
+    filters = [[]]
+    with pytest.raises(ValueError) as exc:
+        read_func(filename, filters=filters)
+    assert str(exc.value) == "Malformed filters"
+
+    # Test malformed filters 3: empty conjunction next to a valid one
+    filters = [[(df.columns[0], "<", df.iloc[0, 0])], []]
+    with pytest.raises(ValueError) as exc:
+        read_func(filename, filters=filters)
+    assert str(exc.value) == "Malformed filters"
+
+
+def test_predicate_float_equal_big(tmpdir):
+    import pyarrow.parquet as pq
+
+    df = pd.DataFrame({"float": [3141590.0, 3141592.0, 3141594.0]})
+    pq.write_table(pa.Table.from_pandas(df), tmpdir / "testfile.parquet")
+
+    filters = [[("float", "==", 3141592.0)]]
+    result_df = pq.read_pandas(tmpdir / "testfile.parquet",
+                               filters=filters).to_pandas()
+    expected_df = df.iloc[[1], :].copy()
+
+    tm.assert_frame_equal(result_df, expected_df)
+
+
+def test_predicate_float_equal_small(tmpdir):
+    import pyarrow.parquet as pq
+
+    df = pd.DataFrame({"float": [0.3141590, 0.3141592, 0.3141594]})
+    pq.write_table(pa.Table.from_pandas(df), tmpdir / "testfile.parquet")
+
+    filters = [[("float", "==", 0.3141592)]]
+    result_df = pq.read_pandas(tmpdir / "testfile.parquet",
+                               filters=filters).to_pandas()
+    expected_df = df.iloc[[1], :].copy()
+
+    tm.assert_frame_equal(result_df, expected_df)
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 98c0dda5a45..7ad30e11708 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -1474,3 +1474,15 @@ def from_numpy_dtype(object dtype):
     check_status(NumPyDtypeToArrow(dtype, &c_type))
 
     return pyarrow_wrap_data_type(c_type)
+
+
+def is_boolean_object(object obj):
+    return IsPyBool(obj)
+
+
+def is_integer_object(object obj):
+    return IsPyInt(obj)
+
+
+def is_float_object(object obj):
+    return IsPyFloat(obj)
diff --git a/python/pyarrow/types.py b/python/pyarrow/types.py
index 6ce438657f5..c658934dad1 100644
--- a/python/pyarrow/types.py
+++ b/python/pyarrow/types.py
@@ -17,6 +17,8 @@
 
 # Tools for dealing with Arrow type metadata in Python
 
+from pyarrow.lib import is_integer_object, is_float_object, is_boolean_object
+
 import pyarrow.lib as lib