2 changes: 1 addition & 1 deletion docs/source/python/dataset.rst
@@ -355,7 +355,7 @@ format, filesystem, and partition expressions manually:

schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

dataset = ds.FileSystemDataset(
dataset = ds.FileSystemDataset.from_paths(
["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
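
For context (not part of this diff), a minimal sketch of the equivalent construction using the reworked constructor, which now takes fragments directly; it reuses the schema and base path from the documented example above, so those names are assumed to already be in scope:

import pyarrow.dataset as ds
import pyarrow.fs as fs

# Hypothetical equivalent of the documented from_paths() call above:
# build the fragments explicitly, then hand them to the constructor.
file_format = ds.ParquetFileFormat()
filesystem = fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"),
                                  fs.LocalFileSystem())
paths = ["data_2018.parquet", "data_2019.parquet"]
partitions = [ds.field('year') == 2018, ds.field('year') == 2019]

fragments = [file_format.make_fragment(path, filesystem, part)
             for path, part in zip(paths, partitions)]
dataset = ds.FileSystemDataset(fragments, schema=schema, format=file_format)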
107 changes: 64 additions & 43 deletions python/pyarrow/_dataset.pyx
@@ -455,42 +455,82 @@ cdef class UnionDataset(Dataset):


cdef class FileSystemDataset(Dataset):
"""A Dataset created from a set of files on a particular filesystem.
"""A Dataset of file fragments.

A FileSystemDataset is composed of one or more FileFragments.

Parameters
----------
paths_or_selector : Union[FileSelector, List[str]]
List of files/directories to consume.
fragments : list[Fragment]
List of fragments to consume.
schema : Schema
The top-level schema of the DataDataset.
The top-level schema of the Dataset.
format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
partitions : List[Expression], optional
Attach additional partition information for the file paths.
File format of the fragments, currently only ParquetFileFormat,
IpcFileFormat, and CsvFileFormat are supported.
root_partition : Expression, optional
The top-level partition of the Dataset.
"""

cdef:
CFileSystemDataset* filesystem_dataset

def __init__(self, paths_or_selector, schema=None, format=None,
filesystem=None, partitions=None, root_partition=None):
def __init__(self, fragments, Schema schema, FileFormat format,
root_partition=None):
cdef:
FileInfo info
Expression expr
FileFragment fragment
vector[CFileInfo] c_file_infos
vector[shared_ptr[CExpression]] c_partitions
shared_ptr[CFileFragment] c_fragment
vector[shared_ptr[CFileFragment]] c_fragments
CResult[shared_ptr[CDataset]] result

root_partition = root_partition or _true
if not isinstance(root_partition, Expression):
raise TypeError(
"Argument 'root_partition' has incorrect type (expected "
"Expression, got {0})".format(type(root_partition))
)

for fragment in fragments:
c_fragments.push_back(
static_pointer_cast[CFileFragment, CFragment](
fragment.unwrap()))

result = CFileSystemDataset.Make(
pyarrow_unwrap_schema(schema),
(<Expression> root_partition).unwrap(),
(<FileFormat> format).unwrap(),
c_fragments
)
self.init(GetResultValue(result))

cdef void init(self, const shared_ptr[CDataset]& sp):
Dataset.init(self, sp)
self.filesystem_dataset = <CFileSystemDataset*> sp.get()

@classmethod
def from_paths(cls, paths, schema=None, format=None,
Contributor:

Schema shouldn't be optional; add a test to see the result when the user doesn't pass a schema.

Member Author:

Yes, it's not optional in practice: a bit below I check that the value is a Schema (and not None).
I did this to give proper error messages when one of those keywords is omitted (there is some discussion about this in the original PR that added this: #6913 (comment)), because Cython otherwise generates very confusing error messages. I still need to create a minimal example to report to Cython.

But I added a test for from_paths that covers this, to ensure it is indeed checked.

filesystem=None, partitions=None, root_partition=None):
"""A Dataset created from a list of paths on a particular filesystem.

Parameters
----------
paths : list of str
List of file paths to create the fragments from.
schema : Schema
The top-level schema of the Dataset.
format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
filesystem : FileSystem
The filesystem the files are from.
partitions : List[Expression], optional
Attach additional partition information for the file paths.
root_partition : Expression, optional
The top-level partition of the Dataset.
"""
cdef:
FileFragment fragment

root_partition = root_partition or _true
for arg, class_, name in [
(schema, Schema, 'schema'),
(format, FileFormat, 'format'),
@@ -503,38 +543,19 @@ cdef class FileSystemDataset(Dataset):
"got {2})".format(name, class_.__name__, type(arg))
)

infos = filesystem.get_file_info(paths_or_selector)
partitions = partitions or [_true] * len(paths)

partitions = partitions or [_true] * len(infos)

if len(infos) != len(partitions):
if len(paths) != len(partitions):
raise ValueError(
'The number of files resulting from paths_or_selector '
'must be equal to the number of partitions.'
)

for expr in partitions:
c_partitions.push_back(expr.unwrap())

for i, info in enumerate(infos):
if info.is_file:
fragment = format.make_fragment(info.path, filesystem,
partitions[i])
c_fragments.push_back(
static_pointer_cast[CFileFragment, CFragment](
fragment.unwrap()))

result = CFileSystemDataset.Make(
pyarrow_unwrap_schema(schema),
(<Expression> root_partition).unwrap(),
(<FileFormat> format).unwrap(),
c_fragments
)
self.init(GetResultValue(result))

cdef void init(self, const shared_ptr[CDataset]& sp):
Dataset.init(self, sp)
self.filesystem_dataset = <CFileSystemDataset*> sp.get()
fragments = [
format.make_fragment(path, filesystem, partitions[i])
for i, path in enumerate(paths)
]
return FileSystemDataset(fragments, schema, format, root_partition)

@property
def files(self):
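Relating to the review discussion above about the schema keyword not really being optional, here is a rough sketch of the intended behavior, assuming the API introduced in this diff (the exact error text is illustrative):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

file_format = ds.IpcFileFormat()
schema = pa.schema([("f1", pa.int64())])

# Omitting schema= leaves it as None; the explicit isinstance check in
# from_paths should then raise a readable TypeError instead of a confusing
# Cython-generated one.
try:
    ds.FileSystemDataset.from_paths(
        ["nonexistingfile.arrow"], format=file_format,
        filesystem=fs.LocalFileSystem())
except TypeError as exc:
    print(exc)  # e.g. "Argument 'schema' has incorrect type (expected Schema, ...)"

# With all required keywords, the dataset is constructed without touching the
# filesystem (see the ARROW-8283 test below); only scanning the missing file
# would raise.
dataset = ds.FileSystemDataset.from_paths(
    ["nonexistingfile.arrow"], schema=schema, format=file_format,
    filesystem=fs.LocalFileSystem())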
14 changes: 7 additions & 7 deletions python/pyarrow/parquet.py
@@ -1406,8 +1406,11 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
# check for single NativeFile dataset
if not isinstance(path_or_paths, list):
if not _is_path_like(path_or_paths):
self._fragment = parquet_format.make_fragment(path_or_paths)
self._dataset = None
fragment = parquet_format.make_fragment(path_or_paths)
self._dataset = ds.FileSystemDataset(
[fragment], schema=fragment.physical_schema,
format=parquet_format
)
return

# map old filesystems to new one
@@ -1422,13 +1425,10 @@
self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
format=parquet_format,
partitioning=partitioning)
self._fragment = None

@property
def schema(self):
if self._dataset is not None:
return self._dataset.schema
return self._fragment.physical_schema
return self._dataset.schema

def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
"""
@@ -1463,7 +1463,7 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
]
columns = columns + list(set(index_columns) - set(columns))

table = (self._dataset or self._fragment).to_table(
table = self._dataset.to_table(
columns=columns, filter=self._filter_expression,
use_threads=use_threads
)
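As a rough, self-contained sketch of the pattern the parquet.py change adopts (the buffer setup here is illustrative and not part of the diff): a single non-path input is wrapped in a one-fragment FileSystemDataset, so both schema and read() can always go through self._dataset:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a small Parquet file into an in-memory buffer to stand in for a
# single NativeFile input.
table = pa.table({"col1": [1, 2, 3]})
sink = pa.BufferOutputStream()
pq.write_table(table, sink)
source = pa.BufferReader(sink.getvalue())

parquet_format = ds.ParquetFileFormat()
fragment = parquet_format.make_fragment(source)
dataset = ds.FileSystemDataset(
    [fragment], schema=fragment.physical_schema, format=parquet_format)

assert dataset.schema.equals(fragment.physical_schema)
print(dataset.to_table().num_rows)  # 3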
115 changes: 68 additions & 47 deletions python/pyarrow/tests/test_dataset.py
@@ -194,70 +194,91 @@ def test_filesystem_dataset(mockfs):
schema = pa.schema([
pa.field('const', pa.int64())
])

file_format = ds.ParquetFileFormat()

paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet']
partitions = [ds.scalar(True), ds.scalar(True)]
partitions = [ds.field('part') == x for x in range(1, 3)]
fragments = [file_format.make_fragment(path, mockfs, part)
for path, part in zip(paths, partitions)]
root_partition = ds.field('level') == ds.scalar(1337)

dataset = ds.FileSystemDataset(
schema=schema,
format=file_format,
filesystem=mockfs,
paths_or_selector=paths,
partitions=partitions
dataset_from_fragments = ds.FileSystemDataset(
fragments, schema=schema, format=file_format,
root_partition=root_partition,
)
dataset_from_paths = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format, filesystem=mockfs,
partitions=partitions, root_partition=root_partition,
)

assert isinstance(dataset.format, ds.ParquetFileFormat)

# the root_partition and partitions keywords have defaults
for dataset in [dataset_from_fragments, dataset_from_paths]:
assert isinstance(dataset, ds.FileSystemDataset)
assert isinstance(dataset.format, ds.ParquetFileFormat)
assert dataset.partition_expression.equals(root_partition)
assert set(dataset.files) == set(paths)

fragments = list(dataset.get_fragments())
for fragment, partition, path in zip(fragments, partitions, paths):
assert fragment.partition_expression.equals(partition)
assert fragment.path == path
assert isinstance(fragment.format, ds.ParquetFileFormat)
assert isinstance(fragment, ds.ParquetFileFragment)
assert fragment.row_groups is None

row_group_fragments = list(fragment.split_by_row_group())
assert len(row_group_fragments) == 1
assert isinstance(row_group_fragments[0], ds.ParquetFileFragment)
assert row_group_fragments[0].path == path
assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]

fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
assert len(fragments) == 2

# the root_partition keyword has a default
dataset = ds.FileSystemDataset(
paths, schema, format=file_format, filesystem=mockfs,
fragments, schema=schema, format=file_format
)
assert dataset.partition_expression.equals(ds.scalar(True))

assert isinstance(dataset.format, ds.ParquetFileFormat)
# from_paths partitions have defaults
dataset = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format, filesystem=mockfs
)
assert dataset.partition_expression.equals(ds.scalar(True))
for fragment in dataset.get_fragments():
assert fragment.partition_expression.equals(ds.scalar(True))

# validation of required arguments
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, format=file_format, filesystem=mockfs)
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, filesystem=mockfs)
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, format=file_format)
ds.FileSystemDataset(fragments, file_format, schema)
# validation of root_partition
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, format=file_format,
filesystem=mockfs, root_partition=1)
ds.FileSystemDataset(fragments, schema=schema, format=file_format,
root_partition=1)
# missing required argument in from_paths
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset.from_paths(fragments, format=file_format)

root_partition = ds.field('level') == ds.scalar(1337)
partitions = [ds.field('part') == x for x in range(1, 3)]
dataset = ds.FileSystemDataset(
paths_or_selector=paths,
schema=schema,
root_partition=root_partition,
filesystem=mockfs,
partitions=partitions,
format=file_format
)
assert dataset.partition_expression.equals(root_partition)
assert set(dataset.files) == set(paths)

fragments = list(dataset.get_fragments())
for fragment, partition, path in zip(fragments, partitions, paths):
assert fragment.partition_expression.equals(partition)
assert fragment.path == path
assert isinstance(fragment.format, ds.ParquetFileFormat)
assert isinstance(fragment, ds.ParquetFileFragment)
assert fragment.row_groups is None
def test_filesystem_dataset_no_filesystem_interaction():
# ARROW-8283
schema = pa.schema([
pa.field('f1', pa.int64())
])
file_format = ds.IpcFileFormat()
paths = ['nonexistingfile.arrow']

row_group_fragments = list(fragment.split_by_row_group())
assert len(row_group_fragments) == 1
assert isinstance(fragment, ds.ParquetFileFragment)
assert row_group_fragments[0].path == path
assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]
# creating the dataset itself doesn't raise
dataset = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format,
filesystem=fs.LocalFileSystem(),
)

fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
assert len(fragments) == 2
# getting fragments also doesn't raise
dataset.get_fragments()

# scanning does raise
with pytest.raises(FileNotFoundError):
dataset.to_table()


def test_dataset(dataset):