38 changes: 35 additions & 3 deletions python/pyarrow/fs.py
@@ -19,6 +19,8 @@
FileSystem abstraction to interact with various local and remote filesystems.
"""

from pyarrow.util import _is_path_like, _stringify_path

from pyarrow._fs import ( # noqa
FileSelector,
FileType,
@@ -62,7 +64,9 @@ def __getattr__(name):
)


def _ensure_filesystem(filesystem, use_mmap=False):
def _ensure_filesystem(
filesystem, use_mmap=False, allow_legacy_filesystem=False
):
if isinstance(filesystem, FileSystem):
return filesystem

@@ -79,15 +83,43 @@ def _ensure_filesystem(filesystem, use_mmap=False):
return PyFileSystem(FSSpecHandler(filesystem))

# map old filesystems to new ones
from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
import pyarrow.filesystem as legacyfs

if isinstance(filesystem, LegacyLocalFileSystem):
if isinstance(filesystem, legacyfs.LocalFileSystem):
return LocalFileSystem(use_mmap=use_mmap)
# TODO handle HDFS?
if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
return filesystem

raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))


def _resolve_filesystem_and_path(
path, filesystem=None, allow_legacy_filesystem=False
):
"""
Return filesystem/path from path which could be a URI or a plain
filesystem path.
"""
if not _is_path_like(path):
if filesystem is not None:
raise ValueError(
"'filesystem' passed but the specified path is file-like, so"
" there is nothing to open with 'filesystem'."
)
return filesystem, path

path = _stringify_path(path)

if filesystem is not None:
filesystem = _ensure_filesystem(
filesystem, allow_legacy_filesystem=allow_legacy_filesystem
)
return filesystem, path
else:
return FileSystem.from_uri(path)


class FSSpecHandler(FileSystemHandler):
"""
Handler for fsspec-based Python filesystems.
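
To make the new helpers above concrete, here is a minimal usage sketch (the paths are hypothetical and `_resolve_filesystem_and_path` is a private helper, shown only for illustration):

```python
import pyarrow as pa
from pyarrow.fs import LocalFileSystem, _resolve_filesystem_and_path

# A plain local path with no explicit filesystem is resolved through
# FileSystem.from_uri, yielding a LocalFileSystem and the path itself.
filesystem, path = _resolve_filesystem_and_path("/tmp/data.parquet")
assert isinstance(filesystem, LocalFileSystem)

# Passing an explicit filesystem (new, fsspec-based, or legacy) returns it
# normalized through _ensure_filesystem, together with the stringified path.
filesystem, path = _resolve_filesystem_and_path(
    "/tmp/data.parquet", LocalFileSystem())

# A file-like object is passed through unchanged (filesystem stays None);
# combining it with an explicit filesystem raises ValueError.
filesystem, sink = _resolve_filesystem_and_path(pa.BufferOutputStream())
assert filesystem is None
```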
32 changes: 21 additions & 11 deletions python/pyarrow/parquet.py
@@ -35,8 +35,9 @@
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
ParquetSchema, ColumnSchema)
from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
resolve_filesystem_and_path)
from pyarrow.fs import (
LocalFileSystem, _resolve_filesystem_and_path, _ensure_filesystem)
from pyarrow import filesystem as legacyfs
from pyarrow.util import guid, _is_path_like, _stringify_path

_URI_STRIP_SCHEMES = ('hdfs',)
@@ -55,9 +56,9 @@ def _parse_uri(path):

def _get_filesystem_and_path(passed_filesystem, path):
if passed_filesystem is None:
return resolve_filesystem_and_path(path, passed_filesystem)
return legacyfs.resolve_filesystem_and_path(path, passed_filesystem)
else:
passed_filesystem = _ensure_filesystem(passed_filesystem)
passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem)
parsed_path = _parse_uri(path)
return passed_filesystem, parsed_path

@@ -541,9 +542,16 @@ def __init__(self, where, schema, filesystem=None,
# sure to close it when `self.close` is called.
self.file_handle = None

filesystem, path = resolve_filesystem_and_path(where, filesystem)
filesystem, path = _resolve_filesystem_and_path(
where, filesystem, allow_legacy_filesystem=True
)
if filesystem is not None:
sink = self.file_handle = filesystem.open(path, 'wb')
if isinstance(filesystem, legacyfs.FileSystem):
# legacy filesystem (eg custom subclass)
# TODO deprecate
sink = self.file_handle = filesystem.open(path, 'wb')
else:
sink = self.file_handle = filesystem.open_output_stream(path)
else:
sink = where
self._metadata_collector = options.pop('metadata_collector', None)
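
A rough sketch of what this dispatch enables for ParquetWriter (hypothetical local paths; assumes a writable /tmp):

```python
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs
import pyarrow.filesystem as legacyfs

table = pa.table({"col": [1, 2, 3]})

# New-style filesystem: the sink is opened via filesystem.open_output_stream().
with pq.ParquetWriter(
        "/tmp/data_new.parquet", table.schema,
        filesystem=fs.LocalFileSystem()) as writer:
    writer.write_table(table)

# A legacy pyarrow.filesystem.LocalFileSystem still works; _ensure_filesystem
# maps it to the new implementation. Other legacy FileSystem subclasses are
# passed through (allow_legacy_filesystem=True) and opened with .open(path, 'wb').
with pq.ParquetWriter(
        "/tmp/data_legacy.parquet", table.schema,
        filesystem=legacyfs.LocalFileSystem.get_instance()) as writer:
    writer.write_table(table)
```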
@@ -1040,7 +1048,8 @@ class _ParquetDatasetMetadata:


def _open_dataset_file(dataset, path, meta=None):
if dataset.fs is not None and not isinstance(dataset.fs, LocalFileSystem):
if (dataset.fs is not None and
not isinstance(dataset.fs, legacyfs.LocalFileSystem)):
path = dataset.fs.open(path, mode='rb')
return ParquetFile(
path,
@@ -1378,7 +1387,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
partitioning="hive", read_dictionary=None, buffer_size=None,
memory_map=False, ignore_prefixes=None, **kwargs):
import pyarrow.dataset as ds
import pyarrow.fs

# Raise error for not supported keywords
for keyword, default in [
@@ -1422,12 +1430,12 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,

# map old filesystems to new one
if filesystem is not None:
filesystem = pyarrow.fs._ensure_filesystem(
filesystem = _ensure_filesystem(
filesystem, use_mmap=memory_map)
elif filesystem is None and memory_map:
# if memory_map is specified, assume local file system (string
# path can in principle be URI for any filesystem)
filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
filesystem = LocalFileSystem(use_mmap=True)

self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
format=parquet_format,
@@ -1586,8 +1594,10 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
"the 'partitioning' keyword is not supported when the "
"pyarrow.dataset module is not available"
)
filesystem, path = _resolve_filesystem_and_path(source, filesystem)
if filesystem is not None:
source = filesystem.open_input_file(path)
# TODO test that source is not a directory or a list
# TODO check filesystem?
dataset = ParquetFile(
source, metadata=metadata, read_dictionary=read_dictionary,
memory_map=memory_map, buffer_size=buffer_size)
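
And on the read side, a small corresponding sketch (same hypothetical path as above; read_table routes it through `_resolve_filesystem_and_path` as well):

```python
import pyarrow.parquet as pq
from pyarrow import fs

# Explicit new-style filesystem together with a plain path ...
table = pq.read_table("/tmp/data_new.parquet", filesystem=fs.LocalFileSystem())

# ... or let the path/URI be resolved to a filesystem automatically.
table = pq.read_table("/tmp/data_new.parquet")
```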
89 changes: 86 additions & 3 deletions python/pyarrow/tests/test_parquet.py
@@ -31,6 +31,7 @@
from pyarrow.tests import util
from pyarrow.util import guid
from pyarrow.filesystem import LocalFileSystem, FileSystem
from pyarrow import fs
Member: Also from pyarrow import filesystem as legacyfs?

Member Author: Here I am going to leave it as is for now, because the old ones are still used a lot (would make the diff much larger; will keep that for a next PR, e.g. when actually deprecating).

try:
@@ -2152,7 +2153,7 @@ def s3_bucket(request, s3_connection, s3_server):


@pytest.fixture
def s3_example(s3_connection, s3_server, s3_bucket):
def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
s3fs = pytest.importorskip('s3fs')

host, port, access_key, secret_key = s3_connection
@@ -2175,10 +2176,10 @@ def s3_example(s3_connection, s3_server, s3_bucket):
@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs(s3_example, use_legacy_dataset):
def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
from pyarrow.filesystem import S3FSWrapper

fs, bucket_uri = s3_example
fs, bucket_uri = s3_example_s3fs
wrapper = S3FSWrapper(fs)
_partition_test_for_filesystem(wrapper, bucket_uri)

@@ -3510,6 +3511,88 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
pq.ParquetFile(path)


@pytest.mark.pandas
@pytest.mark.parametrize("filesystem", [
None,
LocalFileSystem.get_instance(),
fs.LocalFileSystem(),
])
def test_parquet_writer_filesystem_local(tempdir, filesystem):
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)
path = str(tempdir / 'data.parquet')

with pq.ParquetWriter(
path, table.schema, filesystem=filesystem, version='2.0'
) as writer:
writer.write_table(table)

result = _read_table(path).to_pandas()
tm.assert_frame_equal(result, df)


@pytest.fixture
def s3_example_fs(s3_connection, s3_server):
from pyarrow.fs import FileSystem

host, port, access_key, secret_key = s3_connection
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
)
fs, path = FileSystem.from_uri(uri)

fs.create_dir("mybucket")

yield fs, uri, path


@pytest.mark.pandas
@pytest.mark.s3
def test_parquet_writer_filesystem_s3(s3_example_fs):
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)

fs, uri, path = s3_example_fs

with pq.ParquetWriter(
path, table.schema, filesystem=fs, version='2.0'
) as writer:
writer.write_table(table)
Member: Should we also test ParquetWriter(path=uri)?

Member Author (jorisvandenbossche, Sep 3, 2020): I added one, but it is segfaulting locally (maybe similar to ARROW-9814).

Member: If you pass -s to pytest, you should be able to see the C++ crash message (if any).

Member Author: OK to merge it with the commented-out test for now? (Opened an issue for it at https://issues.apache.org/jira/browse/ARROW-9906)

Member: Yes!

result = _read_table(uri).to_pandas()
tm.assert_frame_equal(result, df)


# TODO segfaulting (ARROW-9814?)
# @pytest.mark.pandas
# @pytest.mark.s3
# def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
# df = _test_dataframe(100)
# table = pa.Table.from_pandas(df, preserve_index=False)

# fs, uri, path = s3_example_fs

# with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
# writer.write_table(table)

# result = _read_table(path, filesystem=fs).to_pandas()
# tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_parquet_writer_filesystem_buffer_raises():
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)
filesystem = fs.LocalFileSystem()

# Should raise ValueError when filesystem is passed with file-like object
with pytest.raises(ValueError, match="specified path is file-like"):
pq.ParquetWriter(
pa.BufferOutputStream(), table.schema, filesystem=filesystem
)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):