diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 556faa74539..231588ab074 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -623,7 +623,8 @@ cdef class ParquetReadOptions:
     buffer_size : int, default 8192
         Size of buffered stream, if enabled. Default is 8KB.
     dictionary_columns : list of string, default None
-        Names of columns which should be read as dictionaries.
+        Names of columns which should be dictionary encoded as
+        they are read.
     """

     cdef public:
@@ -632,9 +633,11 @@ cdef class ParquetReadOptions:
         set dictionary_columns

     def __init__(self, bint use_buffered_stream=False,
-                 uint32_t buffer_size=8192,
+                 buffer_size=8192,
                  dictionary_columns=None):
         self.use_buffered_stream = use_buffered_stream
+        if buffer_size <= 0:
+            raise ValueError("Buffer size must be larger than zero")
         self.buffer_size = buffer_size
         self.dictionary_columns = set(dictionary_columns or set())

@@ -1191,7 +1194,8 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                 c_options
             )
         else:
-            raise TypeError('Must pass either paths or a FileSelector')
+            raise TypeError('Must pass either paths or a FileSelector, but '
+                            'passed {}'.format(type(paths_or_selector)))

         self.init(GetResultValue(result))

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 656321646a3..8586366d940 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -77,7 +77,7 @@ def _check_contains_null(val):
     return False


-def _check_filters(filters):
+def _check_filters(filters, check_null_strings=True):
     """
     Check if filters are well-formed.
     """
@@ -89,19 +89,93 @@ def _check_filters(filters):
         # too few:
         #   We have [(,,), ..] instead of [[(,,), ..]]
         filters = [filters]
-    for conjunction in filters:
-        for col, op, val in conjunction:
-            if (
-                isinstance(val, list)
-                and all(_check_contains_null(v) for v in val)
-                or _check_contains_null(val)
-            ):
-                raise NotImplementedError(
-                    "Null-terminated binary strings are not supported as"
-                    " filter values."
-                )
+    if check_null_strings:
+        for conjunction in filters:
+            for col, op, val in conjunction:
+                if (
+                    isinstance(val, list)
+                    and all(_check_contains_null(v) for v in val)
+                    or _check_contains_null(val)
+                ):
+                    raise NotImplementedError(
+                        "Null-terminated binary strings are not supported "
+                        "as filter values."
+                    )
    return filters


+_DNF_filter_doc = """Predicates are expressed in disjunctive normal form (DNF), like
+    ``[[('x', '=', 0), ...], ...]``. DNF allows arbitrary boolean logical
+    combinations of single column predicates. The innermost tuples each
+    describe a single column predicate. The list of inner predicates is
+    interpreted as a conjunction (AND), forming a more selective,
+    multi-column predicate. Finally, the outermost list combines these
+    filters as a disjunction (OR).
+
+    Predicates may also be passed as List[Tuple]. This form is interpreted
+    as a single conjunction. To express OR in predicates, one must
+    use the (preferred) List[List[Tuple]] notation."""
+
+
+def _filters_to_expression(filters):
+    """
+    Check if filters are well-formed and convert them to an ``Expression``.
+
+    See _DNF_filter_doc above for more details.
+    """
+    import pyarrow.dataset as ds
+
+    if isinstance(filters, ds.Expression):
+        return filters
+
+    filters = _check_filters(filters, check_null_strings=False)
+
+    def convert_single_predicate(col, op, val):
+        field = ds.field(col)
+
+        if op == "=" or op == "==":
+            return field == val
+        elif op == "!=":
+            return field != val
+        elif op == '<':
+            return field < val
+        elif op == '>':
+            return field > val
+        elif op == '<=':
+            return field <= val
+        elif op == '>=':
+            return field >= val
+        elif op == 'in':
+            return field.isin(val)
+        elif op == 'not in':
+            return ~field.isin(val)
+        else:
+            raise ValueError(
+                '"{0}" is not a valid operator in predicates.'.format(
+                    (col, op, val)))
+
+    or_exprs = []
+
+    for conjunction in filters:
+        and_exprs = []
+        for col, op, val in conjunction:
+            and_exprs.append(convert_single_predicate(col, op, val))
+
+        expr = and_exprs[0]
+        if len(and_exprs) > 1:
+            for and_expr in and_exprs[1:]:
+                expr = ds.AndExpression(expr, and_expr)
+
+        or_exprs.append(expr)
+
+    expr = or_exprs[0]
+    if len(or_exprs) > 1:
+        for or_expr in or_exprs[1:]:
+            expr = ds.OrExpression(expr, or_expr)
+
+    return expr
+
+
 # ----------------------------------------------------------------------
 # Reading a single Parquet file

@@ -979,7 +1054,18 @@ def _open_dataset_file(dataset, path, meta=None):
     improve performance in some environments.
 buffer_size : int, default 0
     If positive, perform read buffering when deserializing individual
-    column chunks. Otherwise IO calls are unbuffered."""
+    column chunks. Otherwise IO calls are unbuffered.
+partitioning : Partitioning or str or list of str, default "hive"
+    The partitioning scheme for a partitioned dataset. The default of "hive"
+    assumes directory names with key=value pairs like "/year=2009/month=11".
+    In addition, a scheme like "/2009/11" is also supported, in which case
+    you need to specify the field names or a full schema. See the
+    ``pyarrow.dataset.partitioning()`` function for more details.
+use_legacy_dataset : bool, default True
+    Set to False to enable the new code path (experimental, using the
+    new Arrow Dataset API). Among other things, this allows passing
+    `filters` for all columns, not only the partition keys, enables
+    different partitioning schemes, etc."""


 class ParquetDataset:
@@ -1005,31 +1091,51 @@ class ParquetDataset:
     validate_schema : bool, default True
         Check that individual file schemas are all the same / compatible.
     filters : List[Tuple] or List[List[Tuple]] or None (default)
-        List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
-        implements partition-level (hive) filtering only, i.e., to prevent the
-        loading of some files of the dataset.
-
-        Predicates are expressed in disjunctive normal form (DNF). This means
-        that the innermost tuple describe a single column predicate. These
-        inner predicate make are all combined with a conjunction (AND) into a
-        larger predicate. The most outer list then combines all filters
-        with a disjunction (OR). By this, we should be able to express all
-        kinds of filters that are possible using boolean logic.
-
-        This function also supports passing in as List[Tuple]. These predicates
-        are evaluated as a conjunction. To express OR in predicates, one must
-        use the (preferred) List[List[Tuple]] notation.
+        Rows which do not match the filter predicate will be removed from
+        scanned data. Partition keys embedded in a nested directory structure
+        will be exploited to avoid loading files at all if they contain no
+        matching rows.
+        If `use_legacy_dataset` is True, filters can only reference partition
+        keys and only a hive-style directory structure is supported. When
+        setting `use_legacy_dataset` to False, within-file filtering and
+        different partitioning schemes are also supported.
+
+        {1}
     metadata_nthreads: int, default 1
         How many threads to allow the thread pool which is used to read the
         dataset metadata. Increasing this is helpful to read partitioned
         datasets.
-{}
-""".format(_read_docstring_common)
+{0}
+""".format(_read_docstring_common, _DNF_filter_doc)
+
+    def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
+                metadata=None, split_row_groups=False, validate_schema=True,
+                filters=None, metadata_nthreads=1, read_dictionary=None,
+                memory_map=False, buffer_size=0, partitioning="hive",
+                use_legacy_dataset=True):
+        if not use_legacy_dataset:
+            return _ParquetDatasetV2(path_or_paths, filesystem=filesystem,
+                                     filters=filters,
+                                     partitioning=partitioning,
+                                     read_dictionary=read_dictionary,
+                                     memory_map=memory_map,
+                                     buffer_size=buffer_size,
+                                     # unsupported keywords
+                                     schema=schema, metadata=metadata,
+                                     split_row_groups=split_row_groups,
+                                     validate_schema=validate_schema,
+                                     metadata_nthreads=metadata_nthreads)
+        self = object.__new__(cls)
+        return self

     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
                  filters=None, metadata_nthreads=1, read_dictionary=None,
-                 memory_map=False, buffer_size=0):
+                 memory_map=False, buffer_size=0, partitioning="hive",
+                 use_legacy_dataset=True):
+        if partitioning != "hive":
+            raise ValueError(
+                'Only "hive" for hive-like partitioning is supported when '
+                'using use_legacy_dataset=True')
         self._metadata = _ParquetDatasetMetadata()
         a_path = path_or_paths
         if isinstance(a_path, list):
@@ -1252,6 +1358,112 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,

     return pieces, partitions, common_metadata_path, metadata_path


+class _ParquetDatasetV2:
+    """
+    ParquetDataset shim using the Dataset API under the hood.
+    """
+    def __init__(self, path_or_paths, filesystem=None, filters=None,
+                 partitioning="hive", read_dictionary=None, buffer_size=None,
+                 memory_map=False, **kwargs):
+        import pyarrow.dataset as ds
+        import pyarrow.fs
+
+        # Raise an error for unsupported keywords
+        for keyword, default in [
+                ("schema", None), ("metadata", None),
+                ("split_row_groups", False), ("validate_schema", True),
+                ("metadata_nthreads", 1)]:
+            if keyword in kwargs and kwargs[keyword] is not default:
+                raise ValueError(
+                    "Keyword '{0}' is not yet supported with the new "
+                    "Dataset API".format(keyword))
+
+        # map old filesystems to the new one
+        # TODO(dataset) deal with other file systems
+        if isinstance(filesystem, LocalFileSystem):
+            filesystem = pyarrow.fs.LocalFileSystem(use_mmap=memory_map)
+        elif filesystem is None and memory_map:
+            # if memory_map is specified, assume local file system (string
+            # path can in principle be a URI for any filesystem)
+            filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
+
+        # map additional arguments
+        read_options = {}
+        if buffer_size:
+            read_options.update(use_buffered_stream=True,
+                                buffer_size=buffer_size)
+        if read_dictionary is not None:
+            read_options.update(dictionary_columns=read_dictionary)
+        parquet_format = ds.ParquetFileFormat(read_options=read_options)
+
+        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
+                                   format=parquet_format,
+                                   partitioning=partitioning)
+        self._filters = filters
+        if filters is not None:
+            self._filter_expression = _filters_to_expression(filters)
+        else:
+            self._filter_expression = None
+
+    @property
+    def schema(self):
+        return self._dataset.schema
+
+    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
+        """
+        Read (multiple) Parquet files as a single pyarrow.Table.
+
+        Parameters
+        ----------
+        columns : List[str]
+            Names of columns to read from the dataset.
+        use_threads : bool, default True
+            Perform multi-threaded column reads.
+        use_pandas_metadata : bool, default False
+            If True and file has custom pandas schema metadata, ensure that
+            index columns are also loaded.
+
+        Returns
+        -------
+        pyarrow.Table
+            Content of the file as a table (of columns).
+        """
+        # if use_pandas_metadata, we need to include index columns in the
+        # column selection, to be able to restore those in the pandas DataFrame
+        metadata = self._dataset.schema.metadata
+        if columns is not None and use_pandas_metadata:
+            if metadata and b'pandas' in metadata:
+                index_columns = set(_get_pandas_index_columns(metadata))
+                columns = columns + list(index_columns - set(columns))
+
+        table = self._dataset.to_table(
+            columns=columns, filter=self._filter_expression,
+            use_threads=use_threads
+        )
+
+        # if use_pandas_metadata, restore the pandas metadata (which gets
+        # lost if doing a specific `columns` selection in to_table)
+        if use_pandas_metadata:
+            if metadata and b"pandas" in metadata:
+                new_metadata = table.schema.metadata or {}
+                new_metadata.update({b"pandas": metadata[b"pandas"]})
+                table = table.replace_schema_metadata(new_metadata)
+
+        return table
+
+    def read_pandas(self, **kwargs):
+        """
+        Read dataset including pandas metadata, if any. Other arguments passed
+        through to ParquetDataset.read, see docstring for further details.
+        """
+        return self.read(use_pandas_metadata=True, **kwargs)
+
+    @property
+    def pieces(self):
+        # TODO raise deprecation warning
+        return list(self._dataset.get_fragments())
+
+
 _read_table_docstring = """
 {0}

@@ -1271,10 +1483,15 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
     If separately computed
 {1}
 filters : List[Tuple] or List[List[Tuple]] or None (default)
-    List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
-    implements partition-level (hive) filtering only, i.e., to prevent the
-    loading of some files of the dataset if `source` is a directory.
-    See the docstring of ParquetDataset for more details.
+    Rows which do not match the filter predicate will be removed from scanned
+    data. Partition keys embedded in a nested directory structure will be
+    exploited to avoid loading files at all if they contain no matching rows.
+    If `use_legacy_dataset` is True, filters can only reference partition
+    keys and only a hive-style directory structure is supported. When
+    setting `use_legacy_dataset` to False, within-file filtering and
+    different partitioning schemes are also supported.
+
+    {3}

 Returns
 -------
@@ -1285,12 +1502,32 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
 def read_table(source, columns=None, use_threads=True, metadata=None,
                use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
-               buffer_size=0):
+               buffer_size=0, partitioning="hive", use_legacy_dataset=True):
+    if not use_legacy_dataset:
+        if not _is_path_like(source):
+            raise ValueError("File-like objects are not yet supported with "
+                             "the new Dataset API")
+
+        dataset = _ParquetDatasetV2(
+            source,
+            filesystem=filesystem,
+            partitioning=partitioning,
+            memory_map=memory_map,
+            read_dictionary=read_dictionary,
+            buffer_size=buffer_size,
+            filters=filters,
+            # unsupported keywords
+            metadata=metadata
+        )
+        return dataset.read(columns=columns, use_threads=use_threads,
+                            use_pandas_metadata=use_pandas_metadata)
+
     if _is_path_like(source):
         pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map,
                             read_dictionary=read_dictionary,
                             buffer_size=buffer_size,
-                            filesystem=filesystem, filters=filters)
+                            filesystem=filesystem, filters=filters,
+                            partitioning=partitioning)
     else:
         pf = ParquetFile(source, metadata=metadata,
                          read_dictionary=read_dictionary,
@@ -1307,11 +1544,13 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
     If True and file has custom pandas schema metadata, ensure that
     index columns are also loaded""")),
                                        """pyarrow.Table
-    Content of the file as a table (of columns)""")
+    Content of the file as a table (of columns)""",
+                                       _DNF_filter_doc)


 def read_pandas(source, columns=None, use_threads=True, memory_map=False,
-                metadata=None, filters=None, buffer_size=0):
+                metadata=None, filters=None, buffer_size=0,
+                use_legacy_dataset=True):
     return read_table(
         source,
         columns=columns,
@@ -1321,6 +1560,7 @@ def read_pandas(source, columns=None, use_threads=True, memory_map=False,
         memory_map=memory_map,
         buffer_size=buffer_size,
         use_pandas_metadata=True,
+        use_legacy_dataset=use_legacy_dataset,
     )


@@ -1330,7 +1570,8 @@ def read_pandas(source, columns=None, use_threads=True, memory_map=False,
     _read_docstring_common,
     """pyarrow.Table
     Content of the file as a Table of Columns, including DataFrame
-    indexes as columns""")
+    indexes as columns""",
+    _DNF_filter_doc)


 def write_table(table, where, row_group_size=None, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py index 7e900f0af8c..89131b428da 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -56,6 +56,21 @@ def datadir(datadir): return datadir / 'parquet' +parametrize_legacy_dataset = pytest.mark.parametrize( + "use_legacy_dataset", + [True, pytest.param(False, marks=pytest.mark.dataset)]) +parametrize_legacy_dataset_not_supported = pytest.mark.parametrize( + "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)]) +parametrize_legacy_dataset_skip_buffer = pytest.mark.parametrize( + "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)]) + + +def deterministic_row_order(use_legacy_dataset, chunk_size=None): + # TODO(datasets) ensure to use use_threads=False with the new dataset API + # in the tests because otherwise the row order is not deterministic + return False if not use_legacy_dataset and chunk_size is not None else True + + def _write_table(table, path, **kwargs): # So we see the ImportError somewhere import pyarrow.parquet as pq @@ -72,18 +87,26 @@ def _read_table(*args, **kwargs): def _roundtrip_table(table, read_table_kwargs=None, - write_table_kwargs=None): + write_table_kwargs=None, use_legacy_dataset=True): read_table_kwargs = read_table_kwargs or {} write_table_kwargs = write_table_kwargs or {} - buf = io.BytesIO() - _write_table(table, buf, **write_table_kwargs) - buf.seek(0) - return _read_table(buf, **read_table_kwargs) + if use_legacy_dataset: + buf = io.BytesIO() + _write_table(table, buf, **write_table_kwargs) + buf.seek(0) + return _read_table(buf, **read_table_kwargs) + else: + from pyarrow.fs import _MockFileSystem + mockfs = _MockFileSystem() + with mockfs.open_output_stream("test") as out: + _write_table(table, out, **write_table_kwargs) + return _read_table("test", filesystem=mockfs, use_legacy_dataset=False, + **read_table_kwargs) def _check_roundtrip(table, expected=None, read_table_kwargs=None, - **write_table_kwargs): + use_legacy_dataset=True, **write_table_kwargs): if expected is None: expected = table @@ -91,31 +114,31 @@ def _check_roundtrip(table, expected=None, read_table_kwargs=None, # intentionally check twice result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs, - write_table_kwargs=write_table_kwargs) + write_table_kwargs=write_table_kwargs, + use_legacy_dataset=use_legacy_dataset) assert result.equals(expected) result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs, - write_table_kwargs=write_table_kwargs) + write_table_kwargs=write_table_kwargs, + use_legacy_dataset=use_legacy_dataset) assert result.equals(expected) -def _roundtrip_pandas_dataframe(df, write_kwargs): +def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True): table = pa.Table.from_pandas(df) - - buf = io.BytesIO() - _write_table(table, buf, **write_kwargs) - - buf.seek(0) - table1 = _read_table(buf) - return table1.to_pandas() + result = _roundtrip_table( + table, write_table_kwargs=write_kwargs, + use_legacy_dataset=use_legacy_dataset) + return result.to_pandas() +@parametrize_legacy_dataset @pytest.mark.parametrize('dtype', [int, float]) -def test_single_pylist_column_roundtrip(tempdir, dtype): +def test_single_pylist_column_roundtrip(tempdir, dtype, use_legacy_dataset): filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__) data = [pa.array(list(map(dtype, range(5))))] table = pa.Table.from_arrays(data, names=['a']) _write_table(table, filename) - table_read = _read_table(filename) + table_read = 
_read_table(filename, use_legacy_dataset=use_legacy_dataset) for i in range(table.num_columns): col_written = table[i] col_read = table_read[i] @@ -156,8 +179,9 @@ def alltypes_sample(size=10000, seed=0, categorical=False): @pytest.mark.pandas +@parametrize_legacy_dataset @pytest.mark.parametrize('chunk_size', [None, 1000]) -def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size): +def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size, use_legacy_dataset): df = alltypes_sample(size=10000, categorical=True) filename = tempdir / 'pandas_roundtrip.parquet' @@ -166,10 +190,17 @@ def test_pandas_parquet_2_0_roundtrip(tempdir, chunk_size): _write_table(arrow_table, filename, version="2.0", coerce_timestamps='ms', chunk_size=chunk_size) - table_read = pq.read_pandas(filename) + use_threads = deterministic_row_order(use_legacy_dataset, chunk_size) + table_read = pq.read_pandas( + filename, use_legacy_dataset=use_legacy_dataset, + use_threads=use_threads) assert table_read.schema.pandas_metadata is not None - assert arrow_table.schema.metadata == table_read.schema.metadata + read_metadata = table_read.schema.metadata + if not use_legacy_dataset: + read_metadata.pop(b"ARROW:schema") + + assert arrow_table.schema.metadata == read_metadata df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @@ -181,74 +212,85 @@ def test_parquet_invalid_version(tempdir): _write_table(table, tempdir / 'test_version.parquet', version="2.2") -def test_set_data_page_size(): +@parametrize_legacy_dataset +def test_set_data_page_size(use_legacy_dataset): arr = pa.array([1, 2, 3] * 100000) t = pa.Table.from_arrays([arr], names=['f0']) # 128K, 512K page_sizes = [2 << 16, 2 << 18] for target_page_size in page_sizes: - _check_roundtrip(t, data_page_size=target_page_size) + _check_roundtrip(t, data_page_size=target_page_size, + use_legacy_dataset=use_legacy_dataset) @pytest.mark.pandas -def test_chunked_table_write(): +@parametrize_legacy_dataset +def test_chunked_table_write(use_legacy_dataset): # ARROW-232 df = alltypes_sample(size=10) batch = pa.RecordBatch.from_pandas(df) table = pa.Table.from_batches([batch] * 3) - _check_roundtrip(table, version='2.0') + _check_roundtrip( + table, version='2.0', use_legacy_dataset=use_legacy_dataset) df, _ = dataframe_with_lists() batch = pa.RecordBatch.from_pandas(df) table = pa.Table.from_batches([batch] * 3) - _check_roundtrip(table, version='2.0') + _check_roundtrip( + table, version='2.0', use_legacy_dataset=use_legacy_dataset) @pytest.mark.pandas -def test_memory_map(tempdir): +@parametrize_legacy_dataset +def test_memory_map(tempdir, use_legacy_dataset): df = alltypes_sample(size=10) table = pa.Table.from_pandas(df) _check_roundtrip(table, read_table_kwargs={'memory_map': True}, - version='2.0') + version='2.0', use_legacy_dataset=use_legacy_dataset) filename = str(tempdir / 'tmp_file') with open(filename, 'wb') as f: _write_table(table, f, version='2.0') - table_read = pq.read_pandas(filename, memory_map=True) + table_read = pq.read_pandas(filename, memory_map=True, + use_legacy_dataset=use_legacy_dataset) assert table_read.equals(table) @pytest.mark.pandas -def test_enable_buffered_stream(tempdir): +@parametrize_legacy_dataset +def test_enable_buffered_stream(tempdir, use_legacy_dataset): df = alltypes_sample(size=10) table = pa.Table.from_pandas(df) _check_roundtrip(table, read_table_kwargs={'buffer_size': 1025}, - version='2.0') + version='2.0', use_legacy_dataset=use_legacy_dataset) filename = str(tempdir / 'tmp_file') with open(filename, 'wb') as f: 
_write_table(table, f, version='2.0') - table_read = pq.read_pandas(filename, buffer_size=4096) + table_read = pq.read_pandas(filename, buffer_size=4096, + use_legacy_dataset=use_legacy_dataset) assert table_read.equals(table) -def test_special_chars_filename(tempdir): +@parametrize_legacy_dataset +def test_special_chars_filename(tempdir, use_legacy_dataset): table = pa.Table.from_arrays([pa.array([42])], ["ints"]) filename = "foo # bar" path = tempdir / filename assert not path.exists() _write_table(table, str(path)) assert path.exists() - table_read = _read_table(str(path)) + table_read = _read_table(str(path), use_legacy_dataset=use_legacy_dataset) assert table_read.equals(table) @pytest.mark.pandas -def test_empty_table_roundtrip(): +@parametrize_legacy_dataset +def test_empty_table_roundtrip(use_legacy_dataset): df = alltypes_sample(size=10) # Create a non-empty table to infer the types correctly, then slice to 0 @@ -259,24 +301,28 @@ def test_empty_table_roundtrip(): assert table.schema.field('null').type == pa.null() assert table.schema.field('null_list').type == pa.list_(pa.null()) - _check_roundtrip(table, version='2.0') + _check_roundtrip( + table, version='2.0', use_legacy_dataset=use_legacy_dataset) @pytest.mark.pandas -def test_empty_table_no_columns(): +@parametrize_legacy_dataset +def test_empty_table_no_columns(use_legacy_dataset): df = pd.DataFrame() empty = pa.Table.from_pandas(df, preserve_index=False) - _check_roundtrip(empty) + _check_roundtrip(empty, use_legacy_dataset=use_legacy_dataset) -def test_empty_lists_table_roundtrip(): +@parametrize_legacy_dataset +def test_empty_lists_table_roundtrip(use_legacy_dataset): # ARROW-2744: Shouldn't crash when writing an array of empty lists arr = pa.array([[], []], type=pa.list_(pa.int32())) table = pa.Table.from_arrays([arr], ["A"]) - _check_roundtrip(table) + _check_roundtrip(table, use_legacy_dataset=use_legacy_dataset) -def test_nested_list_nonnullable_roundtrip_bug(): +@parametrize_legacy_dataset +def test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset): # Reproduce failure in ARROW-5630 typ = pa.list_(pa.field("item", pa.float32(), False)) num_rows = 10000 @@ -284,11 +330,13 @@ def test_nested_list_nonnullable_roundtrip_bug(): pa.array(([[0] * ((i + 5) % 10) for i in range(0, 10)] * (num_rows // 10)), type=typ) ], ['a']) - _check_roundtrip(t, data_page_size=4096) + _check_roundtrip( + t, data_page_size=4096, use_legacy_dataset=use_legacy_dataset) @pytest.mark.pandas -def test_pandas_parquet_datetime_tz(): +@parametrize_legacy_dataset_skip_buffer +def test_pandas_parquet_datetime_tz(use_legacy_dataset): s = pd.Series([datetime.datetime(2017, 9, 6)]) s = s.dt.tz_localize('utc') @@ -313,12 +361,14 @@ def test_pandas_parquet_datetime_tz(): @pytest.mark.pandas -def test_datetime_timezone_tzinfo(): +@parametrize_legacy_dataset +def test_datetime_timezone_tzinfo(use_legacy_dataset): value = datetime.datetime(2018, 1, 1, 1, 23, 45, tzinfo=datetime.timezone.utc) df = pd.DataFrame({'foo': [value]}) - _roundtrip_pandas_dataframe(df, write_kwargs={}) + _roundtrip_pandas_dataframe( + df, write_kwargs={}, use_legacy_dataset=use_legacy_dataset) @pytest.mark.pandas @@ -342,7 +392,8 @@ def test_pandas_parquet_custom_metadata(tempdir): @pytest.mark.pandas -def test_pandas_parquet_column_multiindex(tempdir): +@parametrize_legacy_dataset +def test_pandas_parquet_column_multiindex(tempdir, use_legacy_dataset): df = alltypes_sample(size=10) df.columns = pd.MultiIndex.from_tuples( list(zip(df.columns, df.columns[::-1])), @@ -355,13 
+406,17 @@ def test_pandas_parquet_column_multiindex(tempdir): _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms') - table_read = pq.read_pandas(filename) + table_read = pq.read_pandas( + filename, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @pytest.mark.pandas -def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir): +@parametrize_legacy_dataset +def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written( + tempdir, use_legacy_dataset +): df = alltypes_sample(size=10000) filename = tempdir / 'pandas_roundtrip.parquet' @@ -373,19 +428,25 @@ def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir): assert js['columns'] _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms') - table_read = pq.read_pandas(filename) + table_read = pq.read_pandas( + filename, use_legacy_dataset=use_legacy_dataset) js = table_read.schema.pandas_metadata assert not js['index_columns'] - assert arrow_table.schema.metadata == table_read.schema.metadata + read_metadata = table_read.schema.metadata + if not use_legacy_dataset: + read_metadata.pop(b"ARROW:schema") + + assert arrow_table.schema.metadata == read_metadata df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @pytest.mark.pandas -def test_pandas_parquet_1_0_roundtrip(tempdir): +@parametrize_legacy_dataset +def test_pandas_parquet_1_0_roundtrip(tempdir, use_legacy_dataset): size = 10000 np.random.seed(0) df = pd.DataFrame({ @@ -407,7 +468,7 @@ def test_pandas_parquet_1_0_roundtrip(tempdir): filename = tempdir / 'pandas_roundtrip.parquet' arrow_table = pa.Table.from_pandas(df) _write_table(arrow_table, filename, version='1.0') - table_read = _read_table(filename) + table_read = _read_table(filename, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() # We pass uint32_t as int64_t if we write Parquet version 1.0 @@ -417,12 +478,13 @@ def test_pandas_parquet_1_0_roundtrip(tempdir): @pytest.mark.pandas -def test_multiple_path_types(tempdir): +@parametrize_legacy_dataset +def test_multiple_path_types(tempdir, use_legacy_dataset): # Test compatibility with PEP 519 path-like objects path = tempdir / 'zzz.parquet' df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)}) _write_table(df, path) - table_read = _read_table(path) + table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @@ -430,13 +492,15 @@ def test_multiple_path_types(tempdir): path = str(tempdir) + 'zzz.parquet' df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)}) _write_table(df, path) - table_read = _read_table(path) + table_read = _read_table(path, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) +# TODO(dataset) duplicate column selection actually gives duplicate columns now @pytest.mark.pandas -def test_pandas_column_selection(tempdir): +@parametrize_legacy_dataset_not_supported +def test_pandas_column_selection(tempdir, use_legacy_dataset): size = 10000 np.random.seed(0) df = pd.DataFrame({ @@ -446,14 +510,17 @@ def test_pandas_column_selection(tempdir): filename = tempdir / 'pandas_roundtrip.parquet' arrow_table = pa.Table.from_pandas(df) _write_table(arrow_table, filename) - table_read = _read_table(filename, columns=['uint8']) + table_read = _read_table( + filename, columns=['uint8'], use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() 
tm.assert_frame_equal(df[['uint8']], df_read) # ARROW-4267: Selection of duplicate columns still leads to these columns # being read uniquely. - table_read = _read_table(filename, columns=['uint8', 'uint8']) + table_read = _read_table( + filename, columns=['uint8', 'uint8'], + use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df[['uint8']], df_read) @@ -491,20 +558,24 @@ def _test_dataframe(size=10000, seed=0): return df +# TODO(ARROW-8074) NativeFile support @pytest.mark.pandas -def test_pandas_parquet_native_file_roundtrip(tempdir): +@parametrize_legacy_dataset_skip_buffer +def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset): df = _test_dataframe(10000) arrow_table = pa.Table.from_pandas(df) imos = pa.BufferOutputStream() _write_table(arrow_table, imos, version="2.0") buf = imos.getvalue() reader = pa.BufferReader(buf) - df_read = _read_table(reader).to_pandas() + df_read = _read_table( + reader, use_legacy_dataset=use_legacy_dataset).to_pandas() tm.assert_frame_equal(df, df_read) @pytest.mark.pandas -def test_parquet_incremental_file_build(tempdir): +@parametrize_legacy_dataset_skip_buffer +def test_parquet_incremental_file_build(tempdir, use_legacy_dataset): df = _test_dataframe(100) df['unique_id'] = 0 @@ -524,33 +595,40 @@ def test_parquet_incremental_file_build(tempdir): writer.close() buf = out.getvalue() - result = _read_table(pa.BufferReader(buf)) + result = _read_table( + pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset) expected = pd.concat(frames, ignore_index=True) tm.assert_frame_equal(result.to_pandas(), expected) @pytest.mark.pandas -def test_read_pandas_column_subset(tempdir): +@parametrize_legacy_dataset_skip_buffer +def test_read_pandas_column_subset(tempdir, use_legacy_dataset): df = _test_dataframe(10000) arrow_table = pa.Table.from_pandas(df) imos = pa.BufferOutputStream() _write_table(arrow_table, imos, version="2.0") buf = imos.getvalue() reader = pa.BufferReader(buf) - df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas() + df_read = pq.read_pandas( + reader, columns=['strings', 'uint8'], + use_legacy_dataset=use_legacy_dataset + ).to_pandas() tm.assert_frame_equal(df[['strings', 'uint8']], df_read) @pytest.mark.pandas -def test_pandas_parquet_empty_roundtrip(tempdir): +@parametrize_legacy_dataset_skip_buffer +def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset): df = _test_dataframe(0) arrow_table = pa.Table.from_pandas(df) imos = pa.BufferOutputStream() _write_table(arrow_table, imos, version="2.0") buf = imos.getvalue() reader = pa.BufferReader(buf) - df_read = _read_table(reader).to_pandas() + df_read = _read_table( + reader, use_legacy_dataset=use_legacy_dataset).to_pandas() tm.assert_frame_equal(df, df_read) @@ -580,7 +658,8 @@ def test_pandas_can_write_nested_data(tempdir): @pytest.mark.pandas -def test_pandas_parquet_pyfile_roundtrip(tempdir): +@parametrize_legacy_dataset_skip_buffer +def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset): filename = tempdir / 'pandas_pyfile_roundtrip.parquet' size = 5 df = pd.DataFrame({ @@ -598,13 +677,14 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir): data = io.BytesIO(filename.read_bytes()) - table_read = _read_table(data) + table_read = _read_table(data, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @pytest.mark.pandas -def test_pandas_parquet_configuration_options(tempdir): +@parametrize_legacy_dataset +def 
test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset): size = 10000 np.random.seed(0) df = pd.DataFrame({ @@ -626,14 +706,16 @@ def test_pandas_parquet_configuration_options(tempdir): for use_dictionary in [True, False]: _write_table(arrow_table, filename, version='2.0', use_dictionary=use_dictionary) - table_read = _read_table(filename) + table_read = _read_table( + filename, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) for write_statistics in [True, False]: _write_table(arrow_table, filename, version='2.0', write_statistics=write_statistics) - table_read = _read_table(filename) + table_read = _read_table(filename, + use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @@ -643,7 +725,8 @@ def test_pandas_parquet_configuration_options(tempdir): continue _write_table(arrow_table, filename, version='2.0', compression=compression) - table_read = _read_table(filename) + table_read = _read_table( + filename, use_legacy_dataset=use_legacy_dataset) df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) @@ -662,7 +745,8 @@ def make_sample_file(table_or_df): return pq.ParquetFile(buf) -def test_byte_stream_split(): +@parametrize_legacy_dataset +def test_byte_stream_split(use_legacy_dataset): # This is only a smoke test. arr_float = pa.array(list(map(float, range(100)))) arr_int = pa.array(list(map(int, range(100)))) @@ -696,26 +780,31 @@ def test_byte_stream_split(): table = pa.Table.from_arrays([arr_int], names=['tmp']) with pytest.raises(IOError): _check_roundtrip(table, expected=table, use_byte_stream_split=True, - use_dictionary=False) + use_dictionary=False, + use_legacy_dataset=use_legacy_dataset) -def test_compression_level(): +@parametrize_legacy_dataset +def test_compression_level(use_legacy_dataset): arr = pa.array(list(map(int, range(1000)))) data = [arr, arr] table = pa.Table.from_arrays(data, names=['a', 'b']) # Check one compression level. _check_roundtrip(table, expected=table, compression="gzip", - compression_level=1) + compression_level=1, + use_legacy_dataset=use_legacy_dataset) # Check another one to make sure that compression_level=1 does not # coincide with the default one in Arrow. _check_roundtrip(table, expected=table, compression="gzip", - compression_level=5) + compression_level=5, + use_legacy_dataset=use_legacy_dataset) # Check that the user can provide a compression level per column _check_roundtrip(table, expected=table, compression="gzip", - compression_level={'a': 2, 'b': 3}) + compression_level={'a': 2, 'b': 3}, + use_legacy_dataset=use_legacy_dataset) # Check that specifying a compression level for a codec which does allow # specifying one, results into an error. 
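
[Editor's aside, not part of the patch: a minimal usage sketch for the new
keywords added to read_table() in parquet.py above. The dataset path, column
names and values are hypothetical; `filters`, `partitioning` and
`use_legacy_dataset` are the parameters introduced by this diff.]

    import pyarrow.parquet as pq

    # DNF predicate: (year == 2019 AND month > 6) OR (year == 2020).
    # With use_legacy_dataset=False the filters are converted to a dataset
    # expression by _filters_to_expression and may reference any column,
    # not only hive-style partition keys.
    table = pq.read_table(
        "sales_data",  # hypothetical hive-partitioned directory
        filters=[[("year", "=", 2019), ("month", ">", 6)],
                 [("year", "=", 2020)]],
        partitioning="hive",
        use_legacy_dataset=False,
    )
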
@@ -1356,7 +1445,8 @@ def test_fixed_size_binary(): @pytest.mark.pandas -def test_multithreaded_read(): +@parametrize_legacy_dataset_skip_buffer +def test_multithreaded_read(use_legacy_dataset): df = alltypes_sample(size=10000) table = pa.Table.from_pandas(df) @@ -1365,16 +1455,19 @@ def test_multithreaded_read(): _write_table(table, buf, compression='SNAPPY', version='2.0') buf.seek(0) - table1 = _read_table(buf, use_threads=True) + table1 = _read_table( + buf, use_threads=True, use_legacy_dataset=use_legacy_dataset) buf.seek(0) - table2 = _read_table(buf, use_threads=False) + table2 = _read_table( + buf, use_threads=False, use_legacy_dataset=use_legacy_dataset) assert table1.equals(table2) @pytest.mark.pandas -def test_min_chunksize(): +@parametrize_legacy_dataset_skip_buffer +def test_min_chunksize(use_legacy_dataset): data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D']) table = pa.Table.from_pandas(data.reset_index()) @@ -1382,7 +1475,7 @@ def test_min_chunksize(): _write_table(table, buf, chunk_size=-1) buf.seek(0) - result = _read_table(buf) + result = _read_table(buf, use_legacy_dataset=use_legacy_dataset) assert result.equals(table) @@ -1581,9 +1674,10 @@ def test_partition_set_dictionary_type(): @pytest.mark.pandas -def test_read_partitioned_directory(tempdir): +@parametrize_legacy_dataset +def test_read_partitioned_directory(tempdir, use_legacy_dataset): fs = LocalFileSystem.get_instance() - _partition_test_for_filesystem(fs, tempdir) + _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset) @pytest.mark.pandas @@ -1604,7 +1698,8 @@ def test_create_parquet_dataset_multi_threaded(tempdir): @pytest.mark.pandas -def test_equivalency(tempdir): +@parametrize_legacy_dataset +def test_equivalency(tempdir, use_legacy_dataset): fs = LocalFileSystem.get_instance() base_path = tempdir @@ -1631,7 +1726,8 @@ def test_equivalency(tempdir): dataset = pq.ParquetDataset( base_path, filesystem=fs, filters=[('integer', '=', 1), ('string', '!=', 'b'), - ('boolean', '==', True)] + ('boolean', '==', True)], + use_legacy_dataset=use_legacy_dataset, ) table = dataset.read() result_df = (table.to_pandas().reset_index(drop=True)) @@ -1652,7 +1748,9 @@ def test_equivalency(tempdir): ], [('integer', '=', 0), ('boolean', '==', 'False')] ] - dataset = pq.ParquetDataset(base_path, filesystem=fs, filters=filters) + dataset = pq.ParquetDataset( + base_path, filesystem=fs, filters=filters, + use_legacy_dataset=use_legacy_dataset) table = dataset.read() result_df = table.to_pandas().reset_index(drop=True) @@ -1668,19 +1766,28 @@ def test_equivalency(tempdir): assert df_filter_2.sum() > 0 assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum()) - # Check for \0 in predicate values. Until they are correctly implemented - # in ARROW-3391, they would otherwise lead to weird results with the - # current code. - with pytest.raises(NotImplementedError): - filters = [[('string', '==', b'1\0a')]] - pq.ParquetDataset(base_path, filesystem=fs, filters=filters) - with pytest.raises(NotImplementedError): - filters = [[('string', '==', '1\0a')]] - pq.ParquetDataset(base_path, filesystem=fs, filters=filters) + if use_legacy_dataset: + # Check for \0 in predicate values. Until they are correctly + # implemented in ARROW-3391, they would otherwise lead to weird + # results with the current code. 
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', b'1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+        with pytest.raises(NotImplementedError):
+            filters = [[('string', '==', '1\0a')]]
+            pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
+    else:
+        for filters in [[[('string', '==', b'1\0a')]],
+                        [[('string', '==', '1\0a')]]]:
+            dataset = pq.ParquetDataset(
+                base_path, filesystem=fs, filters=filters,
+                use_legacy_dataset=False)
+            assert dataset.read().num_rows == 0


 @pytest.mark.pandas
-def test_cutoff_exclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
@@ -1702,7 +1809,8 @@
         filters=[
             ('integers', '<', 4),
             ('integers', '>', 1),
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1714,11 +1822,14 @@


 @pytest.mark.pandas
+@parametrize_legacy_dataset
 @pytest.mark.xfail(
-    raises=TypeError,
+    # different error with use_legacy_dataset=False because result_df is
+    # no longer categorical
+    raises=(TypeError, AssertionError),
     reason='Loss of type information in creation of categoricals.'
 )
-def test_cutoff_exclusive_datetime(tempdir):
+def test_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
@@ -1746,7 +1857,8 @@
         filters=[
             ('dates', '<', "2018-04-12"),
             ('dates', '>', "2018-04-10")
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1761,7 +1873,8 @@


 @pytest.mark.pandas
-def test_inclusive_integer(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
@@ -1783,7 +1896,8 @@
         filters=[
             ('integers', '<=', 3),
             ('integers', '>=', 2),
-        ]
+        ],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas()
@@ -1795,7 +1909,8 @@


 @pytest.mark.pandas
-def test_inclusive_set(tempdir):
+@parametrize_legacy_dataset
+def test_inclusive_set(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
@@ -1820,7 +1935,8 @@
     dataset = pq.ParquetDataset(
         base_path, filesystem=fs,
         filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
-                 ('boolean', 'in', {True})]
+                 ('boolean', 'in', {True})],
+        use_legacy_dataset=use_legacy_dataset
     )
     table = dataset.read()
     result_df = (table.to_pandas().reset_index(drop=True))
@@ -1831,7 +1947,8 @@


 @pytest.mark.pandas
-def test_invalid_pred_op(tempdir):
+@parametrize_legacy_dataset
+def test_invalid_pred_op(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir
@@ -1851,23 +1968,29 @@
     with pytest.raises(ValueError):
         pq.ParquetDataset(base_path,
                           filesystem=fs,
-                          filters=[
-                              ('integers', '=<', 3),
-                          ])
-
-    with pytest.raises(ValueError):
-        pq.ParquetDataset(base_path,
-                          filesystem=fs,
-                          filters=[
-                              ('integers', 'in', set()),
-                          ])
+                          filters=[('integers', '=<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset)

-    with pytest.raises(ValueError):
+    if use_legacy_dataset:
+        with pytest.raises(ValueError):
+            pq.ParquetDataset(base_path,
+                              filesystem=fs,
+                              filters=[('integers', 'in', set()), ],
+                              use_legacy_dataset=use_legacy_dataset)
+    else:
+        # Dataset API returns empty table instead
+        dataset = pq.ParquetDataset(base_path,
+                                    filesystem=fs,
+                                    filters=[('integers', 'in', set()), ],
+                                    use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().num_rows == 0
+
+    with pytest.raises(ValueError if use_legacy_dataset else TypeError):
+        # dataset API returns TypeError for an invalid comparison
         pq.ParquetDataset(base_path,
                           filesystem=fs,
-                          filters=[
-                              ('integers', '!=', {3}),
-                          ])
+                          filters=[('integers', '!=', {3})],
+                          use_legacy_dataset=use_legacy_dataset)


 @pytest.mark.pandas
@@ -1956,7 +2079,7 @@ def test_read_partitioned_directory_s3fs(s3_example):
         dataset.read()


-def _partition_test_for_filesystem(fs, base_path):
+def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
     foo_keys = [0, 1]
     bar_keys = ['a', 'b', 'c']
     partition_spec = [
@@ -1974,7 +2097,8 @@

     _generate_partition_directories(fs, base_path, partition_spec, df)

-    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    dataset = pq.ParquetDataset(
+        base_path, filesystem=fs, use_legacy_dataset=use_legacy_dataset)
     table = dataset.read()
     result_df = (table.to_pandas()
                  .sort_values(by='index')
@@ -1983,8 +2107,11 @@
     expected_df = (df.sort_values(by='index')
                    .reset_index(drop=True)
                    .reindex(columns=result_df.columns))
-    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
-    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+    if use_legacy_dataset:
+        # TODO(dataset) Dataset API does not create categorical columns
+        # for partition keys
+        expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+        expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)

     assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()

@@ -2133,7 +2260,8 @@ def _filter_partition(df, part_keys):


 @pytest.mark.pandas
-def test_read_multiple_files(tempdir):
+@parametrize_legacy_dataset
+def test_read_multiple_files(tempdir, use_legacy_dataset):
     nfiles = 10
     size = 5
@@ -2159,8 +2287,11 @@
     # Write a _SUCCESS.crc file
     (dirpath / '_SUCCESS.crc').touch()

-    def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
-        dataset = pq.ParquetDataset(paths, **kwargs)
+    # TODO(datasets) changed to use_threads=False because otherwise the
+    # row order is not deterministic
+    def read_multiple_files(paths, columns=None, use_threads=False, **kwargs):
+        dataset = pq.ParquetDataset(
+            paths, use_legacy_dataset=use_legacy_dataset, **kwargs)
         return dataset.read(columns=columns, use_threads=use_threads)

     result = read_multiple_files(paths)
@@ -2169,13 +2300,15 @@
     assert result.equals(expected)

     # Read with provided metadata
-    metadata = pq.read_metadata(paths[0])
+    # TODO(dataset) specifying metadata not yet supported
+    if use_legacy_dataset:
+        metadata = pq.read_metadata(paths[0])

-    result2 = read_multiple_files(paths, metadata=metadata)
-    assert result2.equals(expected)
+        result2 = read_multiple_files(paths, metadata=metadata)
+        assert result2.equals(expected)

-    result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
-    assert result3.equals(expected)
+        result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
+        assert result3.equals(expected)

     # Read 
column subset to_read = [0, 2, 6, result.num_columns - 1] @@ -2197,6 +2330,10 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs): t = pa.Table.from_pandas(bad_apple) _write_table(t, bad_apple_path) + if not use_legacy_dataset: + # TODO(dataset) Dataset API skips bad files + return + bad_meta = pq.read_metadata(bad_apple_path) with pytest.raises(ValueError): @@ -2215,7 +2352,8 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs): @pytest.mark.pandas -def test_dataset_read_pandas(tempdir): +@parametrize_legacy_dataset +def test_dataset_read_pandas(tempdir, use_legacy_dataset): nfiles = 5 size = 5 @@ -2238,7 +2376,7 @@ def test_dataset_read_pandas(tempdir): frames.append(df) paths.append(path) - dataset = pq.ParquetDataset(dirpath) + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) columns = ['uint8', 'strings'] result = dataset.read_pandas(columns=columns).to_pandas() expected = pd.concat([x[columns] for x in frames]) @@ -2247,7 +2385,8 @@ def test_dataset_read_pandas(tempdir): @pytest.mark.pandas -def test_dataset_memory_map(tempdir): +@parametrize_legacy_dataset +def test_dataset_memory_map(tempdir, use_legacy_dataset): # ARROW-2627: Check that we can use ParquetDataset with memory-mapping dirpath = tempdir / guid() dirpath.mkdir() @@ -2257,12 +2396,16 @@ def test_dataset_memory_map(tempdir): table = pa.Table.from_pandas(df) _write_table(table, path, version='2.0') - dataset = pq.ParquetDataset(dirpath, memory_map=True) - assert dataset.pieces[0].read().equals(table) + dataset = pq.ParquetDataset( + dirpath, memory_map=True, use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) + if use_legacy_dataset: + assert dataset.pieces[0].read().equals(table) @pytest.mark.pandas -def test_dataset_enable_buffered_stream(tempdir): +@parametrize_legacy_dataset +def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset): dirpath = tempdir / guid() dirpath.mkdir() @@ -2272,11 +2415,15 @@ def test_dataset_enable_buffered_stream(tempdir): _write_table(table, path, version='2.0') with pytest.raises(ValueError): - pq.ParquetDataset(dirpath, buffer_size=-64) + pq.ParquetDataset( + dirpath, buffer_size=-64, + use_legacy_dataset=use_legacy_dataset) for buffer_size in [128, 1024]: - dataset = pq.ParquetDataset(dirpath, buffer_size=buffer_size) - assert dataset.pieces[0].read().equals(table) + dataset = pq.ParquetDataset( + dirpath, buffer_size=buffer_size, + use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) @pytest.mark.pandas @@ -2336,9 +2483,18 @@ def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5): return paths +def _assert_dataset_paths(dataset, paths, use_legacy_dataset): + if use_legacy_dataset: + assert set(map(str, paths)) == {x.path for x in dataset.pieces} + else: + paths = [str(path.as_posix()) for path in paths] + assert set(paths) == set(dataset._dataset.files) + + @pytest.mark.pandas +@parametrize_legacy_dataset @pytest.mark.parametrize('dir_prefix', ['_', '.']) -def test_ignore_private_directories(tempdir, dir_prefix): +def test_ignore_private_directories(tempdir, dir_prefix, use_legacy_dataset): dirpath = tempdir / guid() dirpath.mkdir() @@ -2348,12 +2504,14 @@ def test_ignore_private_directories(tempdir, dir_prefix): # private directory (dirpath / '{}staging'.format(dir_prefix)).mkdir() - dataset = pq.ParquetDataset(dirpath) - assert set(map(str, paths)) == {x.path for x in dataset.pieces} + dataset = pq.ParquetDataset(dirpath, 
use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) @pytest.mark.pandas -def test_ignore_hidden_files_dot(tempdir): +@parametrize_legacy_dataset +def test_ignore_hidden_files_dot(tempdir, use_legacy_dataset): dirpath = tempdir / guid() dirpath.mkdir() @@ -2366,12 +2524,14 @@ def test_ignore_hidden_files_dot(tempdir): with (dirpath / '.private').open('wb') as f: f.write(b'gibberish') - dataset = pq.ParquetDataset(dirpath) - assert set(map(str, paths)) == {x.path for x in dataset.pieces} + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) @pytest.mark.pandas -def test_ignore_hidden_files_underscore(tempdir): +@parametrize_legacy_dataset +def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset): dirpath = tempdir / guid() dirpath.mkdir() @@ -2384,12 +2544,14 @@ def test_ignore_hidden_files_underscore(tempdir): with (dirpath / '_started_321').open('wb') as f: f.write(b'abcd') - dataset = pq.ParquetDataset(dirpath) - assert set(map(str, paths)) == {x.path for x in dataset.pieces} + dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset) + + _assert_dataset_paths(dataset, paths, use_legacy_dataset) @pytest.mark.pandas -def test_multiindex_duplicate_values(tempdir): +@parametrize_legacy_dataset +def test_multiindex_duplicate_values(tempdir, use_legacy_dataset): num_rows = 3 numbers = list(range(num_rows)) index = pd.MultiIndex.from_arrays( @@ -2403,7 +2565,7 @@ def test_multiindex_duplicate_values(tempdir): filename = tempdir / 'dup_multi_index_levels.parquet' _write_table(table, filename) - result_table = _read_table(filename) + result_table = _read_table(filename, use_legacy_dataset=use_legacy_dataset) assert table.equals(result_table) result_df = result_table.to_pandas() @@ -2460,22 +2622,26 @@ def test_noncoerced_nanoseconds_written_without_exception(tempdir): pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0') -def test_read_non_existent_file(tempdir): +@parametrize_legacy_dataset +def test_read_non_existent_file(tempdir, use_legacy_dataset): path = 'non-existent-file.parquet' try: - pq.read_table(path) + pq.read_table(path, use_legacy_dataset=use_legacy_dataset) except Exception as e: assert path in e.args[0] -def test_read_table_doesnt_warn(datadir): +@parametrize_legacy_dataset +def test_read_table_doesnt_warn(datadir, use_legacy_dataset): with pytest.warns(None) as record: - pq.read_table(datadir / 'v0.7.1.parquet') + pq.read_table(datadir / 'v0.7.1.parquet', + use_legacy_dataset=use_legacy_dataset) assert len(record) == 0 def _test_write_to_dataset_with_partitions(base_path, + use_legacy_dataset=True, filesystem=None, schema=None, index_name=None): @@ -2505,12 +2671,19 @@ def _test_write_to_dataset_with_partitions(base_path, # partitioned dataset dataset = pq.ParquetDataset(base_path, filesystem=filesystem, - validate_schema=True) + validate_schema=True, + use_legacy_dataset=use_legacy_dataset) # ARROW-2209: Ensure the dataset schema also includes the partition columns - dataset_cols = set(dataset.schema.to_arrow_schema().names) + if use_legacy_dataset: + dataset_cols = set(dataset.schema.to_arrow_schema().names) + else: + # NB schema property is an arrow and not parquet schema + dataset_cols = set(dataset.schema.names) + assert dataset_cols == set(output_table.schema.names) - input_table = dataset.read() + use_threads = deterministic_row_order(use_legacy_dataset) + input_table = 
dataset.read(use_threads=use_threads)
     input_df = input_table.to_pandas()

     # Read data back in and compare with original DataFrame
@@ -2520,12 +2693,15 @@
     # Partitioned columns become 'categorical' dtypes
     input_df = input_df[cols]
-    for col in partition_by:
-        output_df[col] = output_df[col].astype('category')
+    if use_legacy_dataset:
+        for col in partition_by:
+            output_df[col] = output_df[col].astype('category')
     assert output_df.equals(input_df)


-def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
+def _test_write_to_dataset_no_partitions(base_path,
+                                         use_legacy_dataset=True,
+                                         filesystem=None):
     # ARROW-1400
     output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                               'group2': list('eefeffgeee'),
@@ -2549,8 +2725,10 @@
     # Deduplicated incoming DataFrame should match
     # original outgoing Dataframe
-    input_table = pq.ParquetDataset(base_path,
-                                    filesystem=filesystem).read()
+    input_table = pq.ParquetDataset(
+        base_path, filesystem=filesystem,
+        use_legacy_dataset=use_legacy_dataset
+    ).read()
     input_df = input_table.to_pandas()
     input_df = input_df.drop_duplicates()
     input_df = input_df[cols]
@@ -2558,28 +2736,37 @@


 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions(tempdir):
-    _test_write_to_dataset_with_partitions(str(tempdir))
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_with_partitions(str(tempdir), use_legacy_dataset)


 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_schema(tempdir):
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_schema(
+    tempdir, use_legacy_dataset
+):
     schema = pa.schema([pa.field('group1', type=pa.string()),
                         pa.field('group2', type=pa.string()),
                         pa.field('num', type=pa.int64()),
                         pa.field('nan', type=pa.int32()),
                         pa.field('date', type=pa.timestamp(unit='us'))])
-    _test_write_to_dataset_with_partitions(str(tempdir), schema=schema)
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, schema=schema)


 @pytest.mark.pandas
-def test_write_to_dataset_with_partitions_and_index_name(tempdir):
-    _test_write_to_dataset_with_partitions(str(tempdir),
-                                           index_name='index_name')
+@parametrize_legacy_dataset
+def test_write_to_dataset_with_partitions_and_index_name(
+    tempdir, use_legacy_dataset
+):
+    _test_write_to_dataset_with_partitions(
+        str(tempdir), use_legacy_dataset, index_name='index_name')


 @pytest.mark.pandas
-def test_write_to_dataset_no_partitions(tempdir):
-    _test_write_to_dataset_no_partitions(str(tempdir))
+@parametrize_legacy_dataset
+def test_write_to_dataset_no_partitions(tempdir, use_legacy_dataset):
+    _test_write_to_dataset_no_partitions(str(tempdir), use_legacy_dataset)


@@ -2628,6 +2815,7 @@ def test_large_table_int32_overflow():
         _write_table(table, f)


+# TODO(ARROW-8074) buffer support
 def _simple_table_roundtrip(table, **write_kwargs):
     stream = pa.BufferOutputStream()
     _write_table(table, stream, **write_kwargs)
@@ -2696,7 +2884,8 @@ def test_list_of_binary_large_cell():


 @pytest.mark.pandas
-def test_index_column_name_duplicate(tempdir):
+@parametrize_legacy_dataset
+def test_index_column_name_duplicate(tempdir, use_legacy_dataset):
     data = {
         'close': {
             pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
@@ -2715,13 +2904,14 @@
     dfx = pd.DataFrame(data).set_index('time', drop=False)
     tdfx = pa.Table.from_pandas(dfx)
@@ -2628,6 +2815,7 @@ def test_large_table_int32_overflow():
         _write_table(table, f)
 
 
+# TODO(ARROW-8074) buffer support
 def _simple_table_roundtrip(table, **write_kwargs):
     stream = pa.BufferOutputStream()
     _write_table(table, stream, **write_kwargs)
@@ -2696,7 +2884,8 @@ def test_list_of_binary_large_cell():
 
 
 @pytest.mark.pandas
-def test_index_column_name_duplicate(tempdir):
+@parametrize_legacy_dataset
+def test_index_column_name_duplicate(tempdir, use_legacy_dataset):
     data = {
         'close': {
             pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
@@ -2715,13 +2904,14 @@ def test_index_column_name_duplicate(tempdir):
     dfx = pd.DataFrame(data).set_index('time', drop=False)
     tdfx = pa.Table.from_pandas(dfx)
     _write_table(tdfx, path)
-    arrow_table = _read_table(path)
+    arrow_table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     result_df = arrow_table.to_pandas()
     tm.assert_frame_equal(result_df, dfx)
 
 
 @pytest.mark.pandas
-def test_parquet_nested_convenience(tempdir):
+@parametrize_legacy_dataset
+def test_parquet_nested_convenience(tempdir, use_legacy_dataset):
     # ARROW-1684
     df = pd.DataFrame({
         'a': [[1, 2, 3], None, [4, 5], []],
@@ -2733,15 +2923,18 @@ def test_parquet_nested_convenience(tempdir):
     table = pa.Table.from_pandas(df, preserve_index=False)
     _write_table(table, path)
 
-    read = pq.read_table(path, columns=['a'])
+    read = pq.read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(read.to_pandas(), df[['a']])
 
-    read = pq.read_table(path, columns=['a', 'b'])
+    read = pq.read_table(
+        path, columns=['a', 'b'], use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(read.to_pandas(), df)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_naming(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_naming(datadir, use_legacy_dataset):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
   0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2756,13 +2949,17 @@ def test_backwards_compatible_index_naming(datadir):
   0.23  Very Good      H      VS1   59.4   61.0    338  4.00  4.05  2.39"""
     expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}',
                            index_col=None, header=0, engine='python')
-    table = _read_table(datadir / 'v0.7.1.parquet')
+    table = _read_table(
+        datadir / 'v0.7.1.parquet', use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_named(
+    datadir, use_legacy_dataset
+):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
   0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2781,13 +2978,17 @@ def test_backwards_compatible_index_multi_level_named(datadir):
         header=0, engine='python'
     ).sort_index()
 
-    table = _read_table(datadir / 'v0.7.1.all-named-index.parquet')
+    table = _read_table(datadir / 'v0.7.1.all-named-index.parquet',
+                        use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_index_multi_level_some_named(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_index_multi_level_some_named(
+    datadir, use_legacy_dataset
+):
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z
   0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
@@ -2807,13 +3008,17 @@ def test_backwards_compatible_index_multi_level_some_named(datadir):
     ).sort_index()
     expected.index = expected.index.set_names(['cut', None, 'clarity'])
 
-    table = _read_table(datadir / 'v0.7.1.some-named-index.parquet')
+    table = _read_table(datadir / 'v0.7.1.some-named-index.parquet',
+                        use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
 
 @pytest.mark.pandas
-def test_backwards_compatible_column_metadata_handling(datadir):
+@parametrize_legacy_dataset
+def test_backwards_compatible_column_metadata_handling(
+    datadir, use_legacy_dataset
+):
     expected = pd.DataFrame(
         {'a': [1, 2, 3], 'b': [.1, .2, .3],
          'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
@@ -2823,15 +3028,17 @@ def test_backwards_compatible_column_metadata_handling(datadir):
         names=['index', None])
 
     path = datadir / 'v0.7.1.column-metadata-handling.parquet'
-    table = _read_table(path)
+    table = _read_table(path, use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected)
 
-    table = _read_table(path, columns=['a'])
+    table = _read_table(
+        path, columns=['a'], use_legacy_dataset=use_legacy_dataset)
     result = table.to_pandas()
     tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
 
 
+# TODO(dataset) support pickling
 def _make_dataset_for_pickling(tempdir, N=100):
     path = tempdir / 'data.parquet'
     fs = LocalFileSystem.get_instance()
@@ -2894,7 +3101,8 @@ def test_cloudpickle_dataset(tempdir, datadir):
 
 
 @pytest.mark.pandas
-def test_decimal_roundtrip(tempdir):
+@parametrize_legacy_dataset
+def test_decimal_roundtrip(tempdir, use_legacy_dataset):
     num_values = 10
 
     columns = {}
@@ -2914,7 +3122,8 @@ def test_decimal_roundtrip(tempdir):
     string_filename = str(filename)
     table = pa.Table.from_pandas(expected)
     _write_table(table, string_filename)
-    result_table = _read_table(string_filename)
+    result_table = _read_table(
+        string_filename, use_legacy_dataset=use_legacy_dataset)
     result = result_table.to_pandas()
     tm.assert_frame_equal(result, expected)
@@ -2935,7 +3144,8 @@ def test_decimal_roundtrip_negative_scale(tempdir):
 
 
 @pytest.mark.pandas
-def test_parquet_writer_context_obj(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
     df = _test_dataframe(100)
     df['unique_id'] = 0
@@ -2953,14 +3163,18 @@ def test_parquet_writer_context_obj(tempdir):
             frames.append(df.copy())
 
     buf = out.getvalue()
-    result = _read_table(pa.BufferReader(buf))
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
 
 
 @pytest.mark.pandas
-def test_parquet_writer_context_obj_with_exception(tempdir):
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_context_obj_with_exception(
+    tempdir, use_legacy_dataset
+):
     df = _test_dataframe(100)
     df['unique_id'] = 0
@@ -2985,21 +3199,23 @@ def test_parquet_writer_context_obj_with_exception(tempdir):
         assert str(e) == error_text
 
     buf = out.getvalue()
-    result = _read_table(pa.BufferReader(buf))
+    result = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
 
     expected = pd.concat(frames, ignore_index=True)
     tm.assert_frame_equal(result.to_pandas(), expected)
 
 
 @pytest.mark.pandas
-def test_zlib_compression_bug():
+@parametrize_legacy_dataset_skip_buffer
+def test_zlib_compression_bug(use_legacy_dataset):
     # ARROW-3514: "zlib deflate failed, output buffer too small"
     table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
     f = io.BytesIO()
     pq.write_table(table, f, compression='gzip')
     f.seek(0)
-    roundtrip = pq.read_table(f)
+    roundtrip = pq.read_table(f, use_legacy_dataset=use_legacy_dataset)
     tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
 
 
@@ -3051,7 +3267,8 @@ def test_empty_row_groups(tempdir):
 
 
 @pytest.mark.pandas
-def test_parquet_writer_with_caller_provided_filesystem():
+@parametrize_legacy_dataset_skip_buffer
+def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
     out = pa.BufferOutputStream()
 
     class CustomFS(FileSystem):
@@ -3078,7 +3295,8 @@ def open(self, path, mode='rb'):
     assert out.closed
 
     buf = out.getvalue()
-    table_read = _read_table(pa.BufferReader(buf))
+    table_read = _read_table(
+        pa.BufferReader(buf), use_legacy_dataset=use_legacy_dataset)
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df_read, df)
@@ -3097,7 +3315,8 @@ def test_writing_empty_lists():
     _check_roundtrip(table)
 
 
-def test_write_nested_zero_length_array_chunk_failure():
+@parametrize_legacy_dataset
+def test_write_nested_zero_length_array_chunk_failure(use_legacy_dataset):
     # Bug report in ARROW-3792
     cols = OrderedDict(
         int32=pa.int32(),
@@ -3122,11 +3341,12 @@ def test_write_nested_zero_length_array_chunk_failure():
     my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols))
                   for batch in my_arrays]
     tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
-    _check_roundtrip(tbl)
+    _check_roundtrip(tbl, use_legacy_dataset=use_legacy_dataset)
 
 
 @pytest.mark.pandas
-def test_partitioned_dataset(tempdir):
+@parametrize_legacy_dataset
+def test_partitioned_dataset(tempdir, use_legacy_dataset):
     # ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
     # to a Parquet file
     path = tempdir / "ARROW-3208"
@@ -3138,7 +3358,8 @@ def test_partitioned_dataset(tempdir):
     table = pa.Table.from_pandas(df)
     pq.write_to_dataset(table, root_path=str(path),
                         partition_cols=['one', 'two'])
-    table = pq.ParquetDataset(path).read()
+    table = pq.ParquetDataset(
+        path, use_legacy_dataset=use_legacy_dataset).read()
     pq.write_table(table, path / "output.parquet")
@@ -3156,7 +3377,8 @@ def test_read_column_invalid_index():
 
 
 @pytest.mark.pandas
-def test_direct_read_dictionary():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary(use_legacy_dataset):
     # ARROW-3325
     repeats = 10
     nunique = 5
@@ -3172,7 +3394,8 @@ def test_direct_read_dictionary():
     contents = bio.getvalue()
 
     result = pq.read_table(pa.BufferReader(contents),
-                           read_dictionary=['f0'])
+                           read_dictionary=['f0'],
+                           use_legacy_dataset=use_legacy_dataset)
 
     # Compute dictionary-encoded subfield
     expected = pa.table([table[0].dictionary_encode()], names=['f0'])
@@ -3180,14 +3403,17 @@
 
 
 @pytest.mark.pandas
-def test_dataset_read_dictionary(tempdir):
+@parametrize_legacy_dataset
+def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
     path = tempdir / "ARROW-3325-dataset"
     t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     pq.write_to_dataset(t1, root_path=str(path))
     pq.write_to_dataset(t2, root_path=str(path))
-    result = pq.ParquetDataset(path, read_dictionary=['f0']).read()
+    result = pq.ParquetDataset(
+        path, read_dictionary=['f0'],
+        use_legacy_dataset=use_legacy_dataset).read()
 
     # The order of the chunks is non-deterministic
     ex_chunks = [t1[0].chunk(0).dictionary_encode(),
@@ -3203,7 +3429,8 @@ def test_dataset_read_dictionary(tempdir):
 
 
 @pytest.mark.pandas
-def test_direct_read_dictionary_subfield():
+@parametrize_legacy_dataset_skip_buffer
+def test_direct_read_dictionary_subfield(use_legacy_dataset):
     repeats = 10
     nunique = 5
@@ -3216,7 +3443,8 @@ def test_direct_read_dictionary_subfield():
     pq.write_table(table, bio)
     contents = bio.getvalue()
     result = pq.read_table(pa.BufferReader(contents),
-                           read_dictionary=['f0.list.item'])
+                           read_dictionary=['f0.list.item'],
+                           use_legacy_dataset=use_legacy_dataset)
 
     arr = pa.array(data[0])
     values_as_dict = arr.values.dictionary_encode()
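(The `read_dictionary` option exercised above amounts to the following self-contained sketch; the data and column name are illustrative.)

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'f0': ['a', 'b', 'a', 'b']})
buf = pa.BufferOutputStream()
pq.write_table(table, buf)

# 'f0' is dictionary encoded while it is read, so the resulting column
# has a dictionary type rather than a plain string type.
result = pq.read_table(pa.BufferReader(buf.getvalue()),
                       read_dictionary=['f0'])
print(result.schema)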
@@ -3298,7 +3526,10 @@ def test_write_to_dataset_metadata(tempdir):
     assert d1 == d2
 
 
-def test_parquet_file_too_small(tempdir):
+# TODO(dataset) better error message for invalid files (certainly if it
+# is the only one)
+@parametrize_legacy_dataset_not_supported
+def test_parquet_file_too_small(tempdir, use_legacy_dataset):
     path = str(tempdir / "test.parquet")
     with pytest.raises(pa.ArrowInvalid,
                        match='size is 0 bytes'):
@@ -3314,7 +3545,8 @@
 
 
 @pytest.mark.pandas
-def test_categorical_index_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_index_survives_roundtrip(use_legacy_dataset):
     # ARROW-3652, addressed by ARROW-3246
     df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
     df['c1'] = df['c1'].astype('category')
@@ -3323,13 +3555,15 @@ def test_categorical_index_survives_roundtrip():
     table = pa.Table.from_pandas(df)
     bos = pa.BufferOutputStream()
     pq.write_table(table, bos)
-    ref_df = pq.read_pandas(bos.getvalue()).to_pandas()
+    ref_df = pq.read_pandas(
+        bos.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
     assert isinstance(ref_df.index, pd.CategoricalIndex)
     assert ref_df.index.equals(df.index)
 
 
 @pytest.mark.pandas
-def test_categorical_order_survives_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_categorical_order_survives_roundtrip(use_legacy_dataset):
     # ARROW-6302
     df = pd.DataFrame({"a": pd.Categorical(
         ["a", "b", "c", "a"], categories=["b", "c", "d"], ordered=True)})
@@ -3339,7 +3573,8 @@ def test_categorical_order_survives_roundtrip():
     pq.write_table(table, bos)
 
     contents = bos.getvalue()
-    result = pq.read_pandas(contents).to_pandas()
+    result = pq.read_pandas(
+        contents, use_legacy_dataset=use_legacy_dataset).to_pandas()
 
     tm.assert_frame_equal(result, df)
@@ -3351,7 +3586,8 @@ def _simple_table_write_read(table):
     return pq.read_table(pa.BufferReader(contents))
 
 
-def test_dictionary_array_automatically_read():
+@parametrize_legacy_dataset_skip_buffer
+def test_dictionary_array_automatically_read(use_legacy_dataset):
     # ARROW-3246
 
     # Make a large dictionary, a little over 4MB of data
@@ -3416,7 +3652,8 @@ def test_field_id_metadata():
 
 
 @pytest.mark.pandas
-def test_pandas_categorical_na_type_row_groups():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
     # ARROW-5085
     df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
     df_category = df.astype({"col": "category", "int": "category"})
@@ -3426,7 +3663,8 @@
 
     # it works
     pq.write_table(table_cat, buf, version="2.0", chunk_size=10)
-    result = pq.read_table(buf.getvalue())
+    result = pq.read_table(
+        buf.getvalue(), use_legacy_dataset=use_legacy_dataset)
 
     # Result is non-categorical
     assert result[0].equals(table[0])
@@ -3434,7 +3672,8 @@
 
 
 @pytest.mark.pandas
-def test_pandas_categorical_roundtrip():
+@parametrize_legacy_dataset_skip_buffer
+def test_pandas_categorical_roundtrip(use_legacy_dataset):
     # ARROW-5480, this was enabled by ARROW-3246
 
     # Have one of the categories unobserved and include a null (-1)
@@ -3446,7 +3685,8 @@ def test_pandas_categorical_roundtrip():
     buf = pa.BufferOutputStream()
     pq.write_table(pa.table(df), buf)
 
-    result = pq.read_table(buf.getvalue()).to_pandas()
+    result = pq.read_table(
+        buf.getvalue(), use_legacy_dataset=use_legacy_dataset).to_pandas()
     assert result.x.dtype == 'category'
     assert (result.x.cat.categories == categories).all()
     tm.assert_frame_equal(result, df)
@@ -3495,8 +3735,9 @@ def test_multi_dataset_metadata(tempdir):
     assert md['serialized_size'] > 0
 
 
+@parametrize_legacy_dataset
 @pytest.mark.pandas
-def test_filter_before_validate_schema(tempdir):
+def test_filter_before_validate_schema(tempdir, use_legacy_dataset):
     # ARROW-4076 apply filter before schema validation
     # to avoid checking unneeded schemas
@@ -3513,7 +3754,8 @@ def test_filter_before_validate_schema(tempdir):
     pq.write_table(table2, dir2 / 'data.parquet')
 
     # read single file using filter
-    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]])
+    table = pq.read_table(tempdir, filters=[[('A', '==', 0)]],
+                          use_legacy_dataset=use_legacy_dataset)
     assert table.column('B').equals(pa.chunked_array([[1, 2, 3]]))
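(The `filters` keyword shown here takes the same DNF form on both code paths; with `use_legacy_dataset=False` it is converted to a dataset expression and may reference any column, not only partition keys. A sketch with an illustrative path and column names:)

import pyarrow.parquet as pq

# Outer list = OR of conjunctions, inner list = AND of column predicates.
table = pq.read_table('/tmp/demo_dataset',
                      filters=[[('group', '==', 'a'), ('value', '>', 1)]],
                      use_legacy_dataset=False)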
"year", "month", "day"] + + # This raises an error for legacy dataset + with pytest.raises(ValueError): + pq.read_table( + str(root_path), partitioning=part, use_legacy_dataset=True) + + with pytest.raises(ValueError): + pq.ParquetDataset( + str(root_path), partitioning=part, use_legacy_dataset=True)