Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3139,6 +3139,13 @@ cdef class FileSystemFactoryOptions(_Weakrefable):
self.options.selector_ignore_prefixes = [tobytes(v) for v in values]


cdef vector[CFileInfo] unwrap_finfos(finfos):
    # Convert a Python iterable of FileInfo wrappers into a C++ vector
    # of the underlying CFileInfo values, so it can be handed to the
    # Arrow C++ dataset factory without the Python layer.
    cdef vector[CFileInfo] c_infos
    for file_info in finfos:
        c_infos.push_back((<FileInfo> file_info).unwrap())
    return c_infos


cdef class FileSystemDatasetFactory(DatasetFactory):
"""
Create a DatasetFactory from a list of paths with schema inspection.
Expand All @@ -3163,6 +3170,7 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
FileSystemFactoryOptions options=None):
cdef:
vector[c_string] paths
vector[CFileInfo] finfos
CFileSelector c_selector
CResult[shared_ptr[CDatasetFactory]] result
shared_ptr[CFileSystem] c_filesystem
Expand All @@ -3184,14 +3192,24 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
c_options
)
elif isinstance(paths_or_selector, (list, tuple)):
paths = [tobytes(s) for s in paths_or_selector]
with nogil:
result = CFileSystemDatasetFactory.MakeFromPaths(
c_filesystem,
paths,
c_format,
c_options
)
if len(paths_or_selector) > 0 and isinstance(paths_or_selector[0], FileInfo):
finfos = unwrap_finfos(paths_or_selector)
with nogil:
result = CFileSystemDatasetFactory.MakeFromFileInfos(
c_filesystem,
finfos,
c_format,
c_options
)
else:
paths = [tobytes(s) for s in paths_or_selector]
with nogil:
result = CFileSystemDatasetFactory.MakeFromPaths(
c_filesystem,
paths,
c_format,
c_options
)
else:
raise TypeError('Must pass either paths or a FileSelector, but '
'passed {}'.format(type(paths_or_selector)))
Expand Down
16 changes: 14 additions & 2 deletions python/pyarrow/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,22 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
-------
FileSystemDataset
"""
from pyarrow.fs import LocalFileSystem, _ensure_filesystem, FileInfo

format = _ensure_format(format or 'parquet')
partitioning = _ensure_partitioning(partitioning)

if isinstance(source, (list, tuple)):
fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
if source and isinstance(source[0], FileInfo):
if filesystem is None:
# fall back to local file system as the default
fs = LocalFileSystem()
else:
# construct a filesystem if it is a valid URI
fs = _ensure_filesystem(filesystem)
paths_or_selector = source
else:
fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
else:
fs, paths_or_selector = _ensure_single_source(source, filesystem)

Expand Down Expand Up @@ -767,6 +778,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
... dataset("local/path/to/data", format="ipc")
... ]) # doctest: +SKIP
"""
from pyarrow.fs import FileInfo
# collect the keyword arguments for later reuse
kwargs = dict(
schema=schema,
Expand All @@ -781,7 +793,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
if _is_path_like(source):
return _filesystem_dataset(source, **kwargs)
elif isinstance(source, (tuple, list)):
if all(_is_path_like(elem) for elem in source):
if all(_is_path_like(elem) or isinstance(elem, FileInfo) for elem in source):
return _filesystem_dataset(source, **kwargs)
elif all(isinstance(elem, Dataset) for elem in source):
return _union_dataset(source, **kwargs)
Expand Down
8 changes: 8 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CFileFormat] format,
CFileSystemFactoryOptions options
)

# Overload of the C++ FileSystemDatasetFactory::Make that accepts
# pre-resolved CFileInfo entries instead of raw path strings.
# "MakeFromFileInfos" is the Cython-side alias; the quoted "Make" maps
# it back to the overloaded C++ name.
@staticmethod
CResult[shared_ptr[CDatasetFactory]] MakeFromFileInfos "Make"(
    shared_ptr[CFileSystem] filesystem,
    vector[CFileInfo] files,
    shared_ptr[CFileFormat] format,
    CFileSystemFactoryOptions options
)
10 changes: 10 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,16 @@ def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
assert dataset_reader.to_table(dataset).equals(table)


@pytest.mark.parquet
@pytest.mark.s3
def test_open_dataset_from_fileinfos(s3_example_simple, dataset_reader):
    """A list of FileInfo objects (from get_file_info) is a valid source."""
    table, path, filesystem, uri, _, _, _, _ = s3_example_simple
    file_infos = filesystem.get_file_info(fs.FileSelector("mybucket"))
    dataset = ds.dataset(file_infos, format="parquet", filesystem=filesystem)
    assert dataset_reader.to_table(dataset).equals(table)


@pytest.mark.parquet
@pytest.mark.s3 # still needed to create the data
def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
Expand Down