diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst
index 7215f2d7136..ae14e396e16 100644
--- a/docs/source/python/dataset.rst
+++ b/docs/source/python/dataset.rst
@@ -355,7 +355,7 @@ format, filesystem, and partition expressions manually:
     schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()),
                         ("col2", pa.float64())])
 
-    dataset = ds.FileSystemDataset(
+    dataset = ds.FileSystemDataset.from_paths(
         ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
         filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
         partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
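For context, a minimal self-contained sketch of how the documented example is consumed after this change; the directory path below is a placeholder standing in for the tutorial's `base` variable, and the two parquet files are assumed to already exist under it:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.fs as fs

    schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()),
                        ("col2", pa.float64())])
    dataset = ds.FileSystemDataset.from_paths(
        ["data_2018.parquet", "data_2019.parquet"], schema=schema,
        format=ds.ParquetFileFormat(),
        filesystem=fs.SubTreeFileSystem("/tmp/parquet_dataset_manual",
                                        fs.LocalFileSystem()),
        partitions=[ds.field("year") == 2018, ds.field("year") == 2019])

    # a filter on the virtual "year" column is matched against the attached
    # partition expressions, so the 2019 fragment is pruned without any IO
    table = dataset.to_table(filter=ds.field("year") == 2018)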
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index f4997a94a05..ae8f19b88cb 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -455,21 +455,19 @@ cdef class UnionDataset(Dataset):
 
 
 cdef class FileSystemDataset(Dataset):
-    """A Dataset created from a set of files on a particular filesystem.
+    """A Dataset of file fragments.
+
+    A FileSystemDataset is composed of one or more FileFragments.
 
     Parameters
     ----------
-    paths_or_selector : Union[FileSelector, List[str]]
-        List of files/directories to consume.
+    fragments : list[FileFragment]
+        List of fragments to consume.
     schema : Schema
-        The top-level schema of the DataDataset.
+        The top-level schema of the Dataset.
     format : FileFormat
-        File format to create fragments from, currently only
-        ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
-    filesystem : FileSystem
-        The filesystem which files are from.
-    partitions : List[Expression], optional
-        Attach additional partition information for the file paths.
+        File format of the fragments, currently only ParquetFileFormat,
+        IpcFileFormat, and CsvFileFormat are supported.
     root_partition : Expression, optional
         The top-level partition of the DataDataset.
     """
@@ -477,20 +475,62 @@ cdef class FileSystemDataset(Dataset):
 
     cdef:
         CFileSystemDataset* filesystem_dataset
 
-    def __init__(self, paths_or_selector, schema=None, format=None,
-                 filesystem=None, partitions=None, root_partition=None):
+    def __init__(self, fragments, Schema schema, FileFormat format,
+                 root_partition=None):
         cdef:
-            FileInfo info
-            Expression expr
             FileFragment fragment
-            vector[CFileInfo] c_file_infos
-            vector[shared_ptr[CExpression]] c_partitions
-            shared_ptr[CFileFragment] c_fragment
             vector[shared_ptr[CFileFragment]] c_fragments
             CResult[shared_ptr[CDataset]] result
 
         root_partition = root_partition or _true
+        if not isinstance(root_partition, Expression):
+            raise TypeError(
+                "Argument 'root_partition' has incorrect type (expected "
+                "Expression, got {0})".format(type(root_partition))
+            )
+
+        for fragment in fragments:
+            c_fragments.push_back(
+                static_pointer_cast[CFileFragment, CFragment](
+                    fragment.unwrap()))
+
+        result = CFileSystemDataset.Make(
+            pyarrow_unwrap_schema(schema),
+            (<Expression> root_partition).unwrap(),
+            (<FileFormat> format).unwrap(),
+            c_fragments
+        )
+        self.init(GetResultValue(result))
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self.filesystem_dataset = <CFileSystemDataset*> sp.get()
+
+    @classmethod
+    def from_paths(cls, paths, schema=None, format=None,
+                   filesystem=None, partitions=None, root_partition=None):
+        """A Dataset created from a list of paths on a particular filesystem.
+
+        Parameters
+        ----------
+        paths : list of str
+            List of file paths to create the fragments from.
+        schema : Schema
+            The top-level schema of the Dataset.
+        format : FileFormat
+            File format to create fragments from, currently only
+            ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+        filesystem : FileSystem
+            The filesystem which files are from.
+        partitions : List[Expression], optional
+            Attach additional partition information for the file paths.
+        root_partition : Expression, optional
+            The top-level partition of the Dataset.
+        """
+        cdef:
+            FileFragment fragment
+        root_partition = root_partition or _true
         for arg, class_, name in [
             (schema, Schema, 'schema'),
             (format, FileFormat, 'format'),
@@ -503,38 +543,19 @@ cdef class FileSystemDataset(Dataset):
                     "got {2})".format(name, class_.__name__, type(arg))
                 )
 
-        infos = filesystem.get_file_info(paths_or_selector)
+        partitions = partitions or [_true] * len(paths)
 
-        partitions = partitions or [_true] * len(infos)
-
-        if len(infos) != len(partitions):
+        if len(paths) != len(partitions):
             raise ValueError(
                 'The number of files resulting from paths_or_selector '
                 'must be equal to the number of partitions.'
             )
 
-        for expr in partitions:
-            c_partitions.push_back(expr.unwrap())
-
-        for i, info in enumerate(infos):
-            if info.is_file:
-                fragment = format.make_fragment(info.path, filesystem,
-                                                partitions[i])
-                c_fragments.push_back(
-                    static_pointer_cast[CFileFragment, CFragment](
-                        fragment.unwrap()))
-
-        result = CFileSystemDataset.Make(
-            pyarrow_unwrap_schema(schema),
-            (<Expression> root_partition).unwrap(),
-            (<FileFormat> format).unwrap(),
-            c_fragments
-        )
-        self.init(GetResultValue(result))
-
-    cdef void init(self, const shared_ptr[CDataset]& sp):
-        Dataset.init(self, sp)
-        self.filesystem_dataset = <CFileSystemDataset*> sp.get()
+        fragments = [
+            format.make_fragment(path, filesystem, partitions[i])
+            for i, path in enumerate(paths)
+        ]
+        return FileSystemDataset(fragments, schema, format, root_partition)
 
     @property
     def files(self):
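As a side-by-side illustration of the two construction paths this change introduces, a minimal sketch follows; the file paths and the local filesystem are placeholder assumptions, and the files only need to exist once the dataset is actually scanned:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.fs as fs

    schema = pa.schema([("col1", pa.int64())])
    file_format = ds.ParquetFileFormat()
    filesystem = fs.LocalFileSystem()
    paths = ["/data/file0.parquet", "/data/file1.parquet"]  # hypothetical

    # path-based: from_paths wraps each path in a FileFragment
    dataset = ds.FileSystemDataset.from_paths(
        paths, schema=schema, format=file_format, filesystem=filesystem)

    # fragment-based: build the fragments explicitly and hand them to the
    # constructor; no filesystem argument is needed because each fragment
    # already carries its own filesystem
    fragments = [file_format.make_fragment(path, filesystem)
                 for path in paths]
    dataset = ds.FileSystemDataset(fragments, schema=schema,
                                   format=file_format)

Either way, construction no longer touches the filesystem; IO is deferred to scan time.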
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index c581e0f1b1c..80b7c2a5467 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1406,8 +1406,11 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
         # check for single NativeFile dataset
         if not isinstance(path_or_paths, list):
             if not _is_path_like(path_or_paths):
-                self._fragment = parquet_format.make_fragment(path_or_paths)
-                self._dataset = None
+                fragment = parquet_format.make_fragment(path_or_paths)
+                self._dataset = ds.FileSystemDataset(
+                    [fragment], schema=fragment.physical_schema,
+                    format=parquet_format
+                )
                 return
 
         # map old filesystems to new one
@@ -1422,13 +1425,10 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                    format=parquet_format,
                                    partitioning=partitioning)
-        self._fragment = None
 
     @property
     def schema(self):
-        if self._dataset is not None:
-            return self._dataset.schema
-        return self._fragment.physical_schema
+        return self._dataset.schema
 
     def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
         """
@@ -1463,7 +1463,7 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
             ]
             columns = columns + list(set(index_columns) - set(columns))
 
-        table = (self._dataset or self._fragment).to_table(
+        table = self._dataset.to_table(
            columns=columns, filter=self._filter_expression,
            use_threads=use_threads
        )
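With `_dataset` now always populated, the single-NativeFile case goes through `FileSystemDataset` as well. A minimal sketch of that code path, assuming a pyarrow build of this PR's vintage where `read_table` accepts `use_legacy_dataset=False`:

    import pyarrow as pa
    import pyarrow.parquet as pq

    # write a small table to an in-memory buffer
    table = pa.table({"a": [1, 2, 3]})
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    reader = pa.BufferReader(buf.getvalue())

    # the NativeFile is wrapped in a single-fragment FileSystemDataset
    # internally, so schema and reading follow the same code path as
    # path-based datasets
    result = pq.read_table(reader, use_legacy_dataset=False)
    assert result.equals(table)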
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6e760fb72a9..4150c1b0ea6 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -194,70 +194,91 @@ def test_filesystem_dataset(mockfs):
     schema = pa.schema([
         pa.field('const', pa.int64())
     ])
-
     file_format = ds.ParquetFileFormat()
-
     paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet']
-    partitions = [ds.scalar(True), ds.scalar(True)]
+    partitions = [ds.field('part') == x for x in range(1, 3)]
+    fragments = [file_format.make_fragment(path, mockfs, part)
+                 for path, part in zip(paths, partitions)]
+    root_partition = ds.field('level') == ds.scalar(1337)
 
-    dataset = ds.FileSystemDataset(
-        schema=schema,
-        format=file_format,
-        filesystem=mockfs,
-        paths_or_selector=paths,
-        partitions=partitions
+    dataset_from_fragments = ds.FileSystemDataset(
+        fragments, schema=schema, format=file_format,
+        root_partition=root_partition,
+    )
+    dataset_from_paths = ds.FileSystemDataset.from_paths(
+        paths, schema=schema, format=file_format, filesystem=mockfs,
+        partitions=partitions, root_partition=root_partition,
     )
 
-    assert isinstance(dataset.format, ds.ParquetFileFormat)
-
-    # the root_partition and partitions keywords have defaults
+    for dataset in [dataset_from_fragments, dataset_from_paths]:
+        assert isinstance(dataset, ds.FileSystemDataset)
+        assert isinstance(dataset.format, ds.ParquetFileFormat)
+        assert dataset.partition_expression.equals(root_partition)
+        assert set(dataset.files) == set(paths)
+
+        fragments = list(dataset.get_fragments())
+        for fragment, partition, path in zip(fragments, partitions, paths):
+            assert fragment.partition_expression.equals(partition)
+            assert fragment.path == path
+            assert isinstance(fragment.format, ds.ParquetFileFormat)
+            assert isinstance(fragment, ds.ParquetFileFragment)
+            assert fragment.row_groups is None
+
+            row_group_fragments = list(fragment.split_by_row_group())
+            assert len(row_group_fragments) == 1
+            assert isinstance(row_group_fragments[0], ds.ParquetFileFragment)
+            assert row_group_fragments[0].path == path
+            assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]
+
+        fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
+        assert len(fragments) == 2
+
+    # the root_partition keyword has a default
     dataset = ds.FileSystemDataset(
-        paths, schema, format=file_format, filesystem=mockfs,
+        fragments, schema=schema, format=file_format
     )
+    assert dataset.partition_expression.equals(ds.scalar(True))
 
-    assert isinstance(dataset.format, ds.ParquetFileFormat)
+    # from_paths partitions have defaults
+    dataset = ds.FileSystemDataset.from_paths(
+        paths, schema=schema, format=file_format, filesystem=mockfs
+    )
+    assert dataset.partition_expression.equals(ds.scalar(True))
+    for fragment in dataset.get_fragments():
+        assert fragment.partition_expression.equals(ds.scalar(True))
 
     # validation of required arguments
     with pytest.raises(TypeError, match="incorrect type"):
-        ds.FileSystemDataset(paths, format=file_format, filesystem=mockfs)
-    with pytest.raises(TypeError, match="incorrect type"):
-        ds.FileSystemDataset(paths, schema=schema, filesystem=mockfs)
-    with pytest.raises(TypeError, match="incorrect type"):
-        ds.FileSystemDataset(paths, schema=schema, format=file_format)
+        ds.FileSystemDataset(fragments, file_format, schema)
 
     # validation of root_partition
     with pytest.raises(TypeError, match="incorrect type"):
-        ds.FileSystemDataset(paths, schema=schema, format=file_format,
-                             filesystem=mockfs, root_partition=1)
+        ds.FileSystemDataset(fragments, schema=schema, format=file_format,
+                             root_partition=1)
+
+    # missing required argument in from_paths
+    with pytest.raises(TypeError, match="incorrect type"):
+        ds.FileSystemDataset.from_paths(fragments, format=file_format)
 
-    root_partition = ds.field('level') == ds.scalar(1337)
-    partitions = [ds.field('part') == x for x in range(1, 3)]
-    dataset = ds.FileSystemDataset(
-        paths_or_selector=paths,
-        schema=schema,
-        root_partition=root_partition,
-        filesystem=mockfs,
-        partitions=partitions,
-        format=file_format
-    )
-    assert dataset.partition_expression.equals(root_partition)
-    assert set(dataset.files) == set(paths)
 
-    fragments = list(dataset.get_fragments())
-    for fragment, partition, path in zip(fragments, partitions, paths):
-        assert fragment.partition_expression.equals(partition)
-        assert fragment.path == path
-        assert isinstance(fragment.format, ds.ParquetFileFormat)
-        assert isinstance(fragment, ds.ParquetFileFragment)
-        assert fragment.row_groups is None
 
+def test_filesystem_dataset_no_filesystem_interaction():
+    # ARROW-8283
+    schema = pa.schema([
+        pa.field('f1', pa.int64())
+    ])
+    file_format = ds.IpcFileFormat()
+    paths = ['nonexistingfile.arrow']
 
-        row_group_fragments = list(fragment.split_by_row_group())
-        assert len(row_group_fragments) == 1
-        assert isinstance(fragment, ds.ParquetFileFragment)
-        assert row_group_fragments[0].path == path
-        assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]
+    # creating the dataset itself doesn't raise
+    dataset = ds.FileSystemDataset.from_paths(
+        paths, schema=schema, format=file_format,
+        filesystem=fs.LocalFileSystem(),
+    )
 
-    fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
-    assert len(fragments) == 2
+    # getting fragments also doesn't raise
+    dataset.get_fragments()
+
+    # scanning does raise
+    with pytest.raises(FileNotFoundError):
+        dataset.to_table()
 
 
 def test_dataset(dataset):
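The new test pins down the user-facing contract of ARROW-8283; the same behaviour as a standalone sketch (the `.arrow` path is deliberately nonexistent):

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.fs as fs
    import pytest

    schema = pa.schema([pa.field("f1", pa.int64())])

    # constructing the dataset succeeds even though the file does not
    # exist, because from_paths no longer stats the paths up front
    dataset = ds.FileSystemDataset.from_paths(
        ["nonexistingfile.arrow"], schema=schema,
        format=ds.IpcFileFormat(), filesystem=fs.LocalFileSystem())

    # the missing file only surfaces once scanning actually opens it
    with pytest.raises(FileNotFoundError):
        dataset.to_table()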