From 3395cdfa4f54913bfc674e70aa209133a9c42be4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 28 Feb 2020 17:42:43 +0100 Subject: [PATCH 01/27] hold a reference to the dataset factory --- python/pyarrow/_dataset.pyx | 23 +++++++++-- python/pyarrow/dataset.py | 39 ++++++++++++------- python/pyarrow/tests/test_dataset.py | 57 ++++++++++++++++++---------- 3 files changed, 83 insertions(+), 36 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index ed99f36daf9..97b4b8a5b9f 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -54,12 +54,18 @@ cdef class Dataset: shared_ptr[CDataset] wrapped CDataset* dataset - def __init__(self, children, Schema schema not None): + cdef readonly: + # if a factory object has instantiated the dataset construction then + # hold a reference for the factory to reuse it later + DatasetFactory factory + + def __init__(self): _forbid_instantiation(self.__class__) cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() + self.factory = None @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -1052,6 +1058,7 @@ cdef class DatasetFactory: Dataset """ cdef: + Dataset dataset shared_ptr[CSchema] sp_schema CResult[shared_ptr[CDataset]] result if schema is not None: @@ -1061,7 +1068,13 @@ cdef class DatasetFactory: else: with nogil: result = self.factory.Finish() - return Dataset.wrap(GetResultValue(result)) + + # instantiate the dataset object and store a reference for the + # factory in order to reuse the same factory object later on + dataset = Dataset.wrap(GetResultValue(result)) + dataset.factory = self + + return dataset cdef class FileSystemFactoryOptions: @@ -1245,7 +1258,7 @@ cdef class UnionDatasetFactory(DatasetFactory): """ cdef: - CDatasetFactory* union_factory + CUnionDatasetFactory* union_factory def __init__(self, list factories): cdef: @@ -1255,6 +1268,10 @@ cdef class UnionDatasetFactory(DatasetFactory): c_factories.push_back(factory.unwrap()) self.init(GetResultValue(CUnionDatasetFactory.Make(c_factories))) + cdef init(self, shared_ptr[CDatasetFactory]& sp): + DatasetFactory.init(self, sp) + self.union_factory = sp.get() + cdef class ScanTask: """Read record batches from a range of a single data fragment. diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b03c8c01f1f..a0a9edbbafe 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -232,10 +232,10 @@ def _ensure_format(obj): raise ValueError("format '{}' is not supported".format(obj)) -def factory(path_or_paths, filesystem=None, partitioning=None, - format=None): +def _filesystem_factory(path_or_paths, filesystem=None, partitioning=None, + format=None): """ - Create a factory which can be used to build a Dataset. + Create a FileSystemDatasetFactory which can be used to build a Dataset. Parameters ---------- @@ -285,22 +285,33 @@ def factory(path_or_paths, filesystem=None, partitioning=None, def _ensure_factory(src, **kwargs): # Need to return DatasetFactory since `dataset` might need to finish the # factory with a unified schema. - # TODO: return Dataset if a specific schema was passed? 
if _is_path_like(src): - return factory(src, **kwargs) + return _filesystem_factory(src, **kwargs) elif isinstance(src, DatasetFactory): if any(v is not None for v in kwargs.values()): # when passing a SourceFactory, the arguments cannot be specified raise ValueError( - "When passing a DatasetFactory, you cannot pass any " + "When passing a Source(Factory), you cannot pass any " "additional arguments" ) return src elif isinstance(src, Dataset): - raise TypeError( - "Dataset objects are currently not supported, only DatasetFactory " - "instances. Use the factory() function to create such objects." - ) + if any(v is not None for v in kwargs.values()): + # when passing a SourceFactory, the arguments cannot be specified + raise ValueError( + "When passing a DatasetFactory, you cannot pass any " + "additional arguments" + ) + if src.factory is not None: + # the dataset object holds a reference for the constructing factory + # so reuse it + return src.factory + else: + raise TypeError( + "Dataset objects are only supported if they are constructed " + "using the dataset() function or by directly instantiating a " + "DatasetFactory subclass." + ) else: raise TypeError( "Expected a path-like or DatasetFactory, got {}".format(type(src)) @@ -314,7 +325,7 @@ def dataset(paths_or_factories, filesystem=None, partitioning=None, Parameters ---------- - paths_or_factories : path or list of paths or factory or list of factories + sources : path, list of paths, dataset or list of datasets Path to a file or to a directory containing the data files, or a list of paths for a multi-directory dataset. To have more control, a list of factories can be passed, created with the ``factory()`` function (in @@ -345,12 +356,12 @@ def dataset(paths_or_factories, filesystem=None, partitioning=None, Construction from multiple factories: >>> dataset([ - ... factory("s3://old-taxi-data", format="parquet"), - ... factory("local/path/to/new/data", format="csv") + ... dataset("s3://old-taxi-data", format="parquet"), + ... dataset("local/path/to/new/data", format="csv") ... 
]) """ - # bundle the keyword arguments + # reuse the keyword arguments for later use kwargs = dict(filesystem=filesystem, partitioning=partitioning, format=format) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 088c834a1dd..f8d4e3ed32c 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -79,8 +79,8 @@ def _table_from_pandas(df): @pytest.fixture -def mockfs(request): - request.config.pyarrow.requires('parquet') +@pytest.mark.parquet +def mockfs(): import pyarrow.parquet as pq mockfs = fs._MockFileSystem() @@ -115,9 +115,9 @@ def mockfs(request): @pytest.fixture(scope='module') -def multisourcefs(request): - request.config.pyarrow.requires('pandas') - request.config.pyarrow.requires('parquet') +@pytest.mark.pandas +@pytest.mark.parquet +def multisourcefs(): import pyarrow.parquet as pq df = _generate_data(1000) @@ -955,8 +955,8 @@ def test_open_dataset_list_of_files(tempdir): # list of exact files needs to be passed to source() function # (dataset() will interpret it as separate sources) datasets = [ - ds.dataset(ds.factory([path1, path2])), - ds.dataset(ds.factory([str(path1), str(path2)])) + ds.dataset([path1, path2]), + ds.dataset([str(path1), str(path2)]) ] for dataset in datasets: assert dataset.schema.equals(table.schema) @@ -1046,8 +1046,24 @@ def test_open_dataset_unsupported_format(tempdir): def test_open_dataset_validate_sources(tempdir): _, path = _create_single_file(tempdir) dataset = ds.dataset(path) - with pytest.raises(TypeError, - match="Dataset objects are currently not supported"): + # reuse the dataset factory for construction a union dataset later + assert dataset.factory is not None + + union_dataset = ds.dataset([dataset, dataset]) + assert isinstance(union_dataset, ds.UnionDataset) + + # if the dataset is constructed directly then the factory object is not + # vailable for later reuse, so raise + dataset = ds.FileSystemDataset( + schema=pa.schema([]), + root_partition=None, + file_format=ds.ParquetFileFormat(), + filesystem=fs._MockFileSystem(), + paths_or_selector=[], + partitions=[] + ) + expected_msg = "Dataset objects are only supported if they are constructed" + with pytest.raises(TypeError, match=expected_msg): ds.dataset([dataset]) @@ -1109,8 +1125,11 @@ def test_filter_implicit_cast(tempdir): assert len(result) == 3 -def test_dataset_factory(multisourcefs): - child = ds.factory('/plain', filesystem=multisourcefs, format='parquet') +def test_dataset_union(multisourcefs): + child = ds.FileSystemDatasetFactory( + multisourcefs, fs.FileSelector('/plain'), + format=ds.ParquetFileFormat() + ) factory = ds.UnionDatasetFactory([child]) # TODO(bkietz) reintroduce factory.children property @@ -1122,14 +1141,14 @@ def test_dataset_factory(multisourcefs): def test_multiple_factories(multisourcefs): - src1 = ds.factory('/plain', filesystem=multisourcefs, format='parquet') - src2 = ds.factory('/schema', filesystem=multisourcefs, format='parquet', - partitioning=['week', 'color']) - src3 = ds.factory('/hive', filesystem=multisourcefs, format='parquet', - partitioning='hive') - - assembled = ds.dataset([src1, src2, src3]) - assert isinstance(assembled, ds.Dataset) + child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') + child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet', + partitioning=['week', 'color']) + child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet', + partitioning='hive') + + assembled = ds.dataset([child1, 
child2, child3]) + assert isinstance(assembled, ds.UnionDataset) expected_schema = pa.schema([ ('date', pa.date32()), From 12902c7e74ec422adfde40efbcf59ffae53d6676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 28 Feb 2020 19:11:48 +0100 Subject: [PATCH 02/27] fix pandas marker --- python/pyarrow/tests/test_dataset.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index f8d4e3ed32c..ef47ea3b693 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -115,9 +115,9 @@ def mockfs(): @pytest.fixture(scope='module') -@pytest.mark.pandas -@pytest.mark.parquet -def multisourcefs(): +def multisourcefs(request): + request.config.pyarrow.requires('pandas') + request.config.pyarrow.requires('parquet') import pyarrow.parquet as pq df = _generate_data(1000) From fb8bb4b383e4359433aa0827be024e2f83201708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Thu, 9 Apr 2020 20:32:44 +0200 Subject: [PATCH 03/27] rebase --- python/pyarrow/tests/test_dataset.py | 35 ++++++---------------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index ef47ea3b693..b284fdd33b0 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -18,6 +18,7 @@ import contextlib import operator import os +import pathlib import pickle import numpy as np @@ -896,43 +897,21 @@ def _check_dataset(dataset, table): def _check_dataset_from_path(path, table, **kwargs): - import pathlib - # pathlib object assert isinstance(path, pathlib.Path) - dataset = ds.dataset(ds.factory(path, **kwargs)) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - # string path - dataset = ds.dataset(ds.factory(str(path), **kwargs)) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) + # accept Path, str, List[Path], List[str] + for p in [path, str(path), [path], [str(path)]]: + dataset = ds.dataset(path, **kwargs) + assert isinstance(dataset, ds.FileSystemDataset) + _check_dataset(dataset, table) # relative string path with change_cwd(path.parent): - dataset = ds.dataset(ds.factory(path.name, **kwargs)) + dataset = ds.dataset(path.name, **kwargs) assert isinstance(dataset, ds.FileSystemDataset) _check_dataset(dataset, table) - # passing directly to dataset - dataset = ds.dataset(path, **kwargs) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - - dataset = ds.dataset(str(path), **kwargs) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - - # passing list of files (even of length-1) gives UnionDataset - dataset = ds.dataset([path], **kwargs) - assert isinstance(dataset, ds.UnionDataset) - _check_dataset(dataset, table) - - dataset = ds.dataset([str(path)], **kwargs) - assert isinstance(dataset, ds.UnionDataset) - _check_dataset(dataset, table) - @pytest.mark.parquet def test_open_dataset_single_file(tempdir): From 9b5a94a64afc6d66d6a02354fb2fe9f856dc5894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 03:38:53 +0200 Subject: [PATCH 04/27] refine dataset() api --- python/pyarrow/_dataset.pyx | 8 +- python/pyarrow/dataset.py | 476 +++++++++++++++++---------- python/pyarrow/includes/libarrow.pxd | 3 + python/pyarrow/tests/test_dataset.py | 150 ++++++--- 
python/pyarrow/tests/test_schema.py | 32 ++ python/pyarrow/types.pxi | 40 +++ 6 files changed, 489 insertions(+), 220 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 97b4b8a5b9f..9bcad5b7361 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1106,8 +1106,14 @@ cdef class FileSystemFactoryOptions: __slots__ = () # avoid mistakingly creating attributes - def __init__(self, partition_base_dir=None, exclude_invalid_files=None, + def __init__(self, partition_base_dir=None, partitioning=None, + exclude_invalid_files=None, list selector_ignore_prefixes=None): + if isinstance(partitioning, PartitioningFactory): + self.partitioning_factory = partitioning + elif isinstance(partitioning, Partitioning): + self.partitioning = partitioning + if partition_base_dir is not None: self.partition_base_dir = partition_base_dir if exclude_invalid_files is not None: diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index a0a9edbbafe..f5518af05d1 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -20,6 +20,16 @@ import pyarrow as pa from pyarrow.util import _stringify_path, _is_path_like +from pyarrow.fs import ( + _normalize_path, + FileSelector, + FileSystem, + FileType, + LocalFileSystem, + SubTreeFileSystem, + _MockFileSystem +) + from pyarrow._dataset import ( # noqa AndExpression, CastExpression, @@ -55,6 +65,40 @@ ) +def field(name): + """References a named column of the dataset. + + Stores only the field's name. Type and other information is known only when + the expression is applied on a dataset having an explicit scheme. + + Parameters + ---------- + name : string + The name of the field the expression references to. + + Returns + ------- + field_expr : FieldExpression + """ + return FieldExpression(name) + + +def scalar(value): + """Expression representing a scalar value. + + Parameters + ---------- + value : bool, int, float or string + Python value of the scalar. Note that only a subset of types are + currently supported. + + Returns + ------- + scalar_expr : ScalarExpression + """ + return ScalarExpression(value) + + def partitioning(schema=None, field_names=None, flavor=None): """ Specify a partitioning scheme. @@ -153,59 +197,12 @@ def partitioning(schema=None, field_names=None, flavor=None): raise ValueError("Unsupported flavor") -def _ensure_fs(filesystem, path): - # Validate or infer the filesystem from the path - from pyarrow.fs import ( - FileSystem, LocalFileSystem, FileType, _normalize_path) - - if filesystem is None: - # First check if the file exists as a local (relative) file path - filesystem = LocalFileSystem() - try: - infos = filesystem.get_file_info([path])[0] - except OSError: - local_path_exists = False - else: - local_path_exists = (infos.type != FileType.NotFound) - - if not local_path_exists: - # Perhaps it's a URI? - try: - return FileSystem.from_uri(path) - except ValueError as e: - if "empty scheme" not in str(e): - raise - # ARROW-8213: not a URI, assume local path - # to get a nice error message. 
- - # ensure we have a proper path (eg no backslashes on Windows) - path = _normalize_path(filesystem, path) - - return filesystem, path - - -def _ensure_fs_and_paths(path, filesystem=None): - # Return filesystem and list of string paths or FileSelector - from pyarrow.fs import FileType, FileSelector - - path = _stringify_path(path) - filesystem, path = _ensure_fs(filesystem, path) - infos = filesystem.get_file_info([path])[0] - if infos.type == FileType.Directory: - # for directory, pass a selector - paths_or_selector = FileSelector(path, recursive=True) - elif infos.type == FileType.File: - # for a single file path, pass it as a list - paths_or_selector = [path] - else: - raise FileNotFoundError(path) - - return filesystem, paths_or_selector - - def _ensure_partitioning(scheme): - # Validate input and return a Partitioning(Factory) or passthrough None - # for no partitioning + """ + Validate input and return a Partitioning(Factory). + + It passes None through if no partitioning scheme is defiend. + """ if scheme is None: pass elif isinstance(scheme, str): @@ -215,9 +212,8 @@ def _ensure_partitioning(scheme): elif isinstance(scheme, (Partitioning, PartitioningFactory)): pass else: - ValueError( - "Expected Partitioning or PartitioningFactory, got {}".format( - type(scheme))) + ValueError("Expected Partitioning or PartitioningFactory, got {}" + .format(type(scheme))) return scheme @@ -232,116 +228,261 @@ def _ensure_format(obj): raise ValueError("format '{}' is not supported".format(obj)) -def _filesystem_factory(path_or_paths, filesystem=None, partitioning=None, - format=None): +def _ensure_multiple_sources(paths, filesystem=None): """ - Create a FileSystemDatasetFactory which can be used to build a Dataset. + Treat a list of paths as files belonging to a single file system + + If the file system is local then also validates that all paths + are referencing existing *files* otherwise any non-file paths will be + silently skipped (for example on a remote filesystem). Parameters ---------- - path_or_paths : str, pathlib.Path, or list of those - Path to a file or to a directory containing the data files, or - a list of paths. - filesystem : FileSystem, default None - By default will be inferred from the path. - partitioning : Partitioning or PartitioningFactory or str or list of str - The partitioning scheme specified with the ``partitioning()`` - function. A flavor string can be used as shortcut, and with a list of - field names a DirectionaryPartitioning will be inferred. - format : str, default None - Currently only "parquet" is supported. + paths : list of path-like + Note that URIs are not allowed. + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. Returns ------- - FileSystemDatasetFactory + (FileSystem, list of str) + File system object and a list of normalized paths. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + FileNotFoundError + If the file system is local and a referenced path is not available. + ValueError + If the file system is local and a path references a directory or its + type cannot be determined. 
""" - if not isinstance(path_or_paths, (list, tuple)): - path_or_paths = [path_or_paths] - - partitioning = _ensure_partitioning(partitioning) - format = _ensure_format(format or "parquet") - - # TODO pass through options - options = FileSystemFactoryOptions() - if isinstance(partitioning, PartitioningFactory): - options.partitioning_factory = partitioning - elif isinstance(partitioning, Partitioning): - options.partitioning = partitioning - - factories = [] - for path in path_or_paths: - fs, paths_or_selector = _ensure_fs_and_paths(path, filesystem) - factories.append(FileSystemDatasetFactory(fs, paths_or_selector, - format, options)) - - if len(factories) == 0: - raise ValueError("Need at least one path") - elif len(factories) == 1: - return factories[0] + if filesystem is None: + # fall back to local file system as the default + filesystem = LocalFileSystem() + paths_are_local = True + elif isinstance(filesystem, str): + # instantiate the file system from an uri, if the uri has a path + # component then it will be treated as a path prefix + filesystem, prefix = FileSystem.from_uri(filesystem) + paths_are_local = isinstance(filesystem, LocalFileSystem) + prefix = _normalize_path(filesystem, prefix) + if prefix: + filesystem = SubTreeFileSystem(prefix, filesystem) + elif isinstance(filesystem, (LocalFileSystem, _MockFileSystem)): + paths_are_local = True + elif not isinstance(filesystem, FileSystem): + raise TypeError( + '`filesystem` argument must be a FileSystem instance or a valid ' + 'file system URI' + ) else: - return UnionDatasetFactory(factories) + paths_are_local = False + + # allow normalizing irregular paths such as Windows local paths + paths = [_normalize_path(filesystem, _stringify_path(p)) for p in paths] + + # validate that all of the paths are pointing to existing *files* + # possible improvement is to group the file_infos by type and raise for + # multiple paths per error category + if paths_are_local: + for info in filesystem.get_file_info(paths): + file_type = info.type + if file_type == FileType.File: + continue + elif file_type == FileType.NotFound: + raise FileNotFoundError(info.path) + elif file_type == FileType.Directory: + raise ValueError( + 'Path {} points to a directory, but only file paths are ' + 'supported. To construct a nested or union dataset pass ' + 'a list of dataset objects instead.'.format(info.path) + ) + else: + raise ValueError( + 'Path {} exists but its type is unknown (could be a ' + 'special file such as a Unix socket or character device, ' + 'or Windows NUL / CON / ...'.format(info.path) + ) + return (filesystem, paths) -def _ensure_factory(src, **kwargs): - # Need to return DatasetFactory since `dataset` might need to finish the - # factory with a unified schema. 
- if _is_path_like(src): - return _filesystem_factory(src, **kwargs) - elif isinstance(src, DatasetFactory): - if any(v is not None for v in kwargs.values()): - # when passing a SourceFactory, the arguments cannot be specified - raise ValueError( - "When passing a Source(Factory), you cannot pass any " - "additional arguments" - ) - return src - elif isinstance(src, Dataset): - if any(v is not None for v in kwargs.values()): - # when passing a SourceFactory, the arguments cannot be specified - raise ValueError( - "When passing a DatasetFactory, you cannot pass any " - "additional arguments" - ) - if src.factory is not None: - # the dataset object holds a reference for the constructing factory - # so reuse it - return src.factory + +def _ensure_single_source(path, filesystem=None): + """ + Treat path as either a recursively traversable directory or a single file. + + Parameters + ---------- + path : path-like + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. + + Returns + ------- + (FileSystem, list of str or fs.Selector) + File system object and either a single item list pointing to a file or + an fs.Selector object pointing to a directory. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + FileNotFoundError + If the referenced file or directory doesn't exist. + """ + path = _stringify_path(path) + + # if filesystem is not given try to automatically determine one + # first check if the file exists as a local (relative) file path + # if not then try to parse the path as an URI + file_info = None + if filesystem is None: + filesystem = LocalFileSystem() + try: + file_info = filesystem.get_file_info([path])[0] + except OSError: + file_info = None + exists_locally = False else: - raise TypeError( - "Dataset objects are only supported if they are constructed " - "using the dataset() function or by directly instantiating a " - "DatasetFactory subclass." 
- ) - else: + exists_locally = (file_info.type != FileType.NotFound) + + # if the file or directory doesn't exists locally, then assume that + # the path is an URI describing the file system as well + if not exists_locally: + try: + filesystem, path = FileSystem.from_uri(path) + except ValueError as e: + # ARROW-8213: neither an URI nor a locally existing path, + # so assume that local path was given and propagate a nicer + # file not found error instead of a more confusing scheme + # parsing error + if "empty scheme" not in str(e): + raise + else: + # unset file_info to query it again from the new filesystem + file_info = None + elif isinstance(filesystem, str): + # instantiate the file system from an uri, if the uri has a path + # component then it will be treated as a path prefix + filesystem, prefix = FileSystem.from_uri(filesystem) + prefix = _normalize_path(filesystem, prefix) + if prefix: + filesystem = SubTreeFileSystem(prefix, filesystem) + elif not isinstance(filesystem, FileSystem): raise TypeError( - "Expected a path-like or DatasetFactory, got {}".format(type(src)) + '`filesystem` argument must be a FileSystem instance or a valid ' + 'file system URI' ) + # retrieve the file descriptor if it is available already + if file_info is None: + file_info = filesystem.get_file_info([path])[0] + + # depending on the path type either return with a recursive + # directory selector or as a list containing a single file + if file_info.type == FileType.Directory: + paths_or_selector = FileSelector(path, recursive=True) + elif file_info.type == FileType.File: + paths_or_selector = [path] + else: + raise FileNotFoundError(path) + + return (filesystem, paths_or_selector) + -def dataset(paths_or_factories, filesystem=None, partitioning=None, - format=None, schema=None): +def _filesystem_dataset(source, schema=None, filesystem=None, + partitioning=None, format=None, + partition_base_dir=None, exclude_invalid_files=None, + ignore_prefixes=None): + """ + Create a FileSystemDataset which can be used to build a Dataset. + + Parameters are documented in the dataset function. + + Returns + ------- + FileSystemDataset + """ + format = _ensure_format(format or 'parquet') + partitioning = _ensure_partitioning(partitioning) + + if isinstance(source, (list, tuple)): + fs, paths_or_selector = _ensure_multiple_sources(source, filesystem) + else: + fs, paths_or_selector = _ensure_single_source(source, filesystem) + + options = FileSystemFactoryOptions( + partitioning=partitioning, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + ignore_prefixes=ignore_prefixes + ) + factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options) + + return factory.finish(schema) + + +def _union_dataset(source, schema=None, **kwargs): + if any(v is not None for v in kwargs.values()): + raise ValueError( + "When other datasets you cannot pass any additional arguments" + ) + + if schema is None: + # unify the children datasets' schemas + schema = pa.Schema.merge([ds.schema for ds in source]) + + # create datasets with the requested schema + children = [ds.replace_schema(schema) for ds in source] + + return UnionDataset(schema, children) + + +def dataset(source, schema=None, filesystem=None, partitioning=None, + format=None, partition_base_dir=None, exclude_invalid_files=None, + ignore_prefixes=None): """ Open a dataset. 
Parameters ---------- - sources : path, list of paths, dataset or list of datasets + source : path, list of paths, dataset or list of datasets Path to a file or to a directory containing the data files, or a list of paths for a multi-directory dataset. To have more control, a list of factories can be passed, created with the ``factory()`` function (in this case, the additional keywords will be ignored). + schema : Schema, optional + Optionally provide the Schema for the Dataset, in which case it will + not be inferred from the source. filesystem : FileSystem, default None By default will be inferred from the path. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. A flavor string can be used as shortcut, and with a list of field names a DirectionaryPartitioning will be inferred. - format : str + format : FileFormat or str Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For Feather, only version 2 files are supported. - schema : Schema, optional - Optionally provide the Schema for the Dataset, in which case it will - not be inferred from the source. + partition_base_dir : str, optional + For the purposes of applying the partitioning, paths will be + stripped of the partition_base_dir. Files not matching the + partition_base_dir prefix will be skipped for partitioning discovery. + The ignored files will still be part of the Dataset, but will not + have partition information. + exclude_invalid_files : bool, optional (default True) + If True, invalid files will be excluded (file format specific check). + This will incur IO for each files in a serial and single threaded + fashion. Disabling this feature will skip the IO, but unsupported + files may be present in the Dataset (resulting in an error at scan + time). + ignore_prefixes : list, optional + Files matching one of those prefixes will be ignored by the + discovery process. This is matched to the basename of a path. + By default this is ['.', '_']. Returns ------- @@ -357,54 +498,29 @@ def dataset(paths_or_factories, filesystem=None, partitioning=None, >>> dataset([ ... dataset("s3://old-taxi-data", format="parquet"), - ... dataset("local/path/to/new/data", format="csv") + ... dataset("local/path/to/new/data", format="ipc") ... ]) """ - # reuse the keyword arguments for later use - kwargs = dict(filesystem=filesystem, partitioning=partitioning, - format=format) - - single_dataset = False - if not isinstance(paths_or_factories, list): - paths_or_factories = [paths_or_factories] - single_dataset = True - - factories = [_ensure_factory(f, **kwargs) for f in paths_or_factories] - if single_dataset: - return factories[0].finish(schema=schema) - return UnionDatasetFactory(factories).finish(schema=schema) - - -def field(name): - """References a named column of the dataset. - - Stores only the field's name. Type and other information is known only when - the expression is applied on a dataset having an explicit scheme. - - Parameters - ---------- - name : string - The name of the field the expression references to. - - Returns - ------- - field_expr : FieldExpression - """ - return FieldExpression(name) - - -def scalar(value): - """Expression representing a scalar value. - - Parameters - ---------- - value : bool, int, float or string - Python value of the scalar. Note that only a subset of types are - currently supported. 
- - Returns - ------- - scalar_expr : ScalarExpression - """ - return ScalarExpression(value) + # collect the keyword arguments for later reuse + kwargs = dict( + filesystem=filesystem, + partitioning=partitioning, + format=format, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + ignore_prefixes=ignore_prefixes + ) + + # TODO(kszucs): support InMemoryDataset for a table input + if _is_path_like(source): + return _filesystem_dataset(source, schema=schema, **kwargs) + elif isinstance(source, (tuple, list)): + if all(_is_path_like(elem) for elem in source): + return _filesystem_dataset(source, schema=schema, **kwargs) + elif all(isinstance(elem, Dataset) for elem in source): + return _union_dataset(source, schema=schema, **kwargs) + else: + raise TypeError('vvvvvvvvvvvvvvv') + else: + raise TypeError('yyyyyyyyyyyy') diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1511ac29db9..fc5b73563bb 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -411,6 +411,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: const shared_ptr[CKeyValueMetadata]& metadata) shared_ptr[CSchema] RemoveMetadata() + CResult[shared_ptr[CSchema]] UnifySchemas( + const vector[shared_ptr[CSchema]]& schemas) + cdef cppclass PrettyPrintOptions: PrettyPrintOptions() PrettyPrintOptions(int indent_arg) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index b284fdd33b0..cc6fcfa0d05 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -273,7 +273,6 @@ def test_dataset(dataset): assert isinstance(dataset.schema, pa.Schema) # TODO(kszucs): test non-boolean Exprs for filter do raise - expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) for task in dataset.scan(): @@ -550,12 +549,7 @@ def test_file_format_pickling(): @pytest.mark.parametrize('paths_or_selector', [ fs.FileSelector('subdir', recursive=True), [ - 'subdir', - 'subdir/1', - 'subdir/1/xxx', 'subdir/1/xxx/file0.parquet', - 'subdir/2', - 'subdir/2/yyy', 'subdir/2/yyy/file1.parquet', ] ]) @@ -603,7 +597,8 @@ def test_filesystem_factory(mockfs, paths_or_selector): expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) expected_str = pa.DictionaryArray.from_arrays( pa.array([0, 1, 2, 3, 4], type=pa.int32()), - pa.array("0 1 2 3 4".split(), type=pa.string())) + pa.array("0 1 2 3 4".split(), type=pa.string()) + ) for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']): expected_group = pa.array([group] * 5, type=pa.int32()) expected_key = pa.array([key] * 5, type=pa.string()) @@ -840,7 +835,6 @@ def test_partitioning_function(): names = ["year", "month"] # default DirectoryPartitioning - part = ds.partitioning(schema) assert isinstance(part, ds.DirectoryPartitioning) part = ds.partitioning(field_names=names) @@ -854,7 +848,6 @@ def test_partitioning_function(): ds.partitioning(schema, field_names=schema) # Hive partitioning - part = ds.partitioning(schema, flavor="hive") assert isinstance(part, ds.HivePartitioning) part = ds.partitioning(flavor="hive") @@ -943,6 +936,88 @@ def test_open_dataset_list_of_files(tempdir): assert result.equals(table) +def test_costruct_from_single_file(tempdir): + directory = tempdir / 'single-file' + directory.mkdir() + table, path = _create_single_file(directory) + relative_path = path.relative_to(directory) + + # instantiate from a single file + d1 = 
ds.dataset(path) + # instantiate from a single file with a filesystem object + d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) + # instantiate from a single file with prefixed filesystem URI + d3 = ds.dataset(relative_path, filesystem='file://{}'.format(directory)) + assert d1.to_table() == d2.to_table() == d3.to_table() + + +def test_costruct_from_single_directory(tempdir): + directory = tempdir / 'single-directory' + directory.mkdir() + tables, paths = _create_directory_of_files(directory) + + # instantiate from a single directory + d1 = ds.dataset(directory) + d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) + d3 = ds.dataset(directory.name, + filesystem='file://{}'.format(directory.parent)) + assert d1.to_table() == d2.to_table() == d3.to_table() + + +def test_costruct_from_list_of_files(tempdir): + # instantiate from a list of files + directory = tempdir / 'list-of-files' + directory.mkdir() + tables, paths = _create_directory_of_files(directory) + + relative_paths = [p.relative_to(tempdir) for p in paths] + with change_cwd(tempdir): + d1 = ds.dataset(relative_paths) + t1 = d1.to_table() + assert len(t1) == sum(map(len, tables)) + + d2 = ds.dataset(relative_paths, filesystem='file://{}'.format(tempdir)) + t2 = d2.to_table() + d3 = ds.dataset(paths) + t3 = d3.to_table() + d4 = ds.dataset(paths, filesystem='file://') + t4 = d4.to_table() + d5 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) + t5 = d5.to_table() + + assert t1 == t2 == t3 == t4 == t5 + + +def test_construct_from_list_of_mixed_paths_fails(mockfs): + # isntantiate from a list of mixed paths + files = [ + 'subdir/1/xxx/file0.parquet', + 'subdir/1/xxx/doesnt-exist.parquet', + ] + with pytest.raises(FileNotFoundError, match='doesnt-exist'): + ds.dataset(files, filesystem=mockfs) + + +def test_construct_from_mixed_child_datasets(mockfs): + # isntantiate from a list of mixed paths + dataset = ds.dataset([ + ds.dataset(['subdir/1/xxx/file0.parquet', + 'subdir/2/yyy/file1.parquet'], filesystem=mockfs), + ds.dataset('subdir', filesystem=mockfs) + ]) + assert isinstance(dataset, ds.UnionDataset) + assert len(list(dataset.get_fragments())) == 4 + + table = dataset.to_table() + assert len(table) == 20 + assert table.num_columns == 4 + + +def test_construct_from_datasets_with_different_schemas(): + # TODO(kszucs) + pass + + @pytest.mark.parquet def test_open_dataset_partitioned_directory(tempdir): import pyarrow.parquet as pq @@ -1022,37 +1097,18 @@ def test_open_dataset_unsupported_format(tempdir): @pytest.mark.parquet -def test_open_dataset_validate_sources(tempdir): +def test_open_union_dataset(tempdir): _, path = _create_single_file(tempdir) dataset = ds.dataset(path) - # reuse the dataset factory for construction a union dataset later - assert dataset.factory is not None - union_dataset = ds.dataset([dataset, dataset]) - assert isinstance(union_dataset, ds.UnionDataset) + union = ds.dataset([dataset, dataset]) + assert isinstance(union, ds.UnionDataset) - # if the dataset is constructed directly then the factory object is not - # vailable for later reuse, so raise - dataset = ds.FileSystemDataset( - schema=pa.schema([]), - root_partition=None, - file_format=ds.ParquetFileFormat(), - filesystem=fs._MockFileSystem(), - paths_or_selector=[], - partitions=[] - ) - expected_msg = "Dataset objects are only supported if they are constructed" - with pytest.raises(TypeError, match=expected_msg): - ds.dataset([dataset]) - -def test_open_dataset_from_source_additional_kwargs(multisourcefs): - child = 
ds.FileSystemDatasetFactory( - multisourcefs, fs.FileSelector('/plain'), - format=ds.ParquetFileFormat() - ) +def test_open_union_dataset_with_additional_kwargs(multisourcefs): + child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') with pytest.raises(ValueError, match="cannot pass any additional"): - ds.dataset(child, format="parquet") + ds.dataset([child], format="parquet") def test_open_dataset_non_existing_file(): @@ -1129,6 +1185,10 @@ def test_multiple_factories(multisourcefs): assembled = ds.dataset([child1, child2, child3]) assert isinstance(assembled, ds.UnionDataset) + msg = 'cannot pass any additional arguments' + with pytest.raises(ValueError, match=msg): + ds.dataset([child1, child2], filesystem=multisourcefs) + expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), @@ -1141,10 +1201,19 @@ def test_multiple_factories(multisourcefs): assert assembled.schema.equals(expected_schema) -def test_multiple_factories_with_selectors(multisourcefs): +def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): + msg = 'points to a directory, but only file paths are supported' + with pytest.raises(ValueError, match=msg): + ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs) + + +def test_union_dataset_filesystem_datasets(multisourcefs): # without partitioning - dataset = ds.dataset(['/plain', '/schema', '/hive'], - filesystem=multisourcefs, format='parquet') + dataset = ds.dataset([ + ds.dataset('/plain', filesystem=multisourcefs), + ds.dataset('/schema', filesystem=multisourcefs), + ds.dataset('/hive', filesystem=multisourcefs), + ]) expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), @@ -1154,8 +1223,11 @@ def test_multiple_factories_with_selectors(multisourcefs): assert dataset.schema.equals(expected_schema) # with hive partitioning for two hive sources - dataset = ds.dataset(['/hive', '/hive_color'], filesystem=multisourcefs, - format='parquet', partitioning='hive') + dataset = ds.dataset([ + ds.dataset('/plain', filesystem=multisourcefs), + ds.dataset('/schema', filesystem=multisourcefs), + ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive') + ]) expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index dac76def856..cbaef51bba0 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -654,3 +654,35 @@ def test_schema_sizeof(): assert sys.getsizeof(schema2) > sys.getsizeof(schema) schema3 = schema.with_metadata({"key": "some more metadata"}) assert sys.getsizeof(schema3) > sys.getsizeof(schema2) + + +def test_schema_merge(): + a = pa.schema([ + pa.field('foo', pa.int32()), + pa.field('bar', pa.string()), + pa.field('baz', pa.list_(pa.int8())) + ]) + b = pa.schema([ + pa.field('foo', pa.int32()), + pa.field('qux', pa.bool_()) + ]) + c = pa.schema([ + pa.field('quux', pa.dictionary(pa.int32(), pa.string())) + ]) + d = pa.schema([ + pa.field('foo', pa.int64()), + pa.field('qux', pa.bool_()) + ]) + + result = pa.Schema.merge([a, b, c]) + expected = pa.schema([ + pa.field('foo', pa.int32()), + pa.field('bar', pa.string()), + pa.field('baz', pa.list_(pa.int8())), + pa.field('qux', pa.bool_()), + pa.field('quux', pa.dictionary(pa.int32(), pa.string())) + ]) + assert result.equals(expected) + + with pytest.raises(pa.ArrowInvalid): + pa.Schema.merge([b, d]) diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 
6609a81581f..837c41f9c69 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1278,6 +1278,46 @@ cdef class Schema: return self.sp_schema.get().Equals(deref(other.schema), check_metadata) + @classmethod + def merge(cls, list schemas): + """ + Unifies schemas by merging fields by name. + + The resulting schema will contain the union of fields from all schemas. + Fields with the same name will be merged. + - The unified field will inherit the metadata from the schema where + that field is first defined. + - The first N fields in the schema will be ordered the same as the + N fields in the first schema. + + The resulting schema will inherit its metadata from the first input + schema. + + Returns an error if: + + + Parameters + ---------- + schemas : list of Schema + Schemas to merge into a single one. + + Returns + ------- + Schema + + Raises + ------ + ArrowInvalid : + If any input schema contains fields with duplicate names. + If Fields of the same name are not mergeable. + """ + cdef: + Schema schema + vector[shared_ptr[CSchema]] c_schemas + for schema in schemas: + c_schemas.push_back(pyarrow_unwrap_schema(schema)) + return pyarrow_wrap_schema(GetResultValue(UnifySchemas(c_schemas))) + @classmethod def from_pandas(cls, df, preserve_index=None): """ From 8967e41ecc1d396cdd437499eb1110670156b83a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 03:46:06 +0200 Subject: [PATCH 05/27] remove remnanst of factory property --- python/pyarrow/_dataset.pyx | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 9bcad5b7361..ff7f4885d12 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -54,18 +54,12 @@ cdef class Dataset: shared_ptr[CDataset] wrapped CDataset* dataset - cdef readonly: - # if a factory object has instantiated the dataset construction then - # hold a reference for the factory to reuse it later - DatasetFactory factory - def __init__(self): _forbid_instantiation(self.__class__) cdef void init(self, const shared_ptr[CDataset]& sp): self.wrapped = sp self.dataset = sp.get() - self.factory = None @staticmethod cdef wrap(const shared_ptr[CDataset]& sp): @@ -1069,12 +1063,7 @@ cdef class DatasetFactory: with nogil: result = self.factory.Finish() - # instantiate the dataset object and store a reference for the - # factory in order to reuse the same factory object later on - dataset = Dataset.wrap(GetResultValue(result)) - dataset.factory = self - - return dataset + return Dataset.wrap(GetResultValue(result)) cdef class FileSystemFactoryOptions: From 20a091a462a1a211c3a4f89caf28bdb2f1f275d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 14:48:09 +0200 Subject: [PATCH 06/27] address review comments --- python/pyarrow/__init__.py | 1 + python/pyarrow/dataset.py | 29 ++++++----- python/pyarrow/tests/test_dataset.py | 8 ++- python/pyarrow/tests/test_schema.py | 4 +- python/pyarrow/types.pxi | 77 +++++++++++++--------------- 5 files changed, 59 insertions(+), 60 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index a2d4a2c4ce1..1cf8cf63885 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -81,6 +81,7 @@ def parse_git(root, **kwargs): Field, Schema, schema, + unify_schemas, Array, Tensor, array, chunked_array, record_batch, table, SparseCOOTensor, SparseCSRMatrix, SparseCSCMatrix, diff --git 
a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index f5518af05d1..38d2b426245 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -307,7 +307,7 @@ def _ensure_multiple_sources(paths, filesystem=None): 'or Windows NUL / CON / ...'.format(info.path) ) - return (filesystem, paths) + return filesystem, paths def _ensure_single_source(path, filesystem=None): @@ -391,7 +391,7 @@ def _ensure_single_source(path, filesystem=None): else: raise FileNotFoundError(path) - return (filesystem, paths_or_selector) + return filesystem, paths_or_selector def _filesystem_dataset(source, schema=None, filesystem=None, @@ -429,12 +429,13 @@ def _filesystem_dataset(source, schema=None, filesystem=None, def _union_dataset(source, schema=None, **kwargs): if any(v is not None for v in kwargs.values()): raise ValueError( - "When other datasets you cannot pass any additional arguments" + "When passing a list of Datasets, you cannot pass any additional " + "arguments" ) if schema is None: # unify the children datasets' schemas - schema = pa.Schema.merge([ds.schema for ds in source]) + schema = pa.unify_schemas([ds.schema for ds in source]) # create datasets with the requested schema children = [ds.replace_schema(schema) for ds in source] @@ -442,9 +443,9 @@ def _union_dataset(source, schema=None, **kwargs): return UnionDataset(schema, children) -def dataset(source, schema=None, filesystem=None, partitioning=None, - format=None, partition_base_dir=None, exclude_invalid_files=None, - ignore_prefixes=None): +def dataset(source, schema=None, format=None, filesystem=None, + partitioning=None, partition_base_dir=None, + exclude_invalid_files=None, ignore_prefixes=None): """ Open a dataset. @@ -458,15 +459,16 @@ def dataset(source, schema=None, filesystem=None, partitioning=None, schema : Schema, optional Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. + format : FileFormat or str + Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For + Feather, only version 2 files are supported. filesystem : FileSystem, default None By default will be inferred from the path. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. A flavor string can be used as shortcut, and with a list of field names a DirectionaryPartitioning will be inferred. - format : FileFormat or str - Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For - Feather, only version 2 files are supported. + partition_base_dir : str, optional For the purposes of applying the partitioning, paths will be stripped of the partition_base_dir. 
Files not matching the @@ -504,6 +506,7 @@ def dataset(source, schema=None, filesystem=None, partitioning=None, """ # collect the keyword arguments for later reuse kwargs = dict( + schema=schema, filesystem=filesystem, partitioning=partitioning, format=format, @@ -514,12 +517,12 @@ def dataset(source, schema=None, filesystem=None, partitioning=None, # TODO(kszucs): support InMemoryDataset for a table input if _is_path_like(source): - return _filesystem_dataset(source, schema=schema, **kwargs) + return _filesystem_dataset(source, **kwargs) elif isinstance(source, (tuple, list)): if all(_is_path_like(elem) for elem in source): - return _filesystem_dataset(source, schema=schema, **kwargs) + return _filesystem_dataset(source, **kwargs) elif all(isinstance(elem, Dataset) for elem in source): - return _union_dataset(source, schema=schema, **kwargs) + return _union_dataset(source, **kwargs) else: raise TypeError('vvvvvvvvvvvvvvv') else: diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index cc6fcfa0d05..e7d7b580015 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -924,8 +924,6 @@ def test_open_dataset_list_of_files(tempdir): tables, (path1, path2) = _create_directory_of_files(tempdir) table = pa.concat_tables(tables) - # list of exact files needs to be passed to source() function - # (dataset() will interpret it as separate sources) datasets = [ ds.dataset([path1, path2]), ds.dataset([str(path1), str(path2)]) @@ -936,7 +934,7 @@ def test_open_dataset_list_of_files(tempdir): assert result.equals(table) -def test_costruct_from_single_file(tempdir): +def test_construct_from_single_file(tempdir): directory = tempdir / 'single-file' directory.mkdir() table, path = _create_single_file(directory) @@ -951,7 +949,7 @@ def test_costruct_from_single_file(tempdir): assert d1.to_table() == d2.to_table() == d3.to_table() -def test_costruct_from_single_directory(tempdir): +def test_construct_from_single_directory(tempdir): directory = tempdir / 'single-directory' directory.mkdir() tables, paths = _create_directory_of_files(directory) @@ -964,7 +962,7 @@ def test_costruct_from_single_directory(tempdir): assert d1.to_table() == d2.to_table() == d3.to_table() -def test_costruct_from_list_of_files(tempdir): +def test_construct_from_list_of_files(tempdir): # instantiate from a list of files directory = tempdir / 'list-of-files' directory.mkdir() diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index cbaef51bba0..d30a7bce7c3 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -674,7 +674,7 @@ def test_schema_merge(): pa.field('qux', pa.bool_()) ]) - result = pa.Schema.merge([a, b, c]) + result = pa.unify_schemas([a, b, c]) expected = pa.schema([ pa.field('foo', pa.int32()), pa.field('bar', pa.string()), @@ -685,4 +685,4 @@ def test_schema_merge(): assert result.equals(expected) with pytest.raises(pa.ArrowInvalid): - pa.Schema.merge([b, d]) + pa.unify_schemas([b, d]) diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 837c41f9c69..0d1db99824d 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1278,46 +1278,6 @@ cdef class Schema: return self.sp_schema.get().Equals(deref(other.schema), check_metadata) - @classmethod - def merge(cls, list schemas): - """ - Unifies schemas by merging fields by name. - - The resulting schema will contain the union of fields from all schemas. - Fields with the same name will be merged. 
- - The unified field will inherit the metadata from the schema where - that field is first defined. - - The first N fields in the schema will be ordered the same as the - N fields in the first schema. - - The resulting schema will inherit its metadata from the first input - schema. - - Returns an error if: - - - Parameters - ---------- - schemas : list of Schema - Schemas to merge into a single one. - - Returns - ------- - Schema - - Raises - ------ - ArrowInvalid : - If any input schema contains fields with duplicate names. - If Fields of the same name are not mergeable. - """ - cdef: - Schema schema - vector[shared_ptr[CSchema]] c_schemas - for schema in schemas: - c_schemas.push_back(pyarrow_unwrap_schema(schema)) - return pyarrow_wrap_schema(GetResultValue(UnifySchemas(c_schemas))) - @classmethod def from_pandas(cls, df, preserve_index=None): """ @@ -1655,6 +1615,43 @@ cdef class Schema: return self.__str__() +def unify_schemas(list schemas): + """ + Unifies schemas by merging fields by name. + + The resulting schema will contain the union of fields from all schemas. + Fields with the same name will be merged. + - The unified field will inherit the metadata from the schema where + that field is first defined. + - The first N fields in the schema will be ordered the same as the + N fields in the first schema. + + The resulting schema will inherit its metadata from the first input + schema. + + Parameters + ---------- + schemas : list of Schema + Schemas to merge into a single one. + + Returns + ------- + Schema + + Raises + ------ + ArrowInvalid : + If any input schema contains fields with duplicate names. + If Fields of the same name are not mergeable. + """ + cdef: + Schema schema + vector[shared_ptr[CSchema]] c_schemas + for schema in schemas: + c_schemas.push_back(pyarrow_unwrap_schema(schema)) + return pyarrow_wrap_schema(GetResultValue(UnifySchemas(c_schemas))) + + cdef dict _type_cache = {} From 858cf759c7bd12e7f1be7189c55cb9194b18e177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 16:30:01 +0200 Subject: [PATCH 07/27] normalize --- python/pyarrow/dataset.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 38d2b426245..a3b93ee9baf 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -378,6 +378,9 @@ def _ensure_single_source(path, filesystem=None): 'file system URI' ) + # ensure that the path is normalized before passing to dataset discovery + path = _normalize_path(filesystem, path) + # retrieve the file descriptor if it is available already if file_info is None: file_info = filesystem.get_file_info([path])[0] From b10cc5ff2a87f296355a2e1879f5cb7400113f2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 18:16:29 +0200 Subject: [PATCH 08/27] more tests --- python/pyarrow/_dataset.pyx | 4 +- python/pyarrow/dataset.py | 14 ++++- python/pyarrow/tests/test_dataset.py | 88 ++++++++++++++++++++++++++-- 3 files changed, 97 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index ff7f4885d12..08321afdb9c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1052,9 +1052,9 @@ cdef class DatasetFactory: Dataset """ cdef: - Dataset dataset shared_ptr[CSchema] sp_schema CResult[shared_ptr[CDataset]] result + if schema is not None: sp_schema = pyarrow_unwrap_schema(schema) with nogil: @@ -1263,7 +1263,7 @@ cdef class 
UnionDatasetFactory(DatasetFactory): c_factories.push_back(factory.unwrap()) self.init(GetResultValue(CUnionDatasetFactory.Make(c_factories))) - cdef init(self, shared_ptr[CDatasetFactory]& sp): + cdef init(self, const shared_ptr[CDatasetFactory]& sp): DatasetFactory.init(self, sp) self.union_factory = sp.get() diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index a3b93ee9baf..65f235c5c09 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -69,7 +69,7 @@ def field(name): """References a named column of the dataset. Stores only the field's name. Type and other information is known only when - the expression is applied on a dataset having an explicit scheme. + the expression is bound to a dataset having an explicit scheme. Parameters ---------- @@ -527,6 +527,14 @@ def dataset(source, schema=None, format=None, filesystem=None, elif all(isinstance(elem, Dataset) for elem in source): return _union_dataset(source, **kwargs) else: - raise TypeError('vvvvvvvvvvvvvvv') + unique_types = set(type(elem).__name__ for elem in source) + type_names = ', '.join('{}'.format(t) for t in unique_types) + raise TypeError( + 'Expected a list of path-like or dataset objects. The given ' + 'list contains the following types: {}'.format(type_names) + ) else: - raise TypeError('yyyyyyyyyyyy') + raise TypeError( + 'Expected a path-like, list of path-likes or a list of Datasets ' + 'instead of the given type: {}'.format(type(source).__name__) + ) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e7d7b580015..d966665ef1f 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1011,9 +1011,49 @@ def test_construct_from_mixed_child_datasets(mockfs): assert table.num_columns == 4 -def test_construct_from_datasets_with_different_schemas(): - # TODO(kszucs) - pass +def test_construct_empty_dataset(): + empty = ds.dataset([]) + table = empty.to_table() + assert table.num_rows == 0 + assert table.num_columns == 0 + + empty = ds.dataset([], schema=pa.schema([ + ('a', pa.int64()), + ('a', pa.string()) + ])) + table = empty.to_table() + assert table.num_rows == 0 + assert table.num_columns == 2 + + +def test_construct_from_invalid_sources_raise(multisourcefs): + child1 = ds.FileSystemDatasetFactory( + multisourcefs, + fs.FileSelector('/plain'), + format=ds.ParquetFileFormat() + ) + child2 = ds.FileSystemDatasetFactory( + multisourcefs, + fs.FileSelector('/schema'), + format=ds.ParquetFileFormat() + ) + + with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): + ds.dataset([child1, child2]) + + expected = ( + "Expected a list of path-like or dataset objects. 
The given list " + "contains the following types: int" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset([1, 2, 3]) + + expected = ( + "Expected a path-like, list of path-likes or a list of Datasets " + "instead of the given type: NoneType" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset(None) @pytest.mark.parquet @@ -1173,13 +1213,15 @@ def test_dataset_union(multisourcefs): assert isinstance(factory.finish(), ds.Dataset) -def test_multiple_factories(multisourcefs): +def test_union_dataset_from_other_datasets(tempdir, multisourcefs): child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet', partitioning=['week', 'color']) child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet', partitioning='hive') + assert child1.schema != child2.schema != child3.schema + assembled = ds.dataset([child1, child2, child3]) assert isinstance(assembled, ds.UnionDataset) @@ -1197,6 +1239,44 @@ def test_multiple_factories(multisourcefs): ('month', pa.int32()), ]) assert assembled.schema.equals(expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + assembled = ds.dataset([child1, child3]) + expected_schema = pa.schema([ + ('date', pa.date32()), + ('index', pa.int64()), + ('value', pa.float64()), + ('color', pa.string()), + ('year', pa.int32()), + ('month', pa.int32()), + ]) + assert assembled.schema.equals(expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + expected_schema = pa.schema([ + ('month', pa.int32()), + ('color', pa.string()), + ('date', pa.date32()), + ]) + assembled = ds.dataset([child1, child3], schema=expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + expected_schema = pa.schema([ + ('month', pa.int32()), + ('color', pa.string()), + ('unkown', pa.string()) # fill with nulls + ]) + assembled = ds.dataset([child1, child3], schema=expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + # incompatible schemas, date and index columns have conflicting types + table = pa.table([range(9), [0.] * 4 + [1.] 
* 5, 'abcdefghj'], + names=['date', 'value', 'index']) + _, path = _create_single_file(tempdir, table=table) + child4 = ds.dataset(path) + + with pytest.raises(pa.ArrowInvalid, match='Unable to merge'): + ds.dataset([child1, child4]) def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): From 9523ff8dbfc13f3e7a2df06b3fec54e74460733c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 18:21:06 +0200 Subject: [PATCH 09/27] address review comments --- python/pyarrow/dataset.py | 6 +++--- python/pyarrow/types.pxi | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 65f235c5c09..35adaae944f 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -429,7 +429,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None, return factory.finish(schema) -def _union_dataset(source, schema=None, **kwargs): +def _union_dataset(children, schema=None, **kwargs): if any(v is not None for v in kwargs.values()): raise ValueError( "When passing a list of Datasets, you cannot pass any additional " @@ -438,10 +438,10 @@ def _union_dataset(source, schema=None, **kwargs): if schema is None: # unify the children datasets' schemas - schema = pa.unify_schemas([ds.schema for ds in source]) + schema = pa.unify_schemas([child.schema for child in children]) # create datasets with the requested schema - children = [ds.replace_schema(schema) for ds in source] + children = [child.replace_schema(schema) for child in children] return UnionDataset(schema, children) diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 0d1db99824d..c0bd4f6a78b 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1621,6 +1621,7 @@ def unify_schemas(list schemas): The resulting schema will contain the union of fields from all schemas. Fields with the same name will be merged. + - The unified field will inherit the metadata from the schema where that field is first defined. - The first N fields in the schema will be ordered the same as the From b8e6c2d3472adaebe88a4e8442f0e65a32544302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 18:51:36 +0200 Subject: [PATCH 10/27] more examples --- python/pyarrow/dataset.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 35adaae944f..2f70d6a87c8 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -495,17 +495,47 @@ def dataset(source, schema=None, format=None, filesystem=None, Examples -------- + Opening a single file: + + >>> dataset("path/to/file.parquet", format="parquet") + + Opening a single file with an explicit schema: + + >>> dataset("path/to/file.parquet", schema=myschema, format="parquet") + Opening a dataset for a single directory: >>> dataset("path/to/nyc-taxi/", format="parquet") + >>> dataset("s3://mybucket/nyc-taxi/", format="parquet") + + Opening a dataset from an explicit list of files: + + >>> dataset([ + ... "part0/data.parquet", + ... "part1/data.parquet", + ... "part3/data.parquet", + ... ], format='parquet') + + With filesystem provided: + + >>> paths = [ + ... 'part0/data.parquet', + ... 'part1/data.parquet', + ... 'part3/data.parquet', + ... 
] + >>> dataset(paths, filesystem='file:///directory/prefix, format='parquet') - Construction from multiple factories: + Which is equivalent with: + + >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem()) + >>> dataset(paths, filesystem=fs, format='parquet') + + Construction of a nested dataset: >>> dataset([ ... dataset("s3://old-taxi-data", format="parquet"), - ... dataset("local/path/to/new/data", format="ipc") + ... dataset("local/path/to/data", format="ipc") ... ]) - """ # collect the keyword arguments for later reuse kwargs = dict( From 0ee0a1f0f9d686eddaa2c6393866d6c2d6a8507c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 19:15:25 +0200 Subject: [PATCH 11/27] docstring --- python/pyarrow/dataset.py | 40 +++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 2f70d6a87c8..0e5478c84a3 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -452,13 +452,37 @@ def dataset(source, schema=None, format=None, filesystem=None, """ Open a dataset. + Dataset provides functionality to efficiently work with tabular, + potentially larger than memory and multi-file dataset. + + - A unified interface for different sources, like Parquet and Feather + - Discovery of sources (crawling directories, handle directory-based + partitioned datasets, basic schema normalization) + - Optimized reading with pedicate pushdown (filtering rows), projection + (selecting columns), parallel reading or fine-grained managing of tasks. + + Note that this is the high-level API, to have more control over the dataset + construction use the low-level API classes (FileSystemDataset, + FilesystemDatasetFactory, etc.) + Parameters ---------- source : path, list of paths, dataset or list of datasets - Path to a file or to a directory containing the data files, or a list - of paths for a multi-directory dataset. To have more control, a list of - factories can be passed, created with the ``factory()`` function (in - this case, the additional keywords will be ignored). + Path pointing to a single file: + Open a FileSystemDataset from a single file. + Path pointing to a directory: + The directory gets discovered recursively according to a + partitioning scheme if given. + List of file paths: + Create a FileSystemDataset from explicitly given files. The files + must be located on the same filesystem given by the filesystem + parameter. + Note that in contrary of construction from a single file, passing + URIs as paths is not allowed. + List of datasets: + A nested UnionDataset gets constructed, it allows arbitrary + composition of other datasets. + Note that additional keyword arguments are not allowed. schema : Schema, optional Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. @@ -466,7 +490,9 @@ def dataset(source, schema=None, format=None, filesystem=None, Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For Feather, only version 2 files are supported. filesystem : FileSystem, default None - By default will be inferred from the path. + If a single path is given as source, it will be inferred from the path. + If an URI is passed, then its path component will act as a prefix for + the paths. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. 
A flavor string can be used as shortcut, and with a list of @@ -491,7 +517,9 @@ def dataset(source, schema=None, format=None, filesystem=None, Returns ------- - Dataset + dataset : Dataset + Either a FileSystemDataset or a UnionDataset depending on the source + parameter. Examples -------- From 8586b449910295646d29b70ace39b55b4d6baf14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 19:35:51 +0200 Subject: [PATCH 12/27] control exposed symbols --- python/pyarrow/dataset.py | 62 +++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 0e5478c84a3..6d4475d0240 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -17,20 +17,46 @@ """Dataset is currently unstable. APIs subject to change without notice.""" -import pyarrow as pa -from pyarrow.util import _stringify_path, _is_path_like - -from pyarrow.fs import ( - _normalize_path, - FileSelector, - FileSystem, - FileType, - LocalFileSystem, - SubTreeFileSystem, - _MockFileSystem -) +__all__ = [ + 'AndExpression', + 'CastExpression', + 'CompareOperator', + 'ComparisonExpression', + 'dataset', + 'Dataset', + 'DatasetFactory', + 'DirectoryPartitioning', + 'Expression', + 'field', + 'FieldExpression', + 'FileFormat', + 'FileFragment', + 'FileSystemDataset', + 'FileSystemDatasetFactory', + 'FileSystemFactoryOptions', + 'Fragment', + 'HivePartitioning', + 'InExpression', + 'IpcFileFormat', + 'IsValidExpression', + 'NotExpression', + 'OrExpression', + 'ParquetFileFormat', + 'ParquetFileFragment', + 'ParquetReadOptions', + 'partitioning', + 'Partitioning', + 'PartitioningFactory', + 'scalar', + 'ScalarExpression', + 'Scanner', + 'ScanTask', + 'UnionDataset', + 'UnionDatasetFactory' +] -from pyarrow._dataset import ( # noqa +import pyarrow as pa +from pyarrow._dataset import ( AndExpression, CastExpression, CompareOperator, @@ -63,6 +89,16 @@ UnionDataset, UnionDatasetFactory ) +from pyarrow.fs import ( + _normalize_path, + FileSelector, + FileSystem, + FileType, + LocalFileSystem, + SubTreeFileSystem, + _MockFileSystem +) +from pyarrow.util import _stringify_path, _is_path_like def field(name): From 88e79d33e070c7e7280613f80ae3a956929ad555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 19:53:38 +0200 Subject: [PATCH 13/27] more const --- python/pyarrow/_dataset.pyx | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 08321afdb9c..ba13770ab2a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -985,12 +985,12 @@ cdef class DatasetFactory: def __init__(self, list children): _forbid_instantiation(self.__class__) - cdef init(self, shared_ptr[CDatasetFactory]& sp): + cdef init(self, const shared_ptr[CDatasetFactory]& sp): self.wrapped = sp self.factory = sp.get() @staticmethod - cdef wrap(shared_ptr[CDatasetFactory]& sp): + cdef wrap(const shared_ptr[CDatasetFactory]& sp): cdef DatasetFactory self = \ DatasetFactory.__new__(DatasetFactory) self.init(sp) @@ -1030,8 +1030,9 @@ cdef class DatasetFactory: ------- Schema """ - cdef CResult[shared_ptr[CSchema]] result - cdef CInspectOptions options + cdef: + CInspectOptions options + CResult[shared_ptr[CSchema]] result with nogil: result = self.factory.Inspect(options) return pyarrow_wrap_schema(GetResultValue(result)) From f5b4682c79275cc95c0a3501aa5d85f90f4c3ba8 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Fri, 10 Apr 2020 20:57:51 +0200 Subject: [PATCH 14/27] import from functions --- python/pyarrow/dataset.py | 61 ++++++++------------------------------- 1 file changed, 12 insertions(+), 49 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 6d4475d0240..cff66f14f8e 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -17,46 +17,11 @@ """Dataset is currently unstable. APIs subject to change without notice.""" -__all__ = [ - 'AndExpression', - 'CastExpression', - 'CompareOperator', - 'ComparisonExpression', - 'dataset', - 'Dataset', - 'DatasetFactory', - 'DirectoryPartitioning', - 'Expression', - 'field', - 'FieldExpression', - 'FileFormat', - 'FileFragment', - 'FileSystemDataset', - 'FileSystemDatasetFactory', - 'FileSystemFactoryOptions', - 'Fragment', - 'HivePartitioning', - 'InExpression', - 'IpcFileFormat', - 'IsValidExpression', - 'NotExpression', - 'OrExpression', - 'ParquetFileFormat', - 'ParquetFileFragment', - 'ParquetReadOptions', - 'partitioning', - 'Partitioning', - 'PartitioningFactory', - 'scalar', - 'ScalarExpression', - 'Scanner', - 'ScanTask', - 'UnionDataset', - 'UnionDatasetFactory' -] - import pyarrow as pa -from pyarrow._dataset import ( +from pyarrow.fs import _normalize_path, _MockFileSystem +from pyarrow.util import _stringify_path, _is_path_like + +from pyarrow._dataset import ( # noqa AndExpression, CastExpression, CompareOperator, @@ -89,16 +54,6 @@ UnionDataset, UnionDatasetFactory ) -from pyarrow.fs import ( - _normalize_path, - FileSelector, - FileSystem, - FileType, - LocalFileSystem, - SubTreeFileSystem, - _MockFileSystem -) -from pyarrow.util import _stringify_path, _is_path_like def field(name): @@ -295,6 +250,10 @@ def _ensure_multiple_sources(paths, filesystem=None): If the file system is local and a path references a directory or its type cannot be determined. """ + from pyarrow.fs import ( + FileSystem, LocalFileSystem, SubTreeFileSystem, FileType + ) + if filesystem is None: # fall back to local file system as the default filesystem = LocalFileSystem() @@ -370,6 +329,10 @@ def _ensure_single_source(path, filesystem=None): FileNotFoundError If the referenced file or directory doesn't exist. 
""" + from pyarrow.fs import ( + FileSystem, LocalFileSystem, SubTreeFileSystem, FileType, FileSelector + ) + path = _stringify_path(path) # if filesystem is not given try to automatically determine one From 5d06afed79b12d47a0507585e3e5058a985722f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Sat, 11 Apr 2020 14:45:11 +0200 Subject: [PATCH 15/27] always retrieve file info --- python/pyarrow/dataset.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index cff66f14f8e..9706793088a 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -380,9 +380,8 @@ def _ensure_single_source(path, filesystem=None): # ensure that the path is normalized before passing to dataset discovery path = _normalize_path(filesystem, path) - # retrieve the file descriptor if it is available already - if file_info is None: - file_info = filesystem.get_file_info([path])[0] + # retrieve the file descriptor + file_info = filesystem.get_file_info([path])[0] # depending on the path type either return with a recursive # directory selector or as a list containing a single file From d5c70469f8d2ffb789ea0215de4a8c960bd33cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Sun, 12 Apr 2020 11:55:33 +0200 Subject: [PATCH 16/27] make pytest verbose on appveyor --- ci/cpp-msvc-build-main.bat | 2 +- python/pyarrow/dataset.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index 735073c49cc..d1cb342d62e 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -132,7 +132,7 @@ python setup.py develop -q || exit /B set PYTHONDEVMODE=1 -py.test -r sxX --durations=15 --pyargs pyarrow.tests || exit /B +py.test -v -r sxX --durations=15 --pyargs pyarrow.tests || exit /B @rem @rem Wheels are built and tested separately (see ARROW-5142). 
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 9706793088a..4c8495b217a 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -381,7 +381,8 @@ def _ensure_single_source(path, filesystem=None): path = _normalize_path(filesystem, path) # retrieve the file descriptor - file_info = filesystem.get_file_info([path])[0] + if file_info is None: + file_info = filesystem.get_file_info([path])[0] # depending on the path type either return with a recursive # directory selector or as a list containing a single file From ff69b9da8d0fa258c6d59bfe8a87b008e9f78dcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 00:11:37 +0200 Subject: [PATCH 17/27] more tests; make subtreefs path concatenation more robust --- cpp/src/arrow/filesystem/path_util.cc | 5 +- cpp/src/arrow/filesystem/s3fs.cc | 11 +++- python/pyarrow/_fs.pyx | 9 +++ python/pyarrow/dataset.py | 81 +++++++++++++++------------ python/pyarrow/tests/test_dataset.py | 78 +++++++++++++++++++++++++- python/pyarrow/tests/test_fs.py | 12 ++++ 6 files changed, 153 insertions(+), 43 deletions(-) diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 2dec6399a07..652ca80ffa6 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -99,11 +99,8 @@ std::string ConcatAbstractPath(const std::string& base, const std::string& stem) DCHECK(!stem.empty()); if (base.empty()) { return stem; - } else if (base.back() == kSep) { - return base + stem; - } else { - return base + kSep + stem; } + return EnsureTrailingSlash(base) + RemoveLeadingSlash(stem).to_string(); } std::string EnsureTrailingSlash(util::string_view v) { diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3870c39725f..14559cbe61c 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -291,7 +291,16 @@ struct S3Path { out->bucket = std::string(src.substr(0, first_sep)); out->key = std::string(src.substr(first_sep + 1)); out->key_parts = internal::SplitAbstractPath(out->key); - return internal::ValidateAbstractPathParts(out->key_parts); + return Validate(out); + } + + static Status Validate(S3Path* path) { + auto result = internal::ValidateAbstractPathParts(path->key_parts); + if (!result.ok()) { + return Status::Invalid(result.message(), " in path ", path->full_path); + } else { + return result; + } } Aws::String ToURLEncodedAwsString() const { diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index 42d1c4c6193..fc8f937fb93 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -657,6 +657,15 @@ cdef class SubTreeFileSystem(FileSystem): FileSystem.wrap(self.subtreefs.base_fs()) ) + @property + def base_path(self): + return frombytes(self.subtreefs.base_path()) + + @property + def base_fs(self): + return FileSystem.wrap(self.subtreefs.base_fs()) + + cdef class _MockFileSystem(FileSystem): def __init__(self, datetime current_time=None): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 4c8495b217a..f97e067b7f5 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -219,6 +219,41 @@ def _ensure_format(obj): raise ValueError("format '{}' is not supported".format(obj)) +def _ensure_filesystem(fs_or_uri): + from pyarrow.fs import ( + FileSystem, LocalFileSystem, SubTreeFileSystem, FileType + ) + + if isinstance(fs_or_uri, str): + # instantiate the file system from an uri, if the uri has a path + # component 
then it will be treated as a path prefix + filesystem, prefix = FileSystem.from_uri(fs_or_uri) + is_local = isinstance(filesystem, LocalFileSystem) + prefix = _normalize_path(filesystem, prefix) + if prefix: + # validate that the prefix is pointing to a directory + prefix_info = filesystem.get_file_info([prefix])[0] + if prefix_info.type != FileType.Directory: + raise ValueError( + "The path component of the filesystem URI must point to a " + "directory but it has a type: `{}`. The path component " + "is `{}` and the given filesystem URI is `{}`".format( + prefix_info.type.name, prefix_info.path, fs_or_uri + ) + ) + filesystem = SubTreeFileSystem(prefix, filesystem) + return filesystem, is_local + elif isinstance(fs_or_uri, (LocalFileSystem, _MockFileSystem)): + return fs_or_uri, True + elif isinstance(fs_or_uri, FileSystem): + return fs_or_uri, False + else: + raise TypeError( + '`filesystem` argument must be a FileSystem instance or a valid ' + 'file system URI' + ) + + def _ensure_multiple_sources(paths, filesystem=None): """ Treat a list of paths as files belonging to a single file system @@ -250,31 +285,14 @@ def _ensure_multiple_sources(paths, filesystem=None): If the file system is local and a path references a directory or its type cannot be determined. """ - from pyarrow.fs import ( - FileSystem, LocalFileSystem, SubTreeFileSystem, FileType - ) + from pyarrow.fs import LocalFileSystem, FileType if filesystem is None: # fall back to local file system as the default filesystem = LocalFileSystem() - paths_are_local = True - elif isinstance(filesystem, str): - # instantiate the file system from an uri, if the uri has a path - # component then it will be treated as a path prefix - filesystem, prefix = FileSystem.from_uri(filesystem) - paths_are_local = isinstance(filesystem, LocalFileSystem) - prefix = _normalize_path(filesystem, prefix) - if prefix: - filesystem = SubTreeFileSystem(prefix, filesystem) - elif isinstance(filesystem, (LocalFileSystem, _MockFileSystem)): - paths_are_local = True - elif not isinstance(filesystem, FileSystem): - raise TypeError( - '`filesystem` argument must be a FileSystem instance or a valid ' - 'file system URI' - ) - else: - paths_are_local = False + + # construct a filesystem if it is a valid URI + filesystem, is_local = _ensure_filesystem(filesystem) # allow normalizing irregular paths such as Windows local paths paths = [_normalize_path(filesystem, _stringify_path(p)) for p in paths] @@ -282,7 +300,7 @@ def _ensure_multiple_sources(paths, filesystem=None): # validate that all of the paths are pointing to existing *files* # possible improvement is to group the file_infos by type and raise for # multiple paths per error category - if paths_are_local: + if is_local: for info in filesystem.get_file_info(paths): file_type = info.type if file_type == FileType.File: @@ -329,9 +347,7 @@ def _ensure_single_source(path, filesystem=None): FileNotFoundError If the referenced file or directory doesn't exist. 
""" - from pyarrow.fs import ( - FileSystem, LocalFileSystem, SubTreeFileSystem, FileType, FileSelector - ) + from pyarrow.fs import FileSystem, LocalFileSystem, FileType, FileSelector path = _stringify_path(path) @@ -364,18 +380,9 @@ def _ensure_single_source(path, filesystem=None): else: # unset file_info to query it again from the new filesystem file_info = None - elif isinstance(filesystem, str): - # instantiate the file system from an uri, if the uri has a path - # component then it will be treated as a path prefix - filesystem, prefix = FileSystem.from_uri(filesystem) - prefix = _normalize_path(filesystem, prefix) - if prefix: - filesystem = SubTreeFileSystem(prefix, filesystem) - elif not isinstance(filesystem, FileSystem): - raise TypeError( - '`filesystem` argument must be a FileSystem instance or a valid ' - 'file system URI' - ) + + # construct a filesystem if it is a valid URI + filesystem, _ = _ensure_filesystem(filesystem) # ensure that the path is normalized before passing to dataset discovery path = _normalize_path(filesystem, path) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d966665ef1f..27ee8f5299d 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -978,7 +978,7 @@ def test_construct_from_list_of_files(tempdir): t2 = d2.to_table() d3 = ds.dataset(paths) t3 = d3.to_table() - d4 = ds.dataset(paths, filesystem='file://') + d4 = ds.dataset(paths, filesystem='file:/') t4 = d4.to_table() d5 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) t5 = d5.to_table() @@ -1155,6 +1155,9 @@ def test_open_dataset_non_existing_file(): with pytest.raises(FileNotFoundError): ds.dataset('i-am-not-existing.parquet', format='parquet') + with pytest.raises(pa.ArrowInvalid, match='cannot be relative'): + ds.dataset('file:i-am-not-existing.parquet', format='parquet') + @pytest.mark.parquet @pytest.mark.s3 @@ -1185,6 +1188,79 @@ def test_open_dataset_from_uri_s3(s3_connection, s3_server): assert dataset.to_table().equals(table) +@pytest.mark.parquet +@pytest.mark.s3 +def test_open_dataset_pina(s3_connection, s3_server): + from pyarrow.fs import FileSystem + import pyarrow.parquet as pq + + host, port, access_key, secret_key = s3_connection + bucket = 'theirbucket' + path = 'nested/folder/data.parquet' + uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format( + access_key, secret_key, bucket, path, host, port + ) + + fs, path = FileSystem.from_uri(uri) + assert path == 'theirbucket/nested/folder/data.parquet' + + fs.create_dir(bucket) + table = pa.table({'a': [1, 2, 3]}) + with fs.open_output_stream(path) as out: + pq.write_table(table, out) + + # full string URI + dataset = ds.dataset(uri, format="parquet") + assert dataset.to_table().equals(table) + + # passing filesystem as an uri + template = ( + "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( + access_key, secret_key, host, port + ) + ) + cases = [ + ('theirbucket/nested/folder/', '/data.parquet'), + ('theirbucket/nested/folder/', 'data.parquet'), + ('theirbucket/nested/folder', '/data.parquet'), + ('theirbucket/nested/folder', 'data.parquet'), + ('theirbucket/nested/', '/folder/data.parquet'), + ('theirbucket/nested/', 'folder/data.parquet'), + ('theirbucket/nested', '/folder/data.parquet'), + ('theirbucket/nested', 'folder/data.parquet'), + ('theirbucket/', '/nested/folder/data.parquet'), + ('theirbucket/', 'nested/folder/data.parquet'), + ('theirbucket', '/nested/folder/data.parquet'), + ('theirbucket', 
'nested/folder/data.parquet'), + ] + for prefix, path in cases: + uri = template.format(prefix) + dataset = ds.dataset(path, filesystem=uri, format="parquet") + assert dataset.to_table().equals(table) + + with pytest.raises(pa.ArrowInvalid, match='Missing bucket name'): + uri = template.format('/') + ds.dataset('/theirbucket/nested/folder/data.parquet', filesystem=uri) + + error = ( + "The path component of the filesystem URI must point to a directory " + "but it has a type: `{}`. The path component is `{}` and the given " + "filesystem URI is `{}`" + ) + + path = 'theirbucket/doesnt/exist' + uri = template.format(path) + with pytest.raises(ValueError) as exc: + ds.dataset('data.parquet', filesystem=uri) + assert str(exc.value) == error.format('NotFound', path, uri) + + path = 'theirbucket/nested/folder/data.parquet' + uri = template.format(path) + with pytest.raises(ValueError) as exc: + ds.dataset('data.parquet', filesystem=uri) + assert str(exc.value) == error.format('File', path, uri) + + @pytest.mark.parquet def test_filter_implicit_cast(tempdir): # ARROW-7652 diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index cec18b1e45a..8bb2e5c6937 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -211,6 +211,18 @@ def test_filesystem_equals(): assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/other', fs0) +def test_subtree_filesystem(): + localfs = LocalFileSystem() + + subfs = SubTreeFileSystem('/base', localfs) + assert subfs.base_path == '/base/' + assert subfs.base_fs == localfs + + subfs = SubTreeFileSystem('/another/base/', LocalFileSystem()) + assert subfs.base_path == '/another/base/' + assert subfs.base_fs == localfs + + def test_filesystem_pickling(fs): if isinstance(fs, _MockFileSystem): pytest.xfail(reason='MockFileSystem is not serializable') From 5692edc1cafd5a10ea52a0038c4b7d3010385920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 00:43:24 +0200 Subject: [PATCH 18/27] more examples --- ci/cpp-msvc-build-main.bat | 2 +- python/pyarrow/dataset.py | 28 ++++++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index d1cb342d62e..735073c49cc 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -132,7 +132,7 @@ python setup.py develop -q || exit /B set PYTHONDEVMODE=1 -py.test -v -r sxX --durations=15 --pyargs pyarrow.tests || exit /B +py.test -r sxX --durations=15 --pyargs pyarrow.tests || exit /B @rem @rem Wheels are built and tested separately (see ARROW-5142). diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index f97e067b7f5..cd3a1644158 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -495,15 +495,16 @@ def dataset(source, schema=None, format=None, filesystem=None, format : FileFormat or str Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For Feather, only version 2 files are supported. - filesystem : FileSystem, default None - If a single path is given as source, it will be inferred from the path. - If an URI is passed, then its path component will act as a prefix for - the paths. + filesystem : FileSystem or URI string, default None + If a single path is given as source and filesystem is None, then the + filesystem will be inferred from the path. + If an URI string is passed, then a filesystem object is constructed + using the URI's optional path component as a directory prefix. 
See the + examples below. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. A flavor string can be used as shortcut, and with a list of field names a DirectionaryPartitioning will be inferred. - partition_base_dir : str, optional For the purposes of applying the partitioning, paths will be stripped of the partition_base_dir. Files not matching the @@ -542,7 +543,7 @@ def dataset(source, schema=None, format=None, filesystem=None, >>> dataset("path/to/nyc-taxi/", format="parquet") >>> dataset("s3://mybucket/nyc-taxi/", format="parquet") - Opening a dataset from an explicit list of files: + Opening a dataset from a list of relatives local paths: >>> dataset([ ... "part0/data.parquet", @@ -564,6 +565,21 @@ def dataset(source, schema=None, format=None, filesystem=None, >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem()) >>> dataset(paths, filesystem=fs, format='parquet') + With a remote filesystem URI: + + >>> paths = [ + ... 'nested/directory/part0/data.parquet', + ... 'nested/directory/part1/data.parquet', + ... 'nested/directory/part3/data.parquet', + ... ] + >>> dataset(paths, filesystem='s3://bucket/', format='parquet') + + Similarly to the local example, the directory prefix may be included in the + filesystem URI: + + >>> dataset(paths, filesystem='s3://bucket/nested/directory', + ... format='parquet') + Construction of a nested dataset: >>> dataset([ From acbe2cb54fb6f3387d336b4c6c336db6361bebf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 12:11:03 +0200 Subject: [PATCH 19/27] update selector_ignore_prefixes --- python/pyarrow/dataset.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index cd3a1644158..46378ac0ad7 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -406,7 +406,7 @@ def _ensure_single_source(path, filesystem=None): def _filesystem_dataset(source, schema=None, filesystem=None, partitioning=None, format=None, partition_base_dir=None, exclude_invalid_files=None, - ignore_prefixes=None): + selector_ignore_prefixes=None): """ Create a FileSystemDataset which can be used to build a Dataset. @@ -428,7 +428,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None, partitioning=partitioning, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - ignore_prefixes=ignore_prefixes + selector_ignore_prefixes=selector_ignore_prefixes ) factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options) @@ -454,7 +454,7 @@ def _union_dataset(children, schema=None, **kwargs): def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, ignore_prefixes=None): + exclude_invalid_files=None, selector_ignore_prefixes=None): """ Open a dataset. @@ -517,10 +517,11 @@ def dataset(source, schema=None, format=None, filesystem=None, fashion. Disabling this feature will skip the IO, but unsupported files may be present in the Dataset (resulting in an error at scan time). - ignore_prefixes : list, optional + selector_ignore_prefixes : list, optional Files matching one of those prefixes will be ignored by the discovery process. This is matched to the basename of a path. By default this is ['.', '_']. + Note that discovery happens only if a directory is passed as source. 
Returns ------- @@ -595,7 +596,7 @@ def dataset(source, schema=None, format=None, filesystem=None, format=format, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - ignore_prefixes=ignore_prefixes + selector_ignore_prefixes=selector_ignore_prefixes ) # TODO(kszucs): support InMemoryDataset for a table input From a002b493d56d192edc016f0fb98557d1d338368f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 12:47:49 +0200 Subject: [PATCH 20/27] fix filesystem uri --- python/pyarrow/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 27ee8f5299d..9b650eeb1cc 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -945,7 +945,7 @@ def test_construct_from_single_file(tempdir): # instantiate from a single file with a filesystem object d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) # instantiate from a single file with prefixed filesystem URI - d3 = ds.dataset(relative_path, filesystem='file://{}'.format(directory)) + d3 = ds.dataset(relative_path, filesystem='file:{}'.format(directory)) assert d1.to_table() == d2.to_table() == d3.to_table() From 101babefe68fb7103aae8f1500af8bdf0c3e265b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 13:17:10 +0200 Subject: [PATCH 21/27] fix remaining occurences of wrong file uri --- python/pyarrow/tests/test_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 9b650eeb1cc..2061de01b40 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -958,7 +958,7 @@ def test_construct_from_single_directory(tempdir): d1 = ds.dataset(directory) d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) d3 = ds.dataset(directory.name, - filesystem='file://{}'.format(directory.parent)) + filesystem='file:{}'.format(directory.parent)) assert d1.to_table() == d2.to_table() == d3.to_table() @@ -974,7 +974,7 @@ def test_construct_from_list_of_files(tempdir): t1 = d1.to_table() assert len(t1) == sum(map(len, tables)) - d2 = ds.dataset(relative_paths, filesystem='file://{}'.format(tempdir)) + d2 = ds.dataset(relative_paths, filesystem='file:{}'.format(tempdir)) t2 = d2.to_table() d3 = ds.dataset(paths) t3 = d3.to_table() From a94bc10129cbfe8c2be9d488ccd2da2eafff1f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 14:55:22 +0200 Subject: [PATCH 22/27] provide different uris on windows --- python/pyarrow/dataset.py | 2 ++ python/pyarrow/tests/test_dataset.py | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 46378ac0ad7..450ed498c4c 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -501,6 +501,8 @@ def dataset(source, schema=None, format=None, filesystem=None, If an URI string is passed, then a filesystem object is constructed using the URI's optional path component as a directory prefix. See the examples below. + Note that the URIs on Windows should follow 'file:///C:...' or + 'file:/C:...' patterns. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. 
A flavor string can be used as shortcut, and with a list of diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 2061de01b40..d6b022a08bb 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -79,6 +79,15 @@ def _table_from_pandas(df): return table.replace_schema_metadata() +def _filesystem_uri(path): + # URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns. + if isinstance(path, pathlib.WindowsPath): + uri = 'file:/{}'.format(path) + else: + uri = 'file://{}'.format(path) + return uri + + @pytest.fixture @pytest.mark.parquet def mockfs(): @@ -945,7 +954,7 @@ def test_construct_from_single_file(tempdir): # instantiate from a single file with a filesystem object d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) # instantiate from a single file with prefixed filesystem URI - d3 = ds.dataset(relative_path, filesystem='file:{}'.format(directory)) + d3 = ds.dataset(relative_path, filesystem=_filesystem_uri(directory)) assert d1.to_table() == d2.to_table() == d3.to_table() @@ -954,11 +963,9 @@ def test_construct_from_single_directory(tempdir): directory.mkdir() tables, paths = _create_directory_of_files(directory) - # instantiate from a single directory d1 = ds.dataset(directory) d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) - d3 = ds.dataset(directory.name, - filesystem='file:{}'.format(directory.parent)) + d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir)) assert d1.to_table() == d2.to_table() == d3.to_table() @@ -974,7 +981,7 @@ def test_construct_from_list_of_files(tempdir): t1 = d1.to_table() assert len(t1) == sum(map(len, tables)) - d2 = ds.dataset(relative_paths, filesystem='file:{}'.format(tempdir)) + d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir)) t2 = d2.to_table() d3 = ds.dataset(paths) t3 = d3.to_table() From 9eb97043d610a868da42be24bac2382c88c2ffd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 15:02:26 +0200 Subject: [PATCH 23/27] address review comments --- python/pyarrow/dataset.py | 22 +++++++++++----------- python/pyarrow/tests/test_dataset.py | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 450ed498c4c..44e92c344d1 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -57,7 +57,7 @@ def field(name): - """References a named column of the dataset. + """Reference a named column of the dataset. Stores only the field's name. Type and other information is known only when the expression is bound to a dataset having an explicit scheme. @@ -308,16 +308,16 @@ def _ensure_multiple_sources(paths, filesystem=None): elif file_type == FileType.NotFound: raise FileNotFoundError(info.path) elif file_type == FileType.Directory: - raise ValueError( + raise IsADirectoryError( 'Path {} points to a directory, but only file paths are ' 'supported. 
To construct a nested or union dataset pass ' 'a list of dataset objects instead.'.format(info.path) ) else: - raise ValueError( + raise IOError( 'Path {} exists but its type is unknown (could be a ' 'special file such as a Unix socket or character device, ' - 'or Windows NUL / CON / ...'.format(info.path) + 'or Windows NUL / CON / ...)'.format(info.path) ) return filesystem, paths @@ -454,17 +454,17 @@ def _union_dataset(children, schema=None, **kwargs): def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, selector_ignore_prefixes=None): + exclude_invalid_files=None, ignore_prefixes=None): """ Open a dataset. - Dataset provides functionality to efficiently work with tabular, + Datasets provides functionality to efficiently work with tabular, potentially larger than memory and multi-file dataset. - A unified interface for different sources, like Parquet and Feather - Discovery of sources (crawling directories, handle directory-based partitioned datasets, basic schema normalization) - - Optimized reading with pedicate pushdown (filtering rows), projection + - Optimized reading with predicate pushdown (filtering rows), projection (selecting columns), parallel reading or fine-grained managing of tasks. Note that this is the high-level API, to have more control over the dataset @@ -473,7 +473,7 @@ def dataset(source, schema=None, format=None, filesystem=None, Parameters ---------- - source : path, list of paths, dataset or list of datasets + source : path, list of paths, dataset, list of datasets or URI Path pointing to a single file: Open a FileSystemDataset from a single file. Path pointing to a directory: @@ -501,7 +501,7 @@ def dataset(source, schema=None, format=None, filesystem=None, If an URI string is passed, then a filesystem object is constructed using the URI's optional path component as a directory prefix. See the examples below. - Note that the URIs on Windows should follow 'file:///C:...' or + Note that the URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` @@ -519,7 +519,7 @@ def dataset(source, schema=None, format=None, filesystem=None, fashion. Disabling this feature will skip the IO, but unsupported files may be present in the Dataset (resulting in an error at scan time). - selector_ignore_prefixes : list, optional + ignore_prefixes : list, optional Files matching one of those prefixes will be ignored by the discovery process. This is matched to the basename of a path. By default this is ['.', '_']. 
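
For reference, a short usage sketch of the two behaviours described in this hunk — discovery-time prefix filtering and the file-only rule for explicit path lists. The paths and the extra "staging" prefix are invented for illustration.

import pyarrow.dataset as ds

# discovery of a directory source skips basenames with the given prefixes
nyc = ds.dataset("path/to/nyc-taxi/", format="parquet",
                 ignore_prefixes=[".", "_", "staging"])

# an explicit list of sources must reference files; a directory entry in the
# list raises IsADirectoryError as implemented above
parts = ds.dataset(["part0/data.parquet", "part1/data.parquet"],
                   format="parquet")
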
@@ -598,7 +598,7 @@ def dataset(source, schema=None, format=None, filesystem=None, format=format, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - selector_ignore_prefixes=selector_ignore_prefixes + selector_ignore_prefixes=ignore_prefixes ) # TODO(kszucs): support InMemoryDataset for a table input diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d6b022a08bb..c176b0a7cc4 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1364,7 +1364,7 @@ def test_union_dataset_from_other_datasets(tempdir, multisourcefs): def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): msg = 'points to a directory, but only file paths are supported' - with pytest.raises(ValueError, match=msg): + with pytest.raises(IsADirectoryError, match=msg): ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs) From 024a5ccfaef97a1dd686011768bc60cfaa5ce4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 16:10:26 +0200 Subject: [PATCH 24/27] windows paths... --- python/pyarrow/tests/test_dataset.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index c176b0a7cc4..8b4aee269d3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -82,7 +82,7 @@ def _table_from_pandas(df): def _filesystem_uri(path): # URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns. if isinstance(path, pathlib.WindowsPath): - uri = 'file:/{}'.format(path) + uri = 'file:///{}'.format(path) else: uri = 'file://{}'.format(path) return uri @@ -985,12 +985,10 @@ def test_construct_from_list_of_files(tempdir): t2 = d2.to_table() d3 = ds.dataset(paths) t3 = d3.to_table() - d4 = ds.dataset(paths, filesystem='file:/') + d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) t4 = d4.to_table() - d5 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) - t5 = d5.to_table() - assert t1 == t2 == t3 == t4 == t5 + assert t1 == t2 == t3 == t4 def test_construct_from_list_of_mixed_paths_fails(mockfs): From e8c063bd56298aa1ac988b3d396a0bede1885ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 16:55:24 +0200 Subject: [PATCH 25/27] address review comments --- python/pyarrow/dataset.py | 7 ++++--- python/pyarrow/tests/test_dataset.py | 2 +- python/pyarrow/tests/test_parquet.py | 8 +------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 44e92c344d1..94f67ca1571 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -281,9 +281,10 @@ def _ensure_multiple_sources(paths, filesystem=None): If the passed filesystem has wrong type. FileNotFoundError If the file system is local and a referenced path is not available. - ValueError - If the file system is local and a path references a directory or its - type cannot be determined. + IsADirectoryError + If the file system is local and a path references a directory. + IOError: + If the file system is local and a path's type cannot be determined. 
""" from pyarrow.fs import LocalFileSystem, FileType diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8b4aee269d3..e6bedcb33c6 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1195,7 +1195,7 @@ def test_open_dataset_from_uri_s3(s3_connection, s3_server): @pytest.mark.parquet @pytest.mark.s3 -def test_open_dataset_pina(s3_connection, s3_server): +def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): from pyarrow.fs import FileSystem import pyarrow.parquet as pq diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 56b5cbadf04..9388a3c5523 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -2488,13 +2488,7 @@ def _assert_dataset_paths(dataset, paths, use_legacy_dataset): assert set(map(str, paths)) == {x.path for x in dataset.pieces} else: paths = [str(path.as_posix()) for path in paths] - if hasattr(dataset._dataset, 'files'): - assert set(paths) == set(dataset._dataset.files) - else: - # UnionDataset - # TODO(temp hack) remove this branch once ARROW-7965 is in (which - # will change this to a FileSystemDataset) - assert dataset.read().num_rows == 50 + assert set(paths) == set(dataset._dataset.files) @pytest.mark.pandas From 6fed0579cfdcc99bddd82cbe9173b635888c53bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 20:05:40 +0200 Subject: [PATCH 26/27] address review comments --- python/pyarrow/dataset.py | 9 +++------ python/pyarrow/tests/test_dataset.py | 9 ++------- python/pyarrow/types.pxi | 5 +++-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 94f67ca1571..37acf239aec 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -279,12 +279,9 @@ def _ensure_multiple_sources(paths, filesystem=None): ------ TypeError If the passed filesystem has wrong type. - FileNotFoundError - If the file system is local and a referenced path is not available. - IsADirectoryError - If the file system is local and a path references a directory. - IOError: - If the file system is local and a path's type cannot be determined. + IOError + If the file system is local and a referenced path is not available or + not a file. """ from pyarrow.fs import LocalFileSystem, FileType diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e6bedcb33c6..24454dced0d 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -81,7 +81,7 @@ def _table_from_pandas(df): def _filesystem_uri(path): # URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns. 
- if isinstance(path, pathlib.WindowsPath): + if os.name == 'nt': uri = 'file:///{}'.format(path) else: uri = 'file://{}'.format(path) @@ -1210,6 +1210,7 @@ def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): assert path == 'theirbucket/nested/folder/data.parquet' fs.create_dir(bucket) + table = pa.table({'a': [1, 2, 3]}) with fs.open_output_stream(path) as out: pq.write_table(table, out) @@ -1226,15 +1227,9 @@ def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): ) cases = [ ('theirbucket/nested/folder/', '/data.parquet'), - ('theirbucket/nested/folder/', 'data.parquet'), - ('theirbucket/nested/folder', '/data.parquet'), ('theirbucket/nested/folder', 'data.parquet'), - ('theirbucket/nested/', '/folder/data.parquet'), ('theirbucket/nested/', 'folder/data.parquet'), - ('theirbucket/nested', '/folder/data.parquet'), ('theirbucket/nested', 'folder/data.parquet'), - ('theirbucket/', '/nested/folder/data.parquet'), - ('theirbucket/', 'nested/folder/data.parquet'), ('theirbucket', '/nested/folder/data.parquet'), ('theirbucket', 'nested/folder/data.parquet'), ] diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index c0bd4f6a78b..c2835a744a7 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1617,10 +1617,11 @@ cdef class Schema: def unify_schemas(list schemas): """ - Unifies schemas by merging fields by name. + Unify schemas by merging fields by name. The resulting schema will contain the union of fields from all schemas. - Fields with the same name will be merged. + Fields with the same name will be merged. Note that two fields with + different types will fail merging. - The unified field will inherit the metadata from the schema where that field is first defined. From b9d95ed709143872a33d8cdacb703c47aa868b96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 14 Apr 2020 23:59:39 +0200 Subject: [PATCH 27/27] don't use threads --- python/pyarrow/tests/test_dataset.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 24454dced0d..d67a6b54a55 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -966,7 +966,10 @@ def test_construct_from_single_directory(tempdir): d1 = ds.dataset(directory) d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir)) - assert d1.to_table() == d2.to_table() == d3.to_table() + t1 = d1.to_table(use_threads=False) + t2 = d2.to_table(use_threads=False) + t3 = d3.to_table(use_threads=False) + assert t1 == t2 == t3 def test_construct_from_list_of_files(tempdir): @@ -982,11 +985,11 @@ def test_construct_from_list_of_files(tempdir): assert len(t1) == sum(map(len, tables)) d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir)) - t2 = d2.to_table() + t2 = d2.to_table(use_threads=False) d3 = ds.dataset(paths) - t3 = d3.to_table() + t3 = d3.to_table(use_threads=False) d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) - t4 = d4.to_table() + t4 = d4.to_table(use_threads=False) assert t1 == t2 == t3 == t4
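
To close, a short end-to-end sketch of the behaviour this series converges on: nesting datasets builds a UnionDataset whose children's schemas are unified automatically, with missing columns filled with nulls. The file names and column names below are invented for illustration and assume the final state of the patches.

import pathlib
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

base = pathlib.Path(tempfile.mkdtemp())

# two parquet files with overlapping but different schemas
pq.write_table(pa.table({'a': [1, 2], 'b': [0.1, 0.2]}), base / 'old.parquet')
pq.write_table(pa.table({'b': [0.3], 'c': ['x']}), base / 'new.parquet')

child1 = ds.dataset(base / 'old.parquet', format='parquet')
child2 = ds.dataset(base / 'new.parquet', format='parquet')

# passing datasets as the source constructs a nested UnionDataset; the
# children's schemas are merged via pa.unify_schemas
union = ds.dataset([child1, child2])
assert isinstance(union, ds.UnionDataset)
assert union.schema.names == ['a', 'b', 'c']

# columns absent from a child are null-filled when reading
table = union.to_table()
assert table.num_rows == 3
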