diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 2dec6399a07..652ca80ffa6 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -99,11 +99,8 @@ std::string ConcatAbstractPath(const std::string& base, const std::string& stem) DCHECK(!stem.empty()); if (base.empty()) { return stem; - } else if (base.back() == kSep) { - return base + stem; - } else { - return base + kSep + stem; } + return EnsureTrailingSlash(base) + RemoveLeadingSlash(stem).to_string(); } std::string EnsureTrailingSlash(util::string_view v) { diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3870c39725f..14559cbe61c 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -291,7 +291,16 @@ struct S3Path { out->bucket = std::string(src.substr(0, first_sep)); out->key = std::string(src.substr(first_sep + 1)); out->key_parts = internal::SplitAbstractPath(out->key); - return internal::ValidateAbstractPathParts(out->key_parts); + return Validate(out); + } + + static Status Validate(S3Path* path) { + auto result = internal::ValidateAbstractPathParts(path->key_parts); + if (!result.ok()) { + return Status::Invalid(result.message(), " in path ", path->full_path); + } else { + return result; + } } Aws::String ToURLEncodedAwsString() const { diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index a2d4a2c4ce1..1cf8cf63885 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -81,6 +81,7 @@ def parse_git(root, **kwargs): Field, Schema, schema, + unify_schemas, Array, Tensor, array, chunked_array, record_batch, table, SparseCOOTensor, SparseCSRMatrix, SparseCSCMatrix, diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index ed99f36daf9..ba13770ab2a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -54,7 +54,7 @@ cdef class Dataset: shared_ptr[CDataset] wrapped 
CDataset* dataset - def __init__(self, children, Schema schema not None): + def __init__(self): _forbid_instantiation(self.__class__) cdef void init(self, const shared_ptr[CDataset]& sp): @@ -985,12 +985,12 @@ cdef class DatasetFactory: def __init__(self, list children): _forbid_instantiation(self.__class__) - cdef init(self, shared_ptr[CDatasetFactory]& sp): + cdef init(self, const shared_ptr[CDatasetFactory]& sp): self.wrapped = sp self.factory = sp.get() @staticmethod - cdef wrap(shared_ptr[CDatasetFactory]& sp): + cdef wrap(const shared_ptr[CDatasetFactory]& sp): cdef DatasetFactory self = \ DatasetFactory.__new__(DatasetFactory) self.init(sp) @@ -1030,8 +1030,9 @@ cdef class DatasetFactory: ------- Schema """ - cdef CResult[shared_ptr[CSchema]] result - cdef CInspectOptions options + cdef: + CInspectOptions options + CResult[shared_ptr[CSchema]] result with nogil: result = self.factory.Inspect(options) return pyarrow_wrap_schema(GetResultValue(result)) @@ -1054,6 +1055,7 @@ cdef class DatasetFactory: cdef: shared_ptr[CSchema] sp_schema CResult[shared_ptr[CDataset]] result + if schema is not None: sp_schema = pyarrow_unwrap_schema(schema) with nogil: @@ -1061,6 +1063,7 @@ cdef class DatasetFactory: else: with nogil: result = self.factory.Finish() + return Dataset.wrap(GetResultValue(result)) @@ -1093,8 +1096,14 @@ cdef class FileSystemFactoryOptions: __slots__ = () # avoid mistakingly creating attributes - def __init__(self, partition_base_dir=None, exclude_invalid_files=None, + def __init__(self, partition_base_dir=None, partitioning=None, + exclude_invalid_files=None, list selector_ignore_prefixes=None): + if isinstance(partitioning, PartitioningFactory): + self.partitioning_factory = partitioning + elif isinstance(partitioning, Partitioning): + self.partitioning = partitioning + if partition_base_dir is not None: self.partition_base_dir = partition_base_dir if exclude_invalid_files is not None: @@ -1245,7 +1254,7 @@ cdef class 
UnionDatasetFactory(DatasetFactory): """ cdef: - CDatasetFactory* union_factory + CUnionDatasetFactory* union_factory def __init__(self, list factories): cdef: @@ -1255,6 +1264,10 @@ cdef class UnionDatasetFactory(DatasetFactory): c_factories.push_back(factory.unwrap()) self.init(GetResultValue(CUnionDatasetFactory.Make(c_factories))) + cdef init(self, const shared_ptr[CDatasetFactory]& sp): + DatasetFactory.init(self, sp) + self.union_factory = sp.get() + cdef class ScanTask: """Read record batches from a range of a single data fragment. diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index 42d1c4c6193..fc8f937fb93 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -657,6 +657,15 @@ cdef class SubTreeFileSystem(FileSystem): FileSystem.wrap(self.subtreefs.base_fs()) ) + @property + def base_path(self): + return frombytes(self.subtreefs.base_path()) + + @property + def base_fs(self): + return FileSystem.wrap(self.subtreefs.base_fs()) + + cdef class _MockFileSystem(FileSystem): def __init__(self, datetime current_time=None): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b03c8c01f1f..37acf239aec 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -18,6 +18,7 @@ """Dataset is currently unstable. APIs subject to change without notice.""" import pyarrow as pa +from pyarrow.fs import _normalize_path, _MockFileSystem from pyarrow.util import _stringify_path, _is_path_like from pyarrow._dataset import ( # noqa @@ -55,6 +56,40 @@ ) +def field(name): + """Reference a named column of the dataset. + + Stores only the field's name. Type and other information is known only when + the expression is bound to a dataset having an explicit scheme. + + Parameters + ---------- + name : string + The name of the field the expression references to. + + Returns + ------- + field_expr : FieldExpression + """ + return FieldExpression(name) + + +def scalar(value): + """Expression representing a scalar value. 
+ + Parameters + ---------- + value : bool, int, float or string + Python value of the scalar. Note that only a subset of types are + currently supported. + + Returns + ------- + scalar_expr : ScalarExpression + """ + return ScalarExpression(value) + + def partitioning(schema=None, field_names=None, flavor=None): """ Specify a partitioning scheme. @@ -153,59 +188,12 @@ def partitioning(schema=None, field_names=None, flavor=None): raise ValueError("Unsupported flavor") -def _ensure_fs(filesystem, path): - # Validate or infer the filesystem from the path - from pyarrow.fs import ( - FileSystem, LocalFileSystem, FileType, _normalize_path) - - if filesystem is None: - # First check if the file exists as a local (relative) file path - filesystem = LocalFileSystem() - try: - infos = filesystem.get_file_info([path])[0] - except OSError: - local_path_exists = False - else: - local_path_exists = (infos.type != FileType.NotFound) - - if not local_path_exists: - # Perhaps it's a URI? - try: - return FileSystem.from_uri(path) - except ValueError as e: - if "empty scheme" not in str(e): - raise - # ARROW-8213: not a URI, assume local path - # to get a nice error message. 
- - # ensure we have a proper path (eg no backslashes on Windows) - path = _normalize_path(filesystem, path) - - return filesystem, path - - -def _ensure_fs_and_paths(path, filesystem=None): - # Return filesystem and list of string paths or FileSelector - from pyarrow.fs import FileType, FileSelector - - path = _stringify_path(path) - filesystem, path = _ensure_fs(filesystem, path) - infos = filesystem.get_file_info([path])[0] - if infos.type == FileType.Directory: - # for directory, pass a selector - paths_or_selector = FileSelector(path, recursive=True) - elif infos.type == FileType.File: - # for a single file path, pass it as a list - paths_or_selector = [path] - else: - raise FileNotFoundError(path) - - return filesystem, paths_or_selector - - def _ensure_partitioning(scheme): - # Validate input and return a Partitioning(Factory) or passthrough None - # for no partitioning + """ + Validate input and return a Partitioning(Factory). + + It passes None through if no partitioning scheme is defined. 
+ """ if scheme is None: pass elif isinstance(scheme, str): @@ -215,9 +203,8 @@ def _ensure_partitioning(scheme): elif isinstance(scheme, (Partitioning, PartitioningFactory)): pass else: - ValueError( - "Expected Partitioning or PartitioningFactory, got {}".format( - type(scheme))) + ValueError("Expected Partitioning or PartitioningFactory, got {}" + .format(type(scheme))) return scheme @@ -232,168 +219,403 @@ def _ensure_format(obj): raise ValueError("format '{}' is not supported".format(obj)) -def factory(path_or_paths, filesystem=None, partitioning=None, - format=None): +def _ensure_filesystem(fs_or_uri): + from pyarrow.fs import ( + FileSystem, LocalFileSystem, SubTreeFileSystem, FileType + ) + + if isinstance(fs_or_uri, str): + # instantiate the file system from an uri, if the uri has a path + # component then it will be treated as a path prefix + filesystem, prefix = FileSystem.from_uri(fs_or_uri) + is_local = isinstance(filesystem, LocalFileSystem) + prefix = _normalize_path(filesystem, prefix) + if prefix: + # validate that the prefix is pointing to a directory + prefix_info = filesystem.get_file_info([prefix])[0] + if prefix_info.type != FileType.Directory: + raise ValueError( + "The path component of the filesystem URI must point to a " + "directory but it has a type: `{}`. The path component " + "is `{}` and the given filesystem URI is `{}`".format( + prefix_info.type.name, prefix_info.path, fs_or_uri + ) + ) + filesystem = SubTreeFileSystem(prefix, filesystem) + return filesystem, is_local + elif isinstance(fs_or_uri, (LocalFileSystem, _MockFileSystem)): + return fs_or_uri, True + elif isinstance(fs_or_uri, FileSystem): + return fs_or_uri, False + else: + raise TypeError( + '`filesystem` argument must be a FileSystem instance or a valid ' + 'file system URI' + ) + + +def _ensure_multiple_sources(paths, filesystem=None): """ - Create a factory which can be used to build a Dataset. 
+ Treat a list of paths as files belonging to a single file system + + If the file system is local then also validates that all paths + are referencing existing *files* otherwise any non-file paths will be + silently skipped (for example on a remote filesystem). Parameters ---------- - path_or_paths : str, pathlib.Path, or list of those - Path to a file or to a directory containing the data files, or - a list of paths. - filesystem : FileSystem, default None - By default will be inferred from the path. - partitioning : Partitioning or PartitioningFactory or str or list of str - The partitioning scheme specified with the ``partitioning()`` - function. A flavor string can be used as shortcut, and with a list of - field names a DirectionaryPartitioning will be inferred. - format : str, default None - Currently only "parquet" is supported. + paths : list of path-like + Note that URIs are not allowed. + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. Returns ------- - FileSystemDatasetFactory + (FileSystem, list of str) + File system object and a list of normalized paths. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + IOError + If the file system is local and a referenced path is not available or + not a file. 
""" - if not isinstance(path_or_paths, (list, tuple)): - path_or_paths = [path_or_paths] + from pyarrow.fs import LocalFileSystem, FileType - partitioning = _ensure_partitioning(partitioning) - format = _ensure_format(format or "parquet") - - # TODO pass through options - options = FileSystemFactoryOptions() - if isinstance(partitioning, PartitioningFactory): - options.partitioning_factory = partitioning - elif isinstance(partitioning, Partitioning): - options.partitioning = partitioning - - factories = [] - for path in path_or_paths: - fs, paths_or_selector = _ensure_fs_and_paths(path, filesystem) - factories.append(FileSystemDatasetFactory(fs, paths_or_selector, - format, options)) - - if len(factories) == 0: - raise ValueError("Need at least one path") - elif len(factories) == 1: - return factories[0] + if filesystem is None: + # fall back to local file system as the default + filesystem = LocalFileSystem() + + # construct a filesystem if it is a valid URI + filesystem, is_local = _ensure_filesystem(filesystem) + + # allow normalizing irregular paths such as Windows local paths + paths = [_normalize_path(filesystem, _stringify_path(p)) for p in paths] + + # validate that all of the paths are pointing to existing *files* + # possible improvement is to group the file_infos by type and raise for + # multiple paths per error category + if is_local: + for info in filesystem.get_file_info(paths): + file_type = info.type + if file_type == FileType.File: + continue + elif file_type == FileType.NotFound: + raise FileNotFoundError(info.path) + elif file_type == FileType.Directory: + raise IsADirectoryError( + 'Path {} points to a directory, but only file paths are ' + 'supported. 
To construct a nested or union dataset pass ' + 'a list of dataset objects instead.'.format(info.path) + ) + else: + raise IOError( + 'Path {} exists but its type is unknown (could be a ' + 'special file such as a Unix socket or character device, ' + 'or Windows NUL / CON / ...)'.format(info.path) + ) + + return filesystem, paths + + +def _ensure_single_source(path, filesystem=None): + """ + Treat path as either a recursively traversable directory or a single file. + + Parameters + ---------- + path : path-like + filesystem : FileSystem or str, optional + If an URI is passed, then its path component will act as a prefix for + the file paths. + + Returns + ------- + (FileSystem, list of str or fs.Selector) + File system object and either a single item list pointing to a file or + an fs.Selector object pointing to a directory. + + Raises + ------ + TypeError + If the passed filesystem has wrong type. + FileNotFoundError + If the referenced file or directory doesn't exist. + """ + from pyarrow.fs import FileSystem, LocalFileSystem, FileType, FileSelector + + path = _stringify_path(path) + + # if filesystem is not given try to automatically determine one + # first check if the file exists as a local (relative) file path + # if not then try to parse the path as an URI + file_info = None + if filesystem is None: + filesystem = LocalFileSystem() + try: + file_info = filesystem.get_file_info([path])[0] + except OSError: + file_info = None + exists_locally = False + else: + exists_locally = (file_info.type != FileType.NotFound) + + # if the file or directory doesn't exists locally, then assume that + # the path is an URI describing the file system as well + if not exists_locally: + try: + filesystem, path = FileSystem.from_uri(path) + except ValueError as e: + # ARROW-8213: neither an URI nor a locally existing path, + # so assume that local path was given and propagate a nicer + # file not found error instead of a more confusing scheme + # parsing error + if "empty scheme" 
not in str(e): + raise + else: + # unset file_info to query it again from the new filesystem + file_info = None + + # construct a filesystem if it is a valid URI + filesystem, _ = _ensure_filesystem(filesystem) + + # ensure that the path is normalized before passing to dataset discovery + path = _normalize_path(filesystem, path) + + # retrieve the file descriptor + if file_info is None: + file_info = filesystem.get_file_info([path])[0] + + # depending on the path type either return with a recursive + # directory selector or as a list containing a single file + if file_info.type == FileType.Directory: + paths_or_selector = FileSelector(path, recursive=True) + elif file_info.type == FileType.File: + paths_or_selector = [path] else: - return UnionDatasetFactory(factories) - - -def _ensure_factory(src, **kwargs): - # Need to return DatasetFactory since `dataset` might need to finish the - # factory with a unified schema. - # TODO: return Dataset if a specific schema was passed? - if _is_path_like(src): - return factory(src, **kwargs) - elif isinstance(src, DatasetFactory): - if any(v is not None for v in kwargs.values()): - # when passing a SourceFactory, the arguments cannot be specified - raise ValueError( - "When passing a DatasetFactory, you cannot pass any " - "additional arguments" - ) - return src - elif isinstance(src, Dataset): - raise TypeError( - "Dataset objects are currently not supported, only DatasetFactory " - "instances. Use the factory() function to create such objects." - ) + raise FileNotFoundError(path) + + return filesystem, paths_or_selector + + +def _filesystem_dataset(source, schema=None, filesystem=None, + partitioning=None, format=None, + partition_base_dir=None, exclude_invalid_files=None, + selector_ignore_prefixes=None): + """ + Create a FileSystemDataset which can be used to build a Dataset. + + Parameters are documented in the dataset function. 
+ + Returns + ------- + FileSystemDataset + """ + format = _ensure_format(format or 'parquet') + partitioning = _ensure_partitioning(partitioning) + + if isinstance(source, (list, tuple)): + fs, paths_or_selector = _ensure_multiple_sources(source, filesystem) else: - raise TypeError( - "Expected a path-like or DatasetFactory, got {}".format(type(src)) + fs, paths_or_selector = _ensure_single_source(source, filesystem) + + options = FileSystemFactoryOptions( + partitioning=partitioning, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + selector_ignore_prefixes=selector_ignore_prefixes + ) + factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options) + + return factory.finish(schema) + + +def _union_dataset(children, schema=None, **kwargs): + if any(v is not None for v in kwargs.values()): + raise ValueError( + "When passing a list of Datasets, you cannot pass any additional " + "arguments" ) + if schema is None: + # unify the children datasets' schemas + schema = pa.unify_schemas([child.schema for child in children]) + + # create datasets with the requested schema + children = [child.replace_schema(schema) for child in children] + + return UnionDataset(schema, children) -def dataset(paths_or_factories, filesystem=None, partitioning=None, - format=None, schema=None): + +def dataset(source, schema=None, format=None, filesystem=None, + partitioning=None, partition_base_dir=None, + exclude_invalid_files=None, ignore_prefixes=None): """ Open a dataset. + Datasets provides functionality to efficiently work with tabular, + potentially larger than memory and multi-file dataset. + + - A unified interface for different sources, like Parquet and Feather + - Discovery of sources (crawling directories, handle directory-based + partitioned datasets, basic schema normalization) + - Optimized reading with predicate pushdown (filtering rows), projection + (selecting columns), parallel reading or fine-grained managing of tasks. 
+ + Note that this is the high-level API, to have more control over the dataset + construction use the low-level API classes (FileSystemDataset, + FilesystemDatasetFactory, etc.) + Parameters ---------- - paths_or_factories : path or list of paths or factory or list of factories - Path to a file or to a directory containing the data files, or a list - of paths for a multi-directory dataset. To have more control, a list of - factories can be passed, created with the ``factory()`` function (in - this case, the additional keywords will be ignored). - filesystem : FileSystem, default None - By default will be inferred from the path. + source : path, list of paths, dataset, list of datasets or URI + Path pointing to a single file: + Open a FileSystemDataset from a single file. + Path pointing to a directory: + The directory gets discovered recursively according to a + partitioning scheme if given. + List of file paths: + Create a FileSystemDataset from explicitly given files. The files + must be located on the same filesystem given by the filesystem + parameter. + Note that in contrary of construction from a single file, passing + URIs as paths is not allowed. + List of datasets: + A nested UnionDataset gets constructed, it allows arbitrary + composition of other datasets. + Note that additional keyword arguments are not allowed. + schema : Schema, optional + Optionally provide the Schema for the Dataset, in which case it will + not be inferred from the source. + format : FileFormat or str + Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For + Feather, only version 2 files are supported. + filesystem : FileSystem or URI string, default None + If a single path is given as source and filesystem is None, then the + filesystem will be inferred from the path. + If an URI string is passed, then a filesystem object is constructed + using the URI's optional path component as a directory prefix. See the + examples below. 
+ Note that the URIs on Windows must follow 'file:///C:...' or + 'file:/C:...' patterns. partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. A flavor string can be used as shortcut, and with a list of field names a DirectionaryPartitioning will be inferred. - format : str - Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For - Feather, only version 2 files are supported. - schema : Schema, optional - Optionally provide the Schema for the Dataset, in which case it will - not be inferred from the source. + partition_base_dir : str, optional + For the purposes of applying the partitioning, paths will be + stripped of the partition_base_dir. Files not matching the + partition_base_dir prefix will be skipped for partitioning discovery. + The ignored files will still be part of the Dataset, but will not + have partition information. + exclude_invalid_files : bool, optional (default True) + If True, invalid files will be excluded (file format specific check). + This will incur IO for each files in a serial and single threaded + fashion. Disabling this feature will skip the IO, but unsupported + files may be present in the Dataset (resulting in an error at scan + time). + ignore_prefixes : list, optional + Files matching one of those prefixes will be ignored by the + discovery process. This is matched to the basename of a path. + By default this is ['.', '_']. + Note that discovery happens only if a directory is passed as source. Returns ------- - Dataset + dataset : Dataset + Either a FileSystemDataset or a UnionDataset depending on the source + parameter. 
Examples -------- + Opening a single file: + + >>> dataset("path/to/file.parquet", format="parquet") + + Opening a single file with an explicit schema: + + >>> dataset("path/to/file.parquet", schema=myschema, format="parquet") + Opening a dataset for a single directory: >>> dataset("path/to/nyc-taxi/", format="parquet") + >>> dataset("s3://mybucket/nyc-taxi/", format="parquet") - Construction from multiple factories: + Opening a dataset from a list of relative local paths: >>> dataset([ - ... factory("s3://old-taxi-data", format="parquet"), - ... factory("local/path/to/new/data", format="csv") - ... ]) - - """ - # bundle the keyword arguments - kwargs = dict(filesystem=filesystem, partitioning=partitioning, - format=format) + ... "part0/data.parquet", + ... "part1/data.parquet", + ... "part3/data.parquet", + ... ], format='parquet') - single_dataset = False - if not isinstance(paths_or_factories, list): - paths_or_factories = [paths_or_factories] - single_dataset = True + With filesystem provided: - factories = [_ensure_factory(f, **kwargs) for f in paths_or_factories] - if single_dataset: - return factories[0].finish(schema=schema) - return UnionDatasetFactory(factories).finish(schema=schema) + >>> paths = [ + ... 'part0/data.parquet', + ... 'part1/data.parquet', + ... 'part3/data.parquet', + ... ] + >>> dataset(paths, filesystem='file:///directory/prefix', format='parquet') + Which is equivalent to: -def field(name): - """References a named column of the dataset. + >>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem()) + >>> dataset(paths, filesystem=fs, format='parquet') - Stores only the field's name. Type and other information is known only when - the expression is applied on a dataset having an explicit scheme. - - Parameters - ---------- - name : string - The name of the field the expression references to. 
+ With a remote filesystem URI: - Returns - ------- - field_expr : FieldExpression - """ - return FieldExpression(name) + >>> paths = [ + ... 'nested/directory/part0/data.parquet', + ... 'nested/directory/part1/data.parquet', + ... 'nested/directory/part3/data.parquet', + ... ] + >>> dataset(paths, filesystem='s3://bucket/', format='parquet') + Similarly to the local example, the directory prefix may be included in the + filesystem URI: -def scalar(value): - """Expression representing a scalar value. + >>> dataset(paths, filesystem='s3://bucket/nested/directory', + ... format='parquet') - Parameters - ---------- - value : bool, int, float or string - Python value of the scalar. Note that only a subset of types are - currently supported. + Construction of a nested dataset: - Returns - ------- - scalar_expr : ScalarExpression + >>> dataset([ + ... dataset("s3://old-taxi-data", format="parquet"), + ... dataset("local/path/to/data", format="ipc") + ... ]) """ - return ScalarExpression(value) + # collect the keyword arguments for later reuse + kwargs = dict( + schema=schema, + filesystem=filesystem, + partitioning=partitioning, + format=format, + partition_base_dir=partition_base_dir, + exclude_invalid_files=exclude_invalid_files, + selector_ignore_prefixes=ignore_prefixes + ) + + # TODO(kszucs): support InMemoryDataset for a table input + if _is_path_like(source): + return _filesystem_dataset(source, **kwargs) + elif isinstance(source, (tuple, list)): + if all(_is_path_like(elem) for elem in source): + return _filesystem_dataset(source, **kwargs) + elif all(isinstance(elem, Dataset) for elem in source): + return _union_dataset(source, **kwargs) + else: + unique_types = set(type(elem).__name__ for elem in source) + type_names = ', '.join('{}'.format(t) for t in unique_types) + raise TypeError( + 'Expected a list of path-like or dataset objects. 
The given ' + 'list contains the following types: {}'.format(type_names) + ) + else: + raise TypeError( + 'Expected a path-like, list of path-likes or a list of Datasets ' + 'instead of the given type: {}'.format(type(source).__name__) + ) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1511ac29db9..fc5b73563bb 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -411,6 +411,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: const shared_ptr[CKeyValueMetadata]& metadata) shared_ptr[CSchema] RemoveMetadata() + CResult[shared_ptr[CSchema]] UnifySchemas( + const vector[shared_ptr[CSchema]]& schemas) + cdef cppclass PrettyPrintOptions: PrettyPrintOptions() PrettyPrintOptions(int indent_arg) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 088c834a1dd..d67a6b54a55 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -18,6 +18,7 @@ import contextlib import operator import os +import pathlib import pickle import numpy as np @@ -78,9 +79,18 @@ def _table_from_pandas(df): return table.replace_schema_metadata() +def _filesystem_uri(path): + # URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns. 
+ if os.name == 'nt': + uri = 'file:///{}'.format(path) + else: + uri = 'file://{}'.format(path) + return uri + + @pytest.fixture -def mockfs(request): - request.config.pyarrow.requires('parquet') +@pytest.mark.parquet +def mockfs(): import pyarrow.parquet as pq mockfs = fs._MockFileSystem() @@ -272,7 +282,6 @@ def test_dataset(dataset): assert isinstance(dataset.schema, pa.Schema) # TODO(kszucs): test non-boolean Exprs for filter do raise - expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) for task in dataset.scan(): @@ -549,12 +558,7 @@ def test_file_format_pickling(): @pytest.mark.parametrize('paths_or_selector', [ fs.FileSelector('subdir', recursive=True), [ - 'subdir', - 'subdir/1', - 'subdir/1/xxx', 'subdir/1/xxx/file0.parquet', - 'subdir/2', - 'subdir/2/yyy', 'subdir/2/yyy/file1.parquet', ] ]) @@ -602,7 +606,8 @@ def test_filesystem_factory(mockfs, paths_or_selector): expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) expected_str = pa.DictionaryArray.from_arrays( pa.array([0, 1, 2, 3, 4], type=pa.int32()), - pa.array("0 1 2 3 4".split(), type=pa.string())) + pa.array("0 1 2 3 4".split(), type=pa.string()) + ) for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']): expected_group = pa.array([group] * 5, type=pa.int32()) expected_key = pa.array([key] * 5, type=pa.string()) @@ -839,7 +844,6 @@ def test_partitioning_function(): names = ["year", "month"] # default DirectoryPartitioning - part = ds.partitioning(schema) assert isinstance(part, ds.DirectoryPartitioning) part = ds.partitioning(field_names=names) @@ -853,7 +857,6 @@ def test_partitioning_function(): ds.partitioning(schema, field_names=schema) # Hive partitioning - part = ds.partitioning(schema, flavor="hive") assert isinstance(part, ds.HivePartitioning) part = ds.partitioning(flavor="hive") @@ -896,43 +899,21 @@ def _check_dataset(dataset, table): def _check_dataset_from_path(path, table, **kwargs): - import 
pathlib - # pathlib object assert isinstance(path, pathlib.Path) - dataset = ds.dataset(ds.factory(path, **kwargs)) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - # string path - dataset = ds.dataset(ds.factory(str(path), **kwargs)) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) + # accept Path, str, List[Path], List[str] + for p in [path, str(path), [path], [str(path)]]: + dataset = ds.dataset(path, **kwargs) + assert isinstance(dataset, ds.FileSystemDataset) + _check_dataset(dataset, table) # relative string path with change_cwd(path.parent): - dataset = ds.dataset(ds.factory(path.name, **kwargs)) + dataset = ds.dataset(path.name, **kwargs) assert isinstance(dataset, ds.FileSystemDataset) _check_dataset(dataset, table) - # passing directly to dataset - dataset = ds.dataset(path, **kwargs) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - - dataset = ds.dataset(str(path), **kwargs) - assert isinstance(dataset, ds.FileSystemDataset) - _check_dataset(dataset, table) - - # passing list of files (even of length-1) gives UnionDataset - dataset = ds.dataset([path], **kwargs) - assert isinstance(dataset, ds.UnionDataset) - _check_dataset(dataset, table) - - dataset = ds.dataset([str(path)], **kwargs) - assert isinstance(dataset, ds.UnionDataset) - _check_dataset(dataset, table) - @pytest.mark.parquet def test_open_dataset_single_file(tempdir): @@ -952,11 +933,9 @@ def test_open_dataset_list_of_files(tempdir): tables, (path1, path2) = _create_directory_of_files(tempdir) table = pa.concat_tables(tables) - # list of exact files needs to be passed to source() function - # (dataset() will interpret it as separate sources) datasets = [ - ds.dataset(ds.factory([path1, path2])), - ds.dataset(ds.factory([str(path1), str(path2)])) + ds.dataset([path1, path2]), + ds.dataset([str(path1), str(path2)]) ] for dataset in datasets: assert dataset.schema.equals(table.schema) @@ 
-964,6 +943,127 @@ def test_open_dataset_list_of_files(tempdir): assert result.equals(table) +def test_construct_from_single_file(tempdir): + directory = tempdir / 'single-file' + directory.mkdir() + table, path = _create_single_file(directory) + relative_path = path.relative_to(directory) + + # instantiate from a single file + d1 = ds.dataset(path) + # instantiate from a single file with a filesystem object + d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) + # instantiate from a single file with prefixed filesystem URI + d3 = ds.dataset(relative_path, filesystem=_filesystem_uri(directory)) + assert d1.to_table() == d2.to_table() == d3.to_table() + + +def test_construct_from_single_directory(tempdir): + directory = tempdir / 'single-directory' + directory.mkdir() + tables, paths = _create_directory_of_files(directory) + + d1 = ds.dataset(directory) + d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) + d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir)) + t1 = d1.to_table(use_threads=False) + t2 = d2.to_table(use_threads=False) + t3 = d3.to_table(use_threads=False) + assert t1 == t2 == t3 + + +def test_construct_from_list_of_files(tempdir): + # instantiate from a list of files + directory = tempdir / 'list-of-files' + directory.mkdir() + tables, paths = _create_directory_of_files(directory) + + relative_paths = [p.relative_to(tempdir) for p in paths] + with change_cwd(tempdir): + d1 = ds.dataset(relative_paths) + t1 = d1.to_table() + assert len(t1) == sum(map(len, tables)) + + d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir)) + t2 = d2.to_table(use_threads=False) + d3 = ds.dataset(paths) + t3 = d3.to_table(use_threads=False) + d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) + t4 = d4.to_table(use_threads=False) + + assert t1 == t2 == t3 == t4 + + +def test_construct_from_list_of_mixed_paths_fails(mockfs): + # isntantiate from a list of mixed paths + files = [ + 'subdir/1/xxx/file0.parquet', + 
'subdir/1/xxx/doesnt-exist.parquet', + ] + with pytest.raises(FileNotFoundError, match='doesnt-exist'): + ds.dataset(files, filesystem=mockfs) + + +def test_construct_from_mixed_child_datasets(mockfs): + # instantiate from a list of mixed child datasets + dataset = ds.dataset([ + ds.dataset(['subdir/1/xxx/file0.parquet', + 'subdir/2/yyy/file1.parquet'], filesystem=mockfs), + ds.dataset('subdir', filesystem=mockfs) + ]) + assert isinstance(dataset, ds.UnionDataset) + assert len(list(dataset.get_fragments())) == 4 + + table = dataset.to_table() + assert len(table) == 20 + assert table.num_columns == 4 + + +def test_construct_empty_dataset(): + empty = ds.dataset([]) + table = empty.to_table() + assert table.num_rows == 0 + assert table.num_columns == 0 + + empty = ds.dataset([], schema=pa.schema([ + ('a', pa.int64()), + ('a', pa.string()) + ])) + table = empty.to_table() + assert table.num_rows == 0 + assert table.num_columns == 2 + + +def test_construct_from_invalid_sources_raise(multisourcefs): + child1 = ds.FileSystemDatasetFactory( + multisourcefs, + fs.FileSelector('/plain'), + format=ds.ParquetFileFormat() + ) + child2 = ds.FileSystemDatasetFactory( + multisourcefs, + fs.FileSelector('/schema'), + format=ds.ParquetFileFormat() + ) + + with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): + ds.dataset([child1, child2]) + + expected = ( + "Expected a list of path-like or dataset objects. 
The given list " + "contains the following types: int" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset([1, 2, 3]) + + expected = ( + "Expected a path-like, list of path-likes or a list of Datasets " + "instead of the given type: NoneType" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset(None) + + @pytest.mark.parquet def test_open_dataset_partitioned_directory(tempdir): import pyarrow.parquet as pq @@ -1043,21 +1143,18 @@ def test_open_dataset_unsupported_format(tempdir): @pytest.mark.parquet -def test_open_dataset_validate_sources(tempdir): +def test_open_union_dataset(tempdir): _, path = _create_single_file(tempdir) dataset = ds.dataset(path) - with pytest.raises(TypeError, - match="Dataset objects are currently not supported"): - ds.dataset([dataset]) + union = ds.dataset([dataset, dataset]) + assert isinstance(union, ds.UnionDataset) -def test_open_dataset_from_source_additional_kwargs(multisourcefs): - child = ds.FileSystemDatasetFactory( - multisourcefs, fs.FileSelector('/plain'), - format=ds.ParquetFileFormat() - ) + +def test_open_union_dataset_with_additional_kwargs(multisourcefs): + child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') with pytest.raises(ValueError, match="cannot pass any additional"): - ds.dataset(child, format="parquet") + ds.dataset([child], format="parquet") def test_open_dataset_non_existing_file(): @@ -1066,6 +1163,9 @@ def test_open_dataset_non_existing_file(): with pytest.raises(FileNotFoundError): ds.dataset('i-am-not-existing.parquet', format='parquet') + with pytest.raises(pa.ArrowInvalid, match='cannot be relative'): + ds.dataset('file:i-am-not-existing.parquet', format='parquet') + @pytest.mark.parquet @pytest.mark.s3 @@ -1096,6 +1196,74 @@ def test_open_dataset_from_uri_s3(s3_connection, s3_server): assert dataset.to_table().equals(table) +@pytest.mark.parquet +@pytest.mark.s3 +def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): + from pyarrow.fs 
import FileSystem + import pyarrow.parquet as pq + + host, port, access_key, secret_key = s3_connection + bucket = 'theirbucket' + path = 'nested/folder/data.parquet' + uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format( + access_key, secret_key, bucket, path, host, port + ) + + fs, path = FileSystem.from_uri(uri) + assert path == 'theirbucket/nested/folder/data.parquet' + + fs.create_dir(bucket) + + table = pa.table({'a': [1, 2, 3]}) + with fs.open_output_stream(path) as out: + pq.write_table(table, out) + + # full string URI + dataset = ds.dataset(uri, format="parquet") + assert dataset.to_table().equals(table) + + # passing filesystem as an uri + template = ( + "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( + access_key, secret_key, host, port + ) + ) + cases = [ + ('theirbucket/nested/folder/', '/data.parquet'), + ('theirbucket/nested/folder', 'data.parquet'), + ('theirbucket/nested/', 'folder/data.parquet'), + ('theirbucket/nested', 'folder/data.parquet'), + ('theirbucket', '/nested/folder/data.parquet'), + ('theirbucket', 'nested/folder/data.parquet'), + ] + for prefix, path in cases: + uri = template.format(prefix) + dataset = ds.dataset(path, filesystem=uri, format="parquet") + assert dataset.to_table().equals(table) + + with pytest.raises(pa.ArrowInvalid, match='Missing bucket name'): + uri = template.format('/') + ds.dataset('/theirbucket/nested/folder/data.parquet', filesystem=uri) + + error = ( + "The path component of the filesystem URI must point to a directory " + "but it has a type: `{}`. 
The path component is `{}` and the given " + "filesystem URI is `{}`" + ) + + path = 'theirbucket/doesnt/exist' + uri = template.format(path) + with pytest.raises(ValueError) as exc: + ds.dataset('data.parquet', filesystem=uri) + assert str(exc.value) == error.format('NotFound', path, uri) + + path = 'theirbucket/nested/folder/data.parquet' + uri = template.format(path) + with pytest.raises(ValueError) as exc: + ds.dataset('data.parquet', filesystem=uri) + assert str(exc.value) == error.format('File', path, uri) + + @pytest.mark.parquet def test_filter_implicit_cast(tempdir): # ARROW-7652 @@ -1109,8 +1277,11 @@ def test_filter_implicit_cast(tempdir): assert len(result) == 3 -def test_dataset_factory(multisourcefs): - child = ds.factory('/plain', filesystem=multisourcefs, format='parquet') +def test_dataset_union(multisourcefs): + child = ds.FileSystemDatasetFactory( + multisourcefs, fs.FileSelector('/plain'), + format=ds.ParquetFileFormat() + ) factory = ds.UnionDatasetFactory([child]) # TODO(bkietz) reintroduce factory.children property @@ -1121,15 +1292,21 @@ def test_dataset_factory(multisourcefs): assert isinstance(factory.finish(), ds.Dataset) -def test_multiple_factories(multisourcefs): - src1 = ds.factory('/plain', filesystem=multisourcefs, format='parquet') - src2 = ds.factory('/schema', filesystem=multisourcefs, format='parquet', - partitioning=['week', 'color']) - src3 = ds.factory('/hive', filesystem=multisourcefs, format='parquet', - partitioning='hive') +def test_union_dataset_from_other_datasets(tempdir, multisourcefs): + child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') + child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet', + partitioning=['week', 'color']) + child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet', + partitioning='hive') + + assert child1.schema != child2.schema != child3.schema + + assembled = ds.dataset([child1, child2, child3]) + assert isinstance(assembled, 
ds.UnionDataset) - assembled = ds.dataset([src1, src2, src3]) - assert isinstance(assembled, ds.Dataset) + msg = 'cannot pass any additional arguments' + with pytest.raises(ValueError, match=msg): + ds.dataset([child1, child2], filesystem=multisourcefs) expected_schema = pa.schema([ ('date', pa.date32()), @@ -1141,12 +1318,59 @@ def test_multiple_factories(multisourcefs): ('month', pa.int32()), ]) assert assembled.schema.equals(expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + assembled = ds.dataset([child1, child3]) + expected_schema = pa.schema([ + ('date', pa.date32()), + ('index', pa.int64()), + ('value', pa.float64()), + ('color', pa.string()), + ('year', pa.int32()), + ('month', pa.int32()), + ]) + assert assembled.schema.equals(expected_schema) + assert assembled.to_table().schema.equals(expected_schema) -def test_multiple_factories_with_selectors(multisourcefs): + expected_schema = pa.schema([ + ('month', pa.int32()), + ('color', pa.string()), + ('date', pa.date32()), + ]) + assembled = ds.dataset([child1, child3], schema=expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + expected_schema = pa.schema([ + ('month', pa.int32()), + ('color', pa.string()), + ('unkown', pa.string()) # fill with nulls + ]) + assembled = ds.dataset([child1, child3], schema=expected_schema) + assert assembled.to_table().schema.equals(expected_schema) + + # incompatible schemas, date and index columns have conflicting types + table = pa.table([range(9), [0.] * 4 + [1.] 
* 5, 'abcdefghj'], + names=['date', 'value', 'index']) + _, path = _create_single_file(tempdir, table=table) + child4 = ds.dataset(path) + + with pytest.raises(pa.ArrowInvalid, match='Unable to merge'): + ds.dataset([child1, child4]) + + +def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): + msg = 'points to a directory, but only file paths are supported' + with pytest.raises(IsADirectoryError, match=msg): + ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs) + + +def test_union_dataset_filesystem_datasets(multisourcefs): # without partitioning - dataset = ds.dataset(['/plain', '/schema', '/hive'], - filesystem=multisourcefs, format='parquet') + dataset = ds.dataset([ + ds.dataset('/plain', filesystem=multisourcefs), + ds.dataset('/schema', filesystem=multisourcefs), + ds.dataset('/hive', filesystem=multisourcefs), + ]) expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), @@ -1156,8 +1380,11 @@ def test_multiple_factories_with_selectors(multisourcefs): assert dataset.schema.equals(expected_schema) # with hive partitioning for two hive sources - dataset = ds.dataset(['/hive', '/hive_color'], filesystem=multisourcefs, - format='parquet', partitioning='hive') + dataset = ds.dataset([ + ds.dataset('/plain', filesystem=multisourcefs), + ds.dataset('/schema', filesystem=multisourcefs), + ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive') + ]) expected_schema = pa.schema([ ('date', pa.date32()), ('index', pa.int64()), diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index cec18b1e45a..8bb2e5c6937 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -211,6 +211,18 @@ def test_filesystem_equals(): assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/other', fs0) +def test_subtree_filesystem(): + localfs = LocalFileSystem() + + subfs = SubTreeFileSystem('/base', localfs) + assert subfs.base_path == '/base/' + assert 
subfs.base_fs == localfs + + subfs = SubTreeFileSystem('/another/base/', LocalFileSystem()) + assert subfs.base_path == '/another/base/' + assert subfs.base_fs == localfs + + def test_filesystem_pickling(fs): if isinstance(fs, _MockFileSystem): pytest.xfail(reason='MockFileSystem is not serializable') diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 56b5cbadf04..9388a3c5523 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -2488,13 +2488,7 @@ def _assert_dataset_paths(dataset, paths, use_legacy_dataset): assert set(map(str, paths)) == {x.path for x in dataset.pieces} else: paths = [str(path.as_posix()) for path in paths] - if hasattr(dataset._dataset, 'files'): - assert set(paths) == set(dataset._dataset.files) - else: - # UnionDataset - # TODO(temp hack) remove this branch once ARROW-7965 is in (which - # will change this to a FileSystemDataset) - assert dataset.read().num_rows == 50 + assert set(paths) == set(dataset._dataset.files) @pytest.mark.pandas diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index dac76def856..d30a7bce7c3 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -654,3 +654,35 @@ def test_schema_sizeof(): assert sys.getsizeof(schema2) > sys.getsizeof(schema) schema3 = schema.with_metadata({"key": "some more metadata"}) assert sys.getsizeof(schema3) > sys.getsizeof(schema2) + + +def test_schema_merge(): + a = pa.schema([ + pa.field('foo', pa.int32()), + pa.field('bar', pa.string()), + pa.field('baz', pa.list_(pa.int8())) + ]) + b = pa.schema([ + pa.field('foo', pa.int32()), + pa.field('qux', pa.bool_()) + ]) + c = pa.schema([ + pa.field('quux', pa.dictionary(pa.int32(), pa.string())) + ]) + d = pa.schema([ + pa.field('foo', pa.int64()), + pa.field('qux', pa.bool_()) + ]) + + result = pa.unify_schemas([a, b, c]) + expected = pa.schema([ + pa.field('foo', pa.int32()), + 
pa.field('bar', pa.string()), + pa.field('baz', pa.list_(pa.int8())), + pa.field('qux', pa.bool_()), + pa.field('quux', pa.dictionary(pa.int32(), pa.string())) + ]) + assert result.equals(expected) + + with pytest.raises(pa.ArrowInvalid): + pa.unify_schemas([b, d]) diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 6609a81581f..c2835a744a7 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1615,6 +1615,45 @@ cdef class Schema: return self.__str__() +def unify_schemas(list schemas): + """ + Unify schemas by merging fields by name. + + The resulting schema will contain the union of fields from all schemas. + Fields with the same name will be merged. Note that two fields with + different types will fail merging. + + - The unified field will inherit the metadata from the schema where + that field is first defined. + - The first N fields in the schema will be ordered the same as the + N fields in the first schema. + + The resulting schema will inherit its metadata from the first input + schema. + + Parameters + ---------- + schemas : list of Schema + Schemas to merge into a single one. + + Returns + ------- + Schema + + Raises + ------ + ArrowInvalid : + If any input schema contains fields with duplicate names. + If Fields of the same name are not mergeable. + """ + cdef: + Schema schema + vector[shared_ptr[CSchema]] c_schemas + for schema in schemas: + c_schemas.push_back(pyarrow_unwrap_schema(schema)) + return pyarrow_wrap_schema(GetResultValue(UnifySchemas(c_schemas))) + + cdef dict _type_cache = {}