diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index 69404b2a48e..226b80b8ff0 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -19,6 +19,8 @@
 FileSystem abstraction to interact with various local and remote filesystems.
 """
 
+from pyarrow.util import _is_path_like, _stringify_path
+
 from pyarrow._fs import (  # noqa
     FileSelector,
     FileType,
@@ -62,7 +64,9 @@ def __getattr__(name):
     )
 
 
-def _ensure_filesystem(filesystem, use_mmap=False):
+def _ensure_filesystem(
+    filesystem, use_mmap=False, allow_legacy_filesystem=False
+):
     if isinstance(filesystem, FileSystem):
         return filesystem
 
@@ -79,15 +83,43 @@ def _ensure_filesystem(filesystem, use_mmap=False):
         return PyFileSystem(FSSpecHandler(filesystem))
 
     # map old filesystems to new ones
-    from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
+    import pyarrow.filesystem as legacyfs
 
-    if isinstance(filesystem, LegacyLocalFileSystem):
+    if isinstance(filesystem, legacyfs.LocalFileSystem):
         return LocalFileSystem(use_mmap=use_mmap)
     # TODO handle HDFS?
+    if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
+        return filesystem
 
     raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))
 
 
+def _resolve_filesystem_and_path(
+    path, filesystem=None, allow_legacy_filesystem=False
+):
+    """
+    Return filesystem/path from path which could be an URI or a plain
+    filesystem path.
+    """
+    if not _is_path_like(path):
+        if filesystem is not None:
+            raise ValueError(
+                "'filesystem' passed but the specified path is file-like, so"
+                " there is nothing to open with 'filesystem'."
+            )
+        return filesystem, path
+
+    path = _stringify_path(path)
+
+    if filesystem is not None:
+        filesystem = _ensure_filesystem(
+            filesystem, allow_legacy_filesystem=allow_legacy_filesystem
+        )
+        return filesystem, path
+    else:
+        return FileSystem.from_uri(path)
+
+
 class FSSpecHandler(FileSystemHandler):
     """
     Handler for fsspec-based Python filesystems.
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index cff5dd58564..41f861caa1e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -35,8 +35,9 @@
                               FileMetaData, RowGroupMetaData,
                               ColumnChunkMetaData, ParquetSchema,
                               ColumnSchema)
-from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
-                                resolve_filesystem_and_path)
+from pyarrow.fs import (
+    LocalFileSystem, _resolve_filesystem_and_path, _ensure_filesystem)
+from pyarrow import filesystem as legacyfs
 from pyarrow.util import guid, _is_path_like, _stringify_path
 
 _URI_STRIP_SCHEMES = ('hdfs',)
@@ -55,9 +56,9 @@ def _parse_uri(path):
 
 def _get_filesystem_and_path(passed_filesystem, path):
     if passed_filesystem is None:
-        return resolve_filesystem_and_path(path, passed_filesystem)
+        return legacyfs.resolve_filesystem_and_path(path, passed_filesystem)
     else:
-        passed_filesystem = _ensure_filesystem(passed_filesystem)
+        passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem)
         parsed_path = _parse_uri(path)
         return passed_filesystem, parsed_path
 
@@ -541,9 +542,16 @@ def __init__(self, where, schema, filesystem=None,
         # sure to close it when `self.close` is called.
         self.file_handle = None
 
-        filesystem, path = resolve_filesystem_and_path(where, filesystem)
+        filesystem, path = _resolve_filesystem_and_path(
+            where, filesystem, allow_legacy_filesystem=True
+        )
         if filesystem is not None:
-            sink = self.file_handle = filesystem.open(path, 'wb')
+            if isinstance(filesystem, legacyfs.FileSystem):
+                # legacy filesystem (eg custom subclass)
+                # TODO deprecate
+                sink = self.file_handle = filesystem.open(path, 'wb')
+            else:
+                sink = self.file_handle = filesystem.open_output_stream(path)
         else:
             sink = where
         self._metadata_collector = options.pop('metadata_collector', None)
@@ -1040,7 +1048,8 @@ class _ParquetDatasetMetadata:
 
 
 def _open_dataset_file(dataset, path, meta=None):
-    if dataset.fs is not None and not isinstance(dataset.fs, LocalFileSystem):
+    if (dataset.fs is not None and
+            not isinstance(dataset.fs, legacyfs.LocalFileSystem)):
         path = dataset.fs.open(path, mode='rb')
     return ParquetFile(
         path,
@@ -1378,7 +1387,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
                  memory_map=False, ignore_prefixes=None, **kwargs):
         import pyarrow.dataset as ds
-        import pyarrow.fs
 
         # Raise error for not supported keywords
         for keyword, default in [
@@ -1422,12 +1430,12 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
 
         # map old filesystems to new one
         if filesystem is not None:
-            filesystem = pyarrow.fs._ensure_filesystem(
+            filesystem = _ensure_filesystem(
                 filesystem, use_mmap=memory_map)
         elif filesystem is None and memory_map:
             # if memory_map is specified, assume local file system (string
             # path can in principle be URI for any filesystem)
-            filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
+            filesystem = LocalFileSystem(use_mmap=True)
 
         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                    format=parquet_format,
@@ -1586,8 +1594,10 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                     "the 'partitioning' keyword is not supported when the "
                     "pyarrow.dataset module is not available"
                 )
+            filesystem, path = _resolve_filesystem_and_path(source, filesystem)
+            if filesystem is not None:
+                source = filesystem.open_input_file(path)
             # TODO test that source is not a directory or a list
-            # TODO check filesystem?
             dataset = ParquetFile(
                 source, metadata=metadata, read_dictionary=read_dictionary,
                 memory_map=memory_map, buffer_size=buffer_size)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b87cd55e320..f654ad94e1b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -31,6 +31,7 @@
 from pyarrow.tests import util
 from pyarrow.util import guid
 from pyarrow.filesystem import LocalFileSystem, FileSystem
+from pyarrow import fs
 
 
 try:
@@ -2152,7 +2153,7 @@ def s3_bucket(request, s3_connection, s3_server):
 
 
 @pytest.fixture
-def s3_example(s3_connection, s3_server, s3_bucket):
+def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
     s3fs = pytest.importorskip('s3fs')
 
     host, port, access_key, secret_key = s3_connection
@@ -2175,10 +2176,10 @@ def s3_example(s3_connection, s3_server, s3_bucket):
 @pytest.mark.pandas
 @pytest.mark.s3
 @parametrize_legacy_dataset
-def test_read_partitioned_directory_s3fs(s3_example, use_legacy_dataset):
+def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
     from pyarrow.filesystem import S3FSWrapper
 
-    fs, bucket_uri = s3_example
+    fs, bucket_uri = s3_example_s3fs
     wrapper = S3FSWrapper(fs)
     _partition_test_for_filesystem(wrapper, bucket_uri)
 
@@ -3510,6 +3511,88 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
         pq.ParquetFile(path)
 
 
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem.get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=filesystem, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+    from pyarrow.fs import FileSystem
+
+    host, port, access_key, secret_key = s3_connection
+    uri = (
+        "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+        .format(access_key, secret_key, host, port)
+    )
+    fs, path = FileSystem.from_uri(uri)
+
+    fs.create_dir("mybucket")
+
+    yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(uri).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+# TODO segfaulting (ARROW-9814?)
+# @pytest.mark.pandas
+# @pytest.mark.s3
+# def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
+#     df = _test_dataframe(100)
+#     table = pa.Table.from_pandas(df, preserve_index=False)
+
+#     fs, uri, path = s3_example_fs
+
+#     with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
+#         writer.write_table(table)
+
+#     result = _read_table(path, filesystem=fs).to_pandas()
+#     tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    filesystem = fs.LocalFileSystem()
+
+    # Should raise ValueError when filesystem is passed with file-like object
+    with pytest.raises(ValueError, match="specified path is file-like"):
+        pq.ParquetWriter(
+            pa.BufferOutputStream(), table.schema, filesystem=filesystem
+        )
+
+
 @pytest.mark.pandas
 @parametrize_legacy_dataset
 def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
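
A minimal usage sketch of what the patch above enables, not part of the diff itself: with these changes, ParquetWriter routes a new-style pyarrow.fs filesystem through open_output_stream() (a legacy pyarrow.filesystem object still goes through its open() method), and read_table() resolves a filesystem/path pair from the source in the same way. The local directory "/tmp/example" and the table contents are made up for illustration and assumed to exist/be writable.

    import pyarrow as pa
    import pyarrow.parquet as pq
    from pyarrow import fs

    table = pa.table({"x": [1, 2, 3]})

    # New-style filesystem passed explicitly to ParquetWriter; with this
    # patch it is opened via filesystem.open_output_stream(path).
    filesystem = fs.LocalFileSystem()
    with pq.ParquetWriter("/tmp/example/data.parquet", table.schema,
                          filesystem=filesystem) as writer:
        writer.write_table(table)

    # read_table() goes through the same filesystem/path resolution,
    # so the plain path round-trips through the new filesystem.
    result = pq.read_table("/tmp/example/data.parquet", filesystem=filesystem)
    assert result.equals(table)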