From 6a329f42c4586b8c5e6f0368d797e2850389d5b2 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 18 Aug 2020 17:27:13 +0200 Subject: [PATCH 1/8] ARROW-9718: [Python] ParquetWriter to work with new FileSystem API --- python/pyarrow/fs.py | 21 +++++++++++++++++++++ python/pyarrow/parquet.py | 5 +++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 69404b2a48e..589fbcb4de0 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -19,6 +19,8 @@ FileSystem abstraction to interact with various local and remote filesystems. """ +from pyarrow.util import _is_path_like, _stringify_path + from pyarrow._fs import ( # noqa FileSelector, FileType, @@ -88,6 +90,25 @@ def _ensure_filesystem(filesystem, use_mmap=False): raise TypeError("Unrecognized filesystem: {}".format(type(filesystem))) +def _resolve_filesystem_and_path(path, filesystem=None): + """ + Return filesystem/path from path which could be an URI or a plain + filesystem path. + """ + if not _is_path_like(path): + if filesystem is not None: + raise ValueError("filesystem passed but where is file-like, so" + " there is nothing to open with filesystem.") + return filesystem, path + + path = _stringify_path(path) + + if filesystem is not None: + return _ensure_filesystem(filesystem), path + else: + return FileSystem.from_uri(path) + + class FSSpecHandler(FileSystemHandler): """ Handler for fsspec-based Python filesystems. diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index cff5dd58564..35c8beffb2a 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -541,9 +541,10 @@ def __init__(self, where, schema, filesystem=None, # sure to close it when `self.close` is called. self.file_handle = None - filesystem, path = resolve_filesystem_and_path(where, filesystem) + from pyarrow.fs import _resolve_filesystem_and_path + filesystem, path = _resolve_filesystem_and_path(where, filesystem) if filesystem is not None: - sink = self.file_handle = filesystem.open(path, 'wb') + sink = self.file_handle = filesystem.open_output_stream(path) else: sink = where self._metadata_collector = options.pop('metadata_collector', None) From 775c5e990bf3cda02bab81178b5bb9db2cae3f20 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 27 Aug 2020 11:10:51 +0200 Subject: [PATCH 2/8] workaround for custom subclasses --- python/pyarrow/fs.py | 20 ++++++++++++++++---- python/pyarrow/parquet.py | 16 ++++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 589fbcb4de0..84687c4d4eb 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -64,7 +64,9 @@ def __getattr__(name): ) -def _ensure_filesystem(filesystem, use_mmap=False): +def _ensure_filesystem( + filesystem, use_mmap=False, allow_legacy_filesystem=False +): if isinstance(filesystem, FileSystem): return filesystem @@ -81,16 +83,23 @@ def _ensure_filesystem(filesystem, use_mmap=False): return PyFileSystem(FSSpecHandler(filesystem)) # map old filesystems to new ones - from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem + from pyarrow.filesystem import ( + FileSystem as LegacyFileSystem, + LocalFileSystem as LegacyLocalFileSystem + ) if isinstance(filesystem, LegacyLocalFileSystem): return LocalFileSystem(use_mmap=use_mmap) # TODO handle HDFS? + if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem): + return filesystem raise TypeError("Unrecognized filesystem: {}".format(type(filesystem))) -def _resolve_filesystem_and_path(path, filesystem=None): +def _resolve_filesystem_and_path( + path, filesystem=None, allow_legacy_filesystem=False +): """ Return filesystem/path from path which could be an URI or a plain filesystem path. @@ -104,7 +113,10 @@ def _resolve_filesystem_and_path(path, filesystem=None): path = _stringify_path(path) if filesystem is not None: - return _ensure_filesystem(filesystem), path + filesystem = _ensure_filesystem( + filesystem, allow_legacy_filesystem=allow_legacy_filesystem + ) + return filesystem, path else: return FileSystem.from_uri(path) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 35c8beffb2a..0fc5ca16c78 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -35,8 +35,9 @@ FileMetaData, RowGroupMetaData, ColumnChunkMetaData, ParquetSchema, ColumnSchema) -from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem, - resolve_filesystem_and_path) +from pyarrow.filesystem import ( + FileSystem, LocalFileSystem, _ensure_filesystem, + resolve_filesystem_and_path) from pyarrow.util import guid, _is_path_like, _stringify_path _URI_STRIP_SCHEMES = ('hdfs',) @@ -542,9 +543,16 @@ def __init__(self, where, schema, filesystem=None, self.file_handle = None from pyarrow.fs import _resolve_filesystem_and_path - filesystem, path = _resolve_filesystem_and_path(where, filesystem) + filesystem, path = _resolve_filesystem_and_path( + where, filesystem, allow_legacy_filesystem=True + ) if filesystem is not None: - sink = self.file_handle = filesystem.open_output_stream(path) + if isinstance(filesystem, FileSystem): + # legacy filesystem (eg custom subclass) + # TODO deprecate + sink = self.file_handle = filesystem.open(path, 'wb') + else: + sink = self.file_handle = filesystem.open_output_stream(path) else: sink = where self._metadata_collector = options.pop('metadata_collector', None) From 842aae9fa19b86d349fe5b00365b85ca4495fb85 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 27 Aug 2020 11:44:15 +0200 Subject: [PATCH 3/8] add tests for filesystem keyword --- python/pyarrow/tests/test_parquet.py | 72 ++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index b87cd55e320..99428be7a28 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -31,6 +31,7 @@ from pyarrow.tests import util from pyarrow.util import guid from pyarrow.filesystem import LocalFileSystem, FileSystem +from pyarrow import fs try: @@ -2152,7 +2153,7 @@ def s3_bucket(request, s3_connection, s3_server): @pytest.fixture -def s3_example(s3_connection, s3_server, s3_bucket): +def s3_example_s3fs(s3_connection, s3_server, s3_bucket): s3fs = pytest.importorskip('s3fs') host, port, access_key, secret_key = s3_connection @@ -2175,10 +2176,10 @@ def s3_example(s3_connection, s3_server, s3_bucket): @pytest.mark.pandas @pytest.mark.s3 @parametrize_legacy_dataset -def test_read_partitioned_directory_s3fs(s3_example, use_legacy_dataset): +def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset): from pyarrow.filesystem import S3FSWrapper - fs, bucket_uri = s3_example + fs, bucket_uri = s3_example_s3fs wrapper = S3FSWrapper(fs) _partition_test_for_filesystem(wrapper, bucket_uri) @@ -3510,6 +3511,71 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir): pq.ParquetFile(path) +@pytest.mark.parametrize("filesystem", [ + None, + LocalFileSystem.get_instance(), + fs.LocalFileSystem(), +]) +def test_parquet_writer_filesystem_local(tempdir, filesystem): + df = _test_dataframe(100) + table = pa.Table.from_pandas(df, preserve_index=False) + path = str(tempdir / 'data.parquet') + + with pq.ParquetWriter( + path, table.schema, filesystem=filesystem, version='2.0' + ) as writer: + writer.write_table(table) + + result = _read_table(path).to_pandas() + tm.assert_frame_equal(result, df) + + +@pytest.fixture +def s3_example_fs(s3_connection, s3_server): + from pyarrow.fs import FileSystem + + host, port, access_key, secret_key = s3_connection + uri = ( + "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" + .format(access_key, secret_key, host, port) + ) + fs, path = FileSystem.from_uri(uri) + + fs.create_dir("mybucket") + + yield fs, uri, path + + +def test_parquet_writer_filesystem_s3(s3_example_fs): + df = _test_dataframe(100) + table = pa.Table.from_pandas(df, preserve_index=False) + + fs, uri, path = s3_example_fs + + with pq.ParquetWriter( + path, table.schema, filesystem=fs, version='2.0' + ) as writer: + writer.write_table(table) + + result = _read_table(uri).to_pandas() + tm.assert_frame_equal(result, df) + + +def test_parquet_writer_filesystem_buffer_raises(): + df = _test_dataframe(100) + table = pa.Table.from_pandas(df, preserve_index=False) + filesystem = fs.LocalFileSystem() + + # Should raise ValueError when filesystem is passed with file-like object + with pytest.raises(ValueError) as err_info: + pq.ParquetWriter( + pa.BufferOutputStream(), table.schema, filesystem=filesystem + ) + expected_msg = ("filesystem passed but where is file-like, so" + " there is nothing to open with filesystem.") + assert str(err_info) == expected_msg + + @pytest.mark.pandas @parametrize_legacy_dataset def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset): From 5d344cd1e088ffe635c2c75294ddf97732dfdb9c Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 27 Aug 2020 11:51:45 +0200 Subject: [PATCH 4/8] correctly mark tests --- python/pyarrow/tests/test_parquet.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 99428be7a28..9d91c154e53 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -3511,6 +3511,7 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir): pq.ParquetFile(path) +@pytest.mark.pandas @pytest.mark.parametrize("filesystem", [ None, LocalFileSystem.get_instance(), @@ -3546,6 +3547,8 @@ def s3_example_fs(s3_connection, s3_server): yield fs, uri, path +@pytest.mark.pandas +@pytest.mark.s3 def test_parquet_writer_filesystem_s3(s3_example_fs): df = _test_dataframe(100) table = pa.Table.from_pandas(df, preserve_index=False) @@ -3561,6 +3564,7 @@ def test_parquet_writer_filesystem_s3(s3_example_fs): tm.assert_frame_equal(result, df) +@pytest.mark.pandas def test_parquet_writer_filesystem_buffer_raises(): df = _test_dataframe(100) table = pa.Table.from_pandas(df, preserve_index=False) From e309ce1e550c833ccc307b22b3236c572b734909 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 1 Sep 2020 11:39:43 +0200 Subject: [PATCH 5/8] check filesystem in read_table fallback --- python/pyarrow/parquet.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 0fc5ca16c78..0f8fc51e993 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1595,8 +1595,11 @@ def read_table(source, columns=None, use_threads=True, metadata=None, "the 'partitioning' keyword is not supported when the " "pyarrow.dataset module is not available" ) + from pyarrow.fs import _resolve_filesystem_and_path + filesystem, path = _resolve_filesystem_and_path(source, filesystem) + if filesystem is not None: + source = filesystem.open_output_stream(path) # TODO test that source is not a directory or a list - # TODO check filesystem? dataset = ParquetFile( source, metadata=metadata, read_dictionary=read_dictionary, memory_map=memory_map, buffer_size=buffer_size) From fd27aad2629a78ce069cd8ac5be59a7d15c3c7cd Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 1 Sep 2020 12:11:36 +0200 Subject: [PATCH 6/8] fix output->input --- python/pyarrow/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 0f8fc51e993..3e578a63b84 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1598,7 +1598,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None, from pyarrow.fs import _resolve_filesystem_and_path filesystem, path = _resolve_filesystem_and_path(source, filesystem) if filesystem is not None: - source = filesystem.open_output_stream(path) + source = filesystem.open_input_file(path) # TODO test that source is not a directory or a list dataset = ParquetFile( source, metadata=metadata, read_dictionary=read_dictionary, From 5bafd1d9849b301fe0a8b2cbdf3e42e20e2c736b Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 3 Sep 2020 09:51:14 +0200 Subject: [PATCH 7/8] update for feedback --- python/pyarrow/fs.py | 15 +++++++-------- python/pyarrow/parquet.py | 22 ++++++++++------------ python/pyarrow/tests/test_parquet.py | 20 ++++++++++++++++---- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 84687c4d4eb..226b80b8ff0 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -83,15 +83,12 @@ def _ensure_filesystem( return PyFileSystem(FSSpecHandler(filesystem)) # map old filesystems to new ones - from pyarrow.filesystem import ( - FileSystem as LegacyFileSystem, - LocalFileSystem as LegacyLocalFileSystem - ) + import pyarrow.filesystem as legacyfs - if isinstance(filesystem, LegacyLocalFileSystem): + if isinstance(filesystem, legacyfs.LocalFileSystem): return LocalFileSystem(use_mmap=use_mmap) # TODO handle HDFS? - if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem): + if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem): return filesystem raise TypeError("Unrecognized filesystem: {}".format(type(filesystem))) @@ -106,8 +103,10 @@ def _resolve_filesystem_and_path( """ if not _is_path_like(path): if filesystem is not None: - raise ValueError("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") + raise ValueError( + "'filesystem' passed but the specified path is file-like, so" + " there is nothing to open with 'filesystem'." + ) return filesystem, path path = _stringify_path(path) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 3e578a63b84..3601276f279 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -35,9 +35,9 @@ FileMetaData, RowGroupMetaData, ColumnChunkMetaData, ParquetSchema, ColumnSchema) -from pyarrow.filesystem import ( - FileSystem, LocalFileSystem, _ensure_filesystem, - resolve_filesystem_and_path) +from pyarrow.fs import ( + LocalFileSystem, _resolve_filesystem_and_path, _ensure_filesystem) +from pyarrow import filesystem as legacyfs from pyarrow.util import guid, _is_path_like, _stringify_path _URI_STRIP_SCHEMES = ('hdfs',) @@ -56,9 +56,9 @@ def _parse_uri(path): def _get_filesystem_and_path(passed_filesystem, path): if passed_filesystem is None: - return resolve_filesystem_and_path(path, passed_filesystem) + return legacyfs.resolve_filesystem_and_path(path, passed_filesystem) else: - passed_filesystem = _ensure_filesystem(passed_filesystem) + passed_filesystem = legacyfs._ensure_filesystem(passed_filesystem) parsed_path = _parse_uri(path) return passed_filesystem, parsed_path @@ -542,12 +542,11 @@ def __init__(self, where, schema, filesystem=None, # sure to close it when `self.close` is called. self.file_handle = None - from pyarrow.fs import _resolve_filesystem_and_path filesystem, path = _resolve_filesystem_and_path( where, filesystem, allow_legacy_filesystem=True ) if filesystem is not None: - if isinstance(filesystem, FileSystem): + if isinstance(filesystem, legacyfs.FileSystem): # legacy filesystem (eg custom subclass) # TODO deprecate sink = self.file_handle = filesystem.open(path, 'wb') @@ -1049,7 +1048,8 @@ class _ParquetDatasetMetadata: def _open_dataset_file(dataset, path, meta=None): - if dataset.fs is not None and not isinstance(dataset.fs, LocalFileSystem): + if (dataset.fs is not None + and not isinstance(dataset.fs, legacyfs.LocalFileSystem)): path = dataset.fs.open(path, mode='rb') return ParquetFile( path, @@ -1387,7 +1387,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, partitioning="hive", read_dictionary=None, buffer_size=None, memory_map=False, ignore_prefixes=None, **kwargs): import pyarrow.dataset as ds - import pyarrow.fs # Raise error for not supported keywords for keyword, default in [ @@ -1431,12 +1430,12 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, # map old filesystems to new one if filesystem is not None: - filesystem = pyarrow.fs._ensure_filesystem( + filesystem = _ensure_filesystem( filesystem, use_mmap=memory_map) elif filesystem is None and memory_map: # if memory_map is specified, assume local file system (string # path can in principle be URI for any filesystem) - filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True) + filesystem = LocalFileSystem(use_mmap=True) self._dataset = ds.dataset(path_or_paths, filesystem=filesystem, format=parquet_format, @@ -1595,7 +1594,6 @@ def read_table(source, columns=None, use_threads=True, metadata=None, "the 'partitioning' keyword is not supported when the " "pyarrow.dataset module is not available" ) - from pyarrow.fs import _resolve_filesystem_and_path filesystem, path = _resolve_filesystem_and_path(source, filesystem) if filesystem is not None: source = filesystem.open_input_file(path) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 9d91c154e53..49779f718c7 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -3564,6 +3564,21 @@ def test_parquet_writer_filesystem_s3(s3_example_fs): tm.assert_frame_equal(result, df) +@pytest.mark.pandas +@pytest.mark.s3 +def test_parquet_writer_filesystem_s3_uri(s3_example_fs): + df = _test_dataframe(100) + table = pa.Table.from_pandas(df, preserve_index=False) + + fs, uri, path = s3_example_fs + + with pq.ParquetWriter(uri, table.schema, version='2.0') as writer: + writer.write_table(table) + + result = _read_table(path, filesystem=fs).to_pandas() + tm.assert_frame_equal(result, df) + + @pytest.mark.pandas def test_parquet_writer_filesystem_buffer_raises(): df = _test_dataframe(100) @@ -3571,13 +3586,10 @@ def test_parquet_writer_filesystem_buffer_raises(): filesystem = fs.LocalFileSystem() # Should raise ValueError when filesystem is passed with file-like object - with pytest.raises(ValueError) as err_info: + with pytest.raises(ValueError, match="specified path is file-like"): pq.ParquetWriter( pa.BufferOutputStream(), table.schema, filesystem=filesystem ) - expected_msg = ("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") - assert str(err_info) == expected_msg @pytest.mark.pandas From c619a5b7731b8eef8ab5ef8b0cac99c4fbd9fc98 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 3 Sep 2020 10:48:02 +0200 Subject: [PATCH 8/8] fix linting + comment out segfaulting test --- python/pyarrow/parquet.py | 4 ++-- python/pyarrow/tests/test_parquet.py | 21 +++++++++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 3601276f279..41f861caa1e 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1048,8 +1048,8 @@ class _ParquetDatasetMetadata: def _open_dataset_file(dataset, path, meta=None): - if (dataset.fs is not None - and not isinstance(dataset.fs, legacyfs.LocalFileSystem)): + if (dataset.fs is not None and + not isinstance(dataset.fs, legacyfs.LocalFileSystem)): path = dataset.fs.open(path, mode='rb') return ParquetFile( path, diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 49779f718c7..f654ad94e1b 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -3564,19 +3564,20 @@ def test_parquet_writer_filesystem_s3(s3_example_fs): tm.assert_frame_equal(result, df) -@pytest.mark.pandas -@pytest.mark.s3 -def test_parquet_writer_filesystem_s3_uri(s3_example_fs): - df = _test_dataframe(100) - table = pa.Table.from_pandas(df, preserve_index=False) +# TODO segfaulting (ARROW-9814?) +# @pytest.mark.pandas +# @pytest.mark.s3 +# def test_parquet_writer_filesystem_s3_uri(s3_example_fs): +# df = _test_dataframe(100) +# table = pa.Table.from_pandas(df, preserve_index=False) - fs, uri, path = s3_example_fs +# fs, uri, path = s3_example_fs - with pq.ParquetWriter(uri, table.schema, version='2.0') as writer: - writer.write_table(table) +# with pq.ParquetWriter(uri, table.schema, version='2.0') as writer: +# writer.write_table(table) - result = _read_table(path, filesystem=fs).to_pandas() - tm.assert_frame_equal(result, df) +# result = _read_table(path, filesystem=fs).to_pandas() +# tm.assert_frame_equal(result, df) @pytest.mark.pandas