From 0f90e885d14564e0c248035e26d215bf14747ac3 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Mon, 25 Sep 2023 13:18:57 +0300
Subject: [PATCH 01/12] expose file size to python dataset

---
 cpp/src/arrow/dataset/file_base.h            |  6 ++-
 cpp/src/arrow/filesystem/filesystem.h        |  2 +
 python/pyarrow/_dataset.pxd                  |  2 +-
 python/pyarrow/_dataset.pyx                  | 17 ++++----
 python/pyarrow/_dataset_parquet.pyx          |  9 ++--
 python/pyarrow/includes/libarrow_dataset.pxd |  1 +
 python/pyarrow/tests/test_dataset.py         | 43 ++++++++++++++++++++
 7 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 46fc8ebc40d..0cf283a06d6 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -54,7 +54,11 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
       : file_info_(std::move(path)),
         filesystem_(std::move(filesystem)),
         compression_(compression) {}
-
+  FileSource(std::string path, int64_t size, std::shared_ptr<fs::FileSystem> filesystem,
+             Compression::type compression = Compression::UNCOMPRESSED)
+      : file_info_(std::move(path), std::move(size)),
+        filesystem_(std::move(filesystem)),
+        compression_(compression) {}
   FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
              Compression::type compression = Compression::UNCOMPRESSED)
       : file_info_(std::move(info)),
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index 559f1335f12..3f233c74d5a 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -60,6 +60,8 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable<FileInfo> {
   explicit FileInfo(std::string path, FileType type = FileType::Unknown)
       : path_(std::move(path)), type_(type) {}
 
+  explicit FileInfo(std::string path, int64_t size, FileType type = FileType::Unknown)
+      : path_(std::move(path)), type_(type), size_(size) {}
   /// The file type
   FileType type() const { return type_; }
diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index 210e5558009..67074961ae9 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -25,7 +25,7 @@ from pyarrow.lib cimport *
 from pyarrow._fs cimport FileSystem
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int size=*)
 
 
 cdef class DatasetFactory(_Weakrefable):
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 48ee6769153..968887f7715 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -96,7 +96,7 @@ def _get_parquet_symbol(name):
     return _dataset_pq and getattr(_dataset_pq, name)
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int size=-1):
 
     cdef:
         CFileSource c_source
@@ -108,14 +108,14 @@
     if isinstance(file, Buffer):
         c_buffer = pyarrow_unwrap_buffer(file)
         c_source = CFileSource(move(c_buffer))
-
    elif _is_path_like(file):
         if filesystem is None:
             raise ValueError("cannot construct a FileSource from "
                              "a path without a FileSystem")
         c_filesystem = filesystem.unwrap()
         c_path = tobytes(_stringify_path(file))
-        c_source = CFileSource(move(c_path), move(c_filesystem))
+        c_size = size
+        c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
 
     elif hasattr(file, 'read'):
         # Optimistically hope this is file-like
@@ -1230,7 +1230,7 @@ cdef class FileFormat(_Weakrefable):
             The schema inferred from the file
         """
         cdef:
-            CFileSource c_source = _make_file_source(file, filesystem)
+            CFileSource c_source = _make_file_source(file, filesystem=filesystem, size=-1)
             CResult[shared_ptr[CSchema]] c_result
         with nogil:
             c_result = self.format.Inspect(c_source)
@@ -1238,7 +1238,8 @@ cdef class FileFormat(_Weakrefable):
         return pyarrow_wrap_schema(move(c_schema))
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None):
+                      Expression partition_expression=None,
+                      size=-1):
         """
         Make a FileFragment from a given file.
 
@@ -1252,6 +1253,9 @@ cdef class FileFormat(_Weakrefable):
         partition_expression : Expression, optional
             An expression that is guaranteed true for all rows in the fragment.
             Allows fragment to be potentially skipped while scanning with a filter.
+        size : int, optional
+            The size of the file in bytes. Can improve performance with high-latency filesystems
+            when file size needs to be known before reading.
 
         Returns
         -------
@@ -1260,8 +1264,7 @@ cdef class FileFormat(_Weakrefable):
         """
         if partition_expression is None:
             partition_expression = _true
-
-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem=filesystem, size=size)
         c_fragment = GetResultValue(
             self.format.MakeFragment(move(c_source),
                                      partition_expression.unwrap(),
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 31aa058706a..432a424f7c5 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
         return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None):
+                      Expression partition_expression=None, row_groups=None, size=-1):
         """
         Make a FileFragment from a given file.
 
@@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat):
             fragment to be potentially skipped while scanning with a filter.
         row_groups : Iterable, optional
             The indices of the row groups to include
+        size : int, optional
+            The size of the file in bytes. Can improve performance with high-latency filesystems
+            when file size needs to be known before reading.
 
         Returns
         -------
@@ -265,9 +268,9 @@ cdef class ParquetFileFormat(FileFormat):
 
         if row_groups is None:
             return super().make_fragment(file, filesystem,
-                                         partition_expression)
+                                         partition_expression, size)
 
-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem, size=size)
         c_row_groups = [<int> row_group for row_group in set(row_groups)]
 
         c_fragment = GetResultValue(
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 8901d763e39..063dc2f6d7a 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         const c_string& path() const
         const shared_ptr[CFileSystem]& filesystem() const
         const shared_ptr[CBuffer]& buffer() const
+        const int size() const
        # HACK: Cython can't handle all the overloads so don't declare them.
        # This means invalid construction of CFileSource won't be caught in
        # the C++ generation phase (though it will still be caught when
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6f3b54b0cd6..b905783aae8 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -981,6 +981,49 @@ def test_make_fragment(multisourcefs):
         assert row_group_fragment.row_groups == [0]
 
 
+@pytest.mark.parquet
+def test_make_fragment_with_size(s3_example_simple):
+    table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple
+
+    file_format = ds.ParquetFileFormat()
+    paths = [path]
+
+    fragments = [file_format.make_fragment(path, fs)
+                 for path in paths]
+
+    dataset = ds.FileSystemDataset(
+        fragments, format=file_format, schema=table.schema,
+        filesystem = fs
+    )
+
+    tbl = dataset.to_table()
+    assert tbl.equals(table)
+
+    sizes_toosmall = [1]
+    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
+                          for path, size in zip(paths, sizes_toosmall)]
+
+    dataset_with_size = ds.FileSystemDataset(
+        fragments_with_size, format=file_format, schema=table.schema,
+        filesystem = fs
+    )
+
+    with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
+        table = dataset_with_size.to_table()
+
+    sizes_toolarge = [1000000]
+    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
+                          for path, size in zip(paths, sizes_toolarge)]
+
+    dataset_with_size = ds.FileSystemDataset(
+        fragments_with_size, format=file_format, schema=table.schema,
+        filesystem = fs
+    )
+
+    # invalid range
+    with pytest.raises(OSError, match='HTTP status 416'):
+        table = dataset_with_size.to_table()
+
+
 def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
     content = textwrap.dedent("""
         alpha,num,animal

From 9d4d96fefc60456e07b0ea56ba27872b4a86cffa Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Tue, 26 Sep 2023 19:04:35 +0300
Subject: [PATCH 02/12] only pass non-default size

---
 python/pyarrow/_dataset.pyx | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 968887f7715..6f7f0425b93 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -114,9 +114,13 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int
                              "a path without a FileSystem")
         c_filesystem = filesystem.unwrap()
         c_path = tobytes(_stringify_path(file))
-        c_size = size
-        c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
 
+        if size >= 0:
+            c_size = size
+            c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
+        else:
+            c_size = size
+            c_source = CFileSource(move(c_path), move(c_filesystem))
     elif hasattr(file, 'read'):
         # Optimistically hope this is file-like
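[Annotation] Patches 01 and 02 thread an optional size hint from `make_fragment` down to `CFileSource`, so a filesystem that can open a file from a known size never has to probe for it first. A minimal usage sketch against the API as it stands at this point in the series — the S3 region, bucket path, and byte size below are hypothetical, and in practice the size would come from an external catalog or manifest rather than be hard-coded:

```python
import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem

# Hypothetical names: path and byte size would come from a manifest,
# sparing one size-probing request per file on a high-latency filesystem.
fs = S3FileSystem(region="us-east-1")
fmt = ds.ParquetFileFormat()
fragment = fmt.make_fragment("bucket/data.parquet", fs, size=4096)
# Omitting size (default -1 at this stage) keeps the old stat-on-open path.
```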
From f194bfa841b4885aed80e9bf2b533c5046bc99a0 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Wed, 27 Sep 2023 19:45:49 +0300
Subject: [PATCH 03/12] use None as default

---
 python/pyarrow/_dataset.pxd         |  2 +-
 python/pyarrow/_dataset.pyx         | 14 +++++++++-----
 python/pyarrow/_dataset_parquet.pyx | 10 ++++++----
 3 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index 67074961ae9..6ffe230c5b4 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -25,7 +25,7 @@ from pyarrow.lib cimport *
 from pyarrow._fs cimport FileSystem
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int size=*)
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int64_t size=*)
 
 
 cdef class DatasetFactory(_Weakrefable):
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 6f7f0425b93..146c4b45ad7 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -96,7 +96,7 @@ def _get_parquet_symbol(name):
     return _dataset_pq and getattr(_dataset_pq, name)
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int size=-1):
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int64_t size=-1):
 
     cdef:
         CFileSource c_source
@@ -119,7 +119,6 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int
             c_size = size
             c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
         else:
-            c_size = size
             c_source = CFileSource(move(c_path), move(c_filesystem))
     elif hasattr(file, 'read'):
         # Optimistically hope this is file-like
@@ -1234,7 +1233,7 @@ cdef class FileFormat(_Weakrefable):
             The schema inferred from the file
         """
         cdef:
-            CFileSource c_source = _make_file_source(file, filesystem=filesystem, size=-1)
+            CFileSource c_source = _make_file_source(file, filesystem=filesystem)
             CResult[shared_ptr[CSchema]] c_result
         with nogil:
             c_result = self.format.Inspect(c_source)
@@ -1243,7 +1242,7 @@ cdef class FileFormat(_Weakrefable):
 
     def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None,
-                      size=-1):
+                      size=None):
         """
         Make a FileFragment from a given file.
 
@@ -1266,9 +1265,14 @@ cdef class FileFormat(_Weakrefable):
         fragment : Fragment
             The file fragment
         """
+        cdef:
+            # default value, will not be passed to constructor
+            int64_t c_size = -1
         if partition_expression is None:
             partition_expression = _true
-        c_source = _make_file_source(file, filesystem=filesystem, size=size)
+        if size is not None:
+            c_size = size
+        c_source = _make_file_source(file, filesystem=filesystem, size=c_size)
         c_fragment = GetResultValue(
             self.format.MakeFragment(move(c_source),
                                      partition_expression.unwrap(),
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 432a424f7c5..8951c9b7550 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
         return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None, size=-1):
+                      Expression partition_expression=None, row_groups=None, size=None):
         """
         Make a FileFragment from a given file.
 
@@ -262,15 +262,17 @@ cdef class ParquetFileFormat(FileFormat):
         """
         cdef:
             vector[int] c_row_groups
-
+            # default value, will not be passed to constructor
+            int64_t c_size = -1
         if partition_expression is None:
             partition_expression = _true
-
+        if size is not None:
+            c_size = size
         if row_groups is None:
             return super().make_fragment(file, filesystem,
                                          partition_expression, size)
 
-        c_source = _make_file_source(file, filesystem, size=size)
+        c_source = _make_file_source(file, filesystem, size=c_size)
         c_row_groups = [<int> row_group for row_group in set(row_groups)]
 
         c_fragment = GetResultValue(

From dc14c75632fc4ebcdc6245ec35fdabc632d4080f Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Wed, 27 Sep 2023 19:51:55 +0300
Subject: [PATCH 04/12] formatting

---
 python/pyarrow/tests/test_dataset.py | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b905783aae8..cd9a6468cce 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -982,18 +982,18 @@ def test_make_fragment(multisourcefs):
 
 
 @pytest.mark.parquet
+@pytest.mark.s3
 def test_make_fragment_with_size(s3_example_simple):
     table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple
 
     file_format = ds.ParquetFileFormat()
     paths = [path]
-    
+
     fragments = [file_format.make_fragment(path, fs)
                  for path in paths]
-    
+
     dataset = ds.FileSystemDataset(
-        fragments, format=file_format, schema=table.schema,
-        filesystem = fs
+        fragments, format=file_format, schema=table.schema, filesystem=fs
     )
 
     tbl = dataset.to_table()
@@ -1001,29 +1001,28 @@ def test_make_fragment_with_size(s3_example_simple):
 
     sizes_toosmall = [1]
     fragments_with_size = [file_format.make_fragment(path, fs, size=size)
-                          for path, size in zip(paths, sizes_toosmall)]
-    
+                           for path, size in zip(paths, sizes_toosmall)]
+
     dataset_with_size = ds.FileSystemDataset(
-        fragments_with_size, format=file_format, schema=table.schema,
-        filesystem = fs
+        fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
     )
 
     with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
         table = dataset_with_size.to_table()
-    
+
     sizes_toolarge = [1000000]
     fragments_with_size = [file_format.make_fragment(path, fs, size=size)
-                          for path, size in zip(paths, sizes_toolarge)]
-    
+                           for path, size in zip(paths, sizes_toolarge)]
+
     dataset_with_size = ds.FileSystemDataset(
-        fragments_with_size, format=file_format, schema=table.schema,
-        filesystem = fs
+        fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
     )
 
     # invalid range
     with pytest.raises(OSError, match='HTTP status 416'):
         table = dataset_with_size.to_table()
 
+
 def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
     content = textwrap.dedent("""
         alpha,num,animal
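[Annotation] Patch 03 moves the Python-facing default from `-1` to `None` and keeps `-1` only as an internal C-level sentinel, since a typed Cython `int64_t` parameter cannot itself hold `None`. A plain-Python sketch of that dispatch (hypothetical helper name; the real logic lives in the Cython wrappers above):

```python
def resolve_size(size=None):
    """Bridge a Python-level optional size to a C-level sentinel."""
    c_size = -1          # internal sentinel meaning "size unknown"
    if size is not None:
        c_size = size    # only a user-supplied size reaches the C++ layer
    return c_size

assert resolve_size() == -1
assert resolve_size(4096) == 4096
```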
From eab300028002bf87a8b38445e1e8abdcb2fd70e5 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Wed, 27 Sep 2023 20:03:24 +0300
Subject: [PATCH 05/12] change int type

---
 python/pyarrow/includes/libarrow_dataset.pxd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 063dc2f6d7a..4566cb5004a 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -178,7 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         const c_string& path() const
         const shared_ptr[CFileSystem]& filesystem() const
         const shared_ptr[CBuffer]& buffer() const
-        const int size() const
+        const int64_t size() const
        # HACK: Cython can't handle all the overloads so don't declare them.
        # This means invalid construction of CFileSource won't be caught in
        # the C++ generation phase (though it will still be caught when

From 0e748f707742586f86e29085d376653f8fd79c63 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Thu, 26 Oct 2023 08:28:53 +0300
Subject: [PATCH 06/12] make size kw-only arg

---
 python/pyarrow/_dataset.pyx         | 2 +-
 python/pyarrow/_dataset_parquet.pyx | 4 ++--
 python/some_path                    | 0
 3 files changed, 3 insertions(+), 3 deletions(-)
 create mode 100644 python/some_path

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 146c4b45ad7..f5f064eef61 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1242,7 +1242,7 @@ cdef class FileFormat(_Weakrefable):
 
     def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None,
-                      size=None):
+                      *, size=None):
         """
         Make a FileFragment from a given file.
 
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 8951c9b7550..10f0ef33906 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
         return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None, size=None):
+                      Expression partition_expression=None, row_groups=None, *, size=None):
         """
         Make a FileFragment from a given file.
 
@@ -270,7 +270,7 @@ cdef class ParquetFileFormat(FileFormat):
             c_size = size
         if row_groups is None:
             return super().make_fragment(file, filesystem,
-                                         partition_expression, size)
+                                         partition_expression, size=size)
 
         c_source = _make_file_source(file, filesystem, size=c_size)
         c_row_groups = [<int> row_group for row_group in set(row_groups)]
diff --git a/python/some_path b/python/some_path
new file mode 100644
index 00000000000..e69de29bb2d

From 167ea77e04f91f9ab439ae69638889f256b9dbe1 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Sat, 28 Oct 2023 14:45:48 +0300
Subject: [PATCH 07/12] rename size -> file_size

---
 python/pyarrow/_dataset.pxd          |  2 +-
 python/pyarrow/_dataset.pyx          | 16 ++++++++--------
 python/pyarrow/_dataset_parquet.pyx  | 12 ++++++------
 python/pyarrow/tests/test_dataset.py |  4 ++--
 4 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index 6ffe230c5b4..71b2a58c35a 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -25,7 +25,7 @@ from pyarrow.lib cimport *
 from pyarrow._fs cimport FileSystem
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int64_t size=*)
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int64_t file_size=*)
 
 
 cdef class DatasetFactory(_Weakrefable):
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index f5f064eef61..445d86d8538 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -96,7 +96,7 @@ def _get_parquet_symbol(name):
     return _dataset_pq and getattr(_dataset_pq, name)
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int64_t size=-1):
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int64_t file_size=-1):
 
     cdef:
         CFileSource c_source
@@ -115,8 +115,8 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int6
         c_filesystem = filesystem.unwrap()
         c_path = tobytes(_stringify_path(file))
 
-        if size >= 0:
-            c_size = size
+        if file_size >= 0:
+            c_size = file_size
             c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
         else:
             c_source = CFileSource(move(c_path), move(c_filesystem))
@@ -1242,7 +1242,7 @@ cdef class FileFormat(_Weakrefable):
 
     def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None,
-                      *, size=None):
+                      *, file_size=None):
         """
         Make a FileFragment from a given file.
 
@@ -1256,7 +1256,7 @@ cdef class FileFormat(_Weakrefable):
         partition_expression : Expression, optional
             An expression that is guaranteed true for all rows in the fragment.
             Allows fragment to be potentially skipped while scanning with a filter.
-        size : int, optional
+        file_size : int, optional
             The size of the file in bytes. Can improve performance with high-latency filesystems
             when file size needs to be known before reading.
 
@@ -1270,9 +1270,9 @@ cdef class FileFormat(_Weakrefable):
             int64_t c_size = -1
         if partition_expression is None:
             partition_expression = _true
-        if size is not None:
-            c_size = size
-        c_source = _make_file_source(file, filesystem=filesystem, size=c_size)
+        if file_size is not None:
+            c_size = file_size
+        c_source = _make_file_source(file, filesystem=filesystem, file_size=c_size)
         c_fragment = GetResultValue(
             self.format.MakeFragment(move(c_source),
                                      partition_expression.unwrap(),
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 10f0ef33906..d34dc8f2840 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
         return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None, *, size=None):
+                      Expression partition_expression=None, row_groups=None, *, file_size=None):
         """
         Make a FileFragment from a given file.
 
@@ -251,7 +251,7 @@ cdef class ParquetFileFormat(FileFormat):
             fragment to be potentially skipped while scanning with a filter.
         row_groups : Iterable, optional
             The indices of the row groups to include
-        size : int, optional
+        file_size : int, optional
             The size of the file in bytes. Can improve performance with high-latency filesystems
             when file size needs to be known before reading.
 
@@ -266,13 +266,13 @@ cdef class ParquetFileFormat(FileFormat):
             int64_t c_size = -1
         if partition_expression is None:
             partition_expression = _true
-        if size is not None:
-            c_size = size
+        if file_size is not None:
+            c_size = file_size
         if row_groups is None:
             return super().make_fragment(file, filesystem,
-                                         partition_expression, size=size)
+                                         partition_expression, file_size=file_size)
 
-        c_source = _make_file_source(file, filesystem, size=c_size)
+        c_source = _make_file_source(file, filesystem, file_size=c_size)
         c_row_groups = [<int> row_group for row_group in set(row_groups)]
 
         c_fragment = GetResultValue(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index cd9a6468cce..07474dadd76 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1000,7 +1000,7 @@ def test_make_fragment_with_size(s3_example_simple):
     assert tbl.equals(table)
 
     sizes_toosmall = [1]
-    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
                            for path, size in zip(paths, sizes_toosmall)]
 
     dataset_with_size = ds.FileSystemDataset(
@@ -1011,7 +1011,7 @@ def test_make_fragment_with_size(s3_example_simple):
         table = dataset_with_size.to_table()
 
     sizes_toolarge = [1000000]
-    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
                            for path, size in zip(paths, sizes_toolarge)]
 
     dataset_with_size = ds.FileSystemDataset(
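[Annotation] After patches 06 and 07 the hint is keyword-only and spelled `file_size`, which keeps the positional signatures of `make_fragment` stable on both `FileFormat` and `ParquetFileFormat`. A usage sketch under the renamed API (region, path, and size are hypothetical):

```python
import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem

fs = S3FileSystem(region="us-east-1")   # hypothetical region
fmt = ds.ParquetFileFormat()

# file_size must now be passed by keyword:
fragment = fmt.make_fragment("bucket/data.parquet", fs, file_size=4096)
# It composes with the Parquet-specific row_groups argument as well:
row_group_fragment = fmt.make_fragment("bucket/data.parquet", fs,
                                       row_groups=[0], file_size=4096)
```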
From 8278fa5b38782c9be76ff01e5908a6bff4002246 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Sat, 28 Oct 2023 15:23:19 +0300
Subject: [PATCH 08/12] remove redundant constructor

---
 cpp/src/arrow/dataset/file_base.h     |  5 -----
 cpp/src/arrow/filesystem/filesystem.h |  3 ---
 python/pyarrow/_dataset.pxd           |  3 +--
 python/pyarrow/_dataset.pyx           |  7 +++++--
 python/pyarrow/tests/test_dataset.py  | 17 ++++++++++++++---
 5 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 0cf283a06d6..2a17cbf8f88 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -54,11 +54,6 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
       : file_info_(std::move(path)),
         filesystem_(std::move(filesystem)),
         compression_(compression) {}
-  FileSource(std::string path, int64_t size, std::shared_ptr<fs::FileSystem> filesystem,
-             Compression::type compression = Compression::UNCOMPRESSED)
-      : file_info_(std::move(path), std::move(size)),
-        filesystem_(std::move(filesystem)),
-        compression_(compression) {}
   FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
              Compression::type compression = Compression::UNCOMPRESSED)
       : file_info_(std::move(info)),
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index 3f233c74d5a..baf05cadbb2 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -60,9 +60,6 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable<FileInfo> {
   explicit FileInfo(std::string path, FileType type = FileType::Unknown)
       : path_(std::move(path)), type_(type) {}
 
-  explicit FileInfo(std::string path, int64_t size, FileType type = FileType::Unknown)
-      : path_(std::move(path)), type_(type), size_(size) {}
-
   /// The file type
   FileType type() const { return type_; }
   void set_type(FileType type) { type_ = type; }
diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index 71b2a58c35a..b9e3aac68ee 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -22,12 +22,11 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.lib cimport *
 
-from pyarrow._fs cimport FileSystem
+from pyarrow._fs cimport FileSystem, FileInfo
 
 
 cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int64_t file_size=*)
 
-
 cdef class DatasetFactory(_Weakrefable):
 
     cdef:
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 445d86d8538..b99197cb373 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow._acero cimport ExecNodeOptions
 from pyarrow._compute cimport Expression, _bind
 from pyarrow._compute import _forbid_instantiation
-from pyarrow._fs cimport FileSystem, FileSelector
+from pyarrow._fs cimport FileSystem, FileSelector, FileInfo
 from pyarrow._csv cimport (
     ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
 from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
@@ -101,6 +101,7 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int6
     cdef:
         CFileSource c_source
         shared_ptr[CFileSystem] c_filesystem
+        CFileInfo c_info
         c_string c_path
         shared_ptr[CRandomAccessFile] c_file
         shared_ptr[CBuffer] c_buffer
@@ -117,7 +118,9 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int6
 
         if file_size >= 0:
             c_size = file_size
-            c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))
+            info = FileInfo(c_path, size=c_size)
+            c_info = info.unwrap()
+            c_source = CFileSource(move(c_info), move(c_filesystem))
         else:
             c_source = CFileSource(move(c_path), move(c_filesystem))
     elif hasattr(file, 'read'):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 07474dadd76..55e3d744271 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -991,7 +991,6 @@ def test_make_fragment_with_size(s3_example_simple):
 
     fragments = [file_format.make_fragment(path, fs)
                  for path in paths]
-
     dataset = ds.FileSystemDataset(
         fragments, format=file_format, schema=table.schema, filesystem=fs
     )
@@ -999,7 +998,18 @@ def test_make_fragment_with_size(s3_example_simple):
     tbl = dataset.to_table()
     assert tbl.equals(table)
 
-    sizes_toosmall = [1]
+    # true sizes -> works
+    sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files]
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
+                           for path, size in zip(paths, sizes_true)]
+    dataset_with_size = ds.FileSystemDataset(
+       fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
+    )
+    tbl = dataset.to_table()
+    assert tbl.equals(table)
+
+    # too small sizes -> error
+    sizes_toosmall = [1 for path in paths]
     fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
                            for path, size in zip(paths, sizes_toosmall)]
 
@@ -1010,7 +1020,8 @@ def test_make_fragment_with_size(s3_example_simple):
     with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
         table = dataset_with_size.to_table()
 
-    sizes_toolarge = [1000000]
+    # too large sizes -> error
+    sizes_toolarge = [1000000 for path in paths]
     fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
                            for path, size in zip(paths, sizes_toolarge)]
 

From 3441edf1868fc848359a5a96dec9de5c0b31932d Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Wed, 15 Nov 2023 21:30:05 +0200
Subject: [PATCH 09/12] fix formatting

---
 python/pyarrow/tests/test_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 55e3d744271..3c56947da3b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1003,7 +1003,7 @@ def test_make_fragment_with_size(s3_example_simple):
     fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
                            for path, size in zip(paths, sizes_true)]
     dataset_with_size = ds.FileSystemDataset(
-       fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
+        fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
     )
     tbl = dataset.to_table()
     assert tbl.equals(table)

From 771b3b8deebcf65401119e5c72299e3ca0d26de7 Mon Sep 17 00:00:00 2001
From: Eero Lihavainen
Date: Wed, 15 Nov 2023 21:49:34 +0200
Subject: [PATCH 10/12] remove some_path file

---
 python/some_path | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 python/some_path

diff --git a/python/some_path b/python/some_path
deleted file mode 100644
index e69de29bb2d..00000000000
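[Annotation] Patch 08 is the substantive simplification: instead of adding new `FileSource`/`FileInfo` constructors in C++, the Cython layer builds a `pyarrow.fs.FileInfo` carrying the path and size and unwraps it into the already-existing `CFileSource(FileInfo, FileSystem)` overload. The same object is constructible from Python directly (path and size below are hypothetical):

```python
from pyarrow.fs import FileInfo

# A FileInfo that already knows its size; filesystems whose OpenInputFile
# accepts a FileInfo (S3 among them) can then skip the size lookup entirely.
info = FileInfo("bucket/data.parquet", size=4096)
assert info.size == 4096
```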
+ """ table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple file_format = ds.ParquetFileFormat() From 056c96d1b13cf47df512fa5e185ddb2da9a2f271 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 28 Nov 2023 10:56:35 -0500 Subject: [PATCH 12/12] suggested args --- python/pyarrow/_dataset.pxd | 2 +- python/pyarrow/_dataset.pyx | 17 ++++++----------- python/pyarrow/_dataset_parquet.pyx | 6 +----- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index b9e3aac68ee..15932475831 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -25,7 +25,7 @@ from pyarrow.lib cimport * from pyarrow._fs cimport FileSystem, FileInfo -cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int64_t file_size=*) +cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, object file_size=*) cdef class DatasetFactory(_Weakrefable): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index b99197cb373..f02398be663 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -96,7 +96,7 @@ def _get_parquet_symbol(name): return _dataset_pq and getattr(_dataset_pq, name) -cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int64_t file_size=-1): +cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, object file_size=None): cdef: CFileSource c_source @@ -105,6 +105,7 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int6 c_string c_path shared_ptr[CRandomAccessFile] c_file shared_ptr[CBuffer] c_buffer + int64_t c_size if isinstance(file, Buffer): c_buffer = pyarrow_unwrap_buffer(file) @@ -116,10 +117,9 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int6 c_filesystem = filesystem.unwrap() c_path = tobytes(_stringify_path(file)) - if file_size >= 0: + if file_size is not None: c_size = file_size - info = FileInfo(c_path, size=c_size) - c_info = info.unwrap() + c_info = FileInfo(c_path, size=c_size).unwrap() c_source = CFileSource(move(c_info), move(c_filesystem)) else: c_source = CFileSource(move(c_path), move(c_filesystem)) @@ -1236,7 +1236,7 @@ cdef class FileFormat(_Weakrefable): The schema inferred from the file """ cdef: - CFileSource c_source = _make_file_source(file, filesystem=filesystem) + CFileSource c_source = _make_file_source(file, filesystem, file_size=None) CResult[shared_ptr[CSchema]] c_result with nogil: c_result = self.format.Inspect(c_source) @@ -1268,14 +1268,9 @@ cdef class FileFormat(_Weakrefable): fragment : Fragment The file fragment """ - cdef: - # default value, will not be passed to constructor - int64_t c_size = -1 if partition_expression is None: partition_expression = _true - if file_size is not None: - c_size = file_size - c_source = _make_file_source(file, filesystem=filesystem, file_size=c_size) + c_source = _make_file_source(file, filesystem, file_size) c_fragment = GetResultValue( self.format.MakeFragment(move(c_source), partition_expression.unwrap(), diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index d34dc8f2840..0fdc333eca5 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -262,17 +262,13 @@ cdef class ParquetFileFormat(FileFormat): """ cdef: vector[int] c_row_groups - # default value, will not be passed to constructor - int64_t c_size = -1 if partition_expression is None: partition_expression = _true - if 
file_size is not None: - c_size = file_size if row_groups is None: return super().make_fragment(file, filesystem, partition_expression, file_size=file_size) - c_source = _make_file_source(file, filesystem, file_size=c_size) + c_source = _make_file_source(file, filesystem, file_size) c_row_groups = [ row_group for row_group in set(row_groups)] c_fragment = GetResultValue(
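[Annotation] With patch 12 the series converges on `file_size` as a keyword-only argument that stays an ordinary Python `object` (defaulting to `None`) until the Cython boundary, where a supplied value is folded into a `FileInfo`. An end-to-end sketch of the intended use, assembling a dataset from a hypothetical manifest of (path, size) pairs so that no per-file size probe is needed — the region, paths, sizes, and schema below are all assumptions for illustration:

```python
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem

# Hypothetical manifest, e.g. exported from a metastore or table catalog.
manifest = [("bucket/part-0.parquet", 4096),
            ("bucket/part-1.parquet", 8192)]
schema = pa.schema([("x", pa.int64())])  # assumed known up front

fs = S3FileSystem(region="us-east-1")
fmt = ds.ParquetFileFormat()
fragments = [fmt.make_fragment(path, fs, file_size=size)
             for path, size in manifest]
dataset = ds.FileSystemDataset(fragments, format=fmt,
                               schema=schema, filesystem=fs)
table = dataset.to_table()  # a wrong size surfaces as ArrowInvalid/OSError
```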