From da63ce17895503948ebaf7bd4c537d5c30534e63 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Mon, 19 Feb 2024 18:05:45 +0100
Subject: [PATCH 1/4] Allow FileInfo instances to be passed to dataset init

---
 python/pyarrow/_dataset.pyx                  | 34 +++++++++++++++-----
 python/pyarrow/dataset.py                    | 16 +++++++--
 python/pyarrow/includes/libarrow_dataset.pxd |  8 +++++
 python/pyarrow/tests/conftest.py             |  6 ++--
 python/pyarrow/tests/test_dataset.py         | 19 +++++++++++
 5 files changed, 71 insertions(+), 12 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index b93f71969e8..679adfe639b 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3139,6 +3139,13 @@ cdef class FileSystemFactoryOptions(_Weakrefable):
         self.options.selector_ignore_prefixes = [tobytes(v) for v in values]
 
 
+cdef vector[CFileInfo] unwrap_finfos(finfos):
+    cdef vector[CFileInfo] o_vect
+    for fi in finfos:
+        o_vect.push_back((<FileInfo> fi).unwrap())
+    return o_vect
+
+
 cdef class FileSystemDatasetFactory(DatasetFactory):
     """
     Create a DatasetFactory from a list of paths with schema inspection.
@@ -3163,6 +3170,7 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                  FileSystemFactoryOptions options=None):
         cdef:
             vector[c_string] paths
+            vector[CFileInfo] finfos
             CFileSelector c_selector
             CResult[shared_ptr[CDatasetFactory]] result
             shared_ptr[CFileSystem] c_filesystem
@@ -3184,14 +3192,24 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                     c_options
                 )
         elif isinstance(paths_or_selector, (list, tuple)):
-            paths = [tobytes(s) for s in paths_or_selector]
-            with nogil:
-                result = CFileSystemDatasetFactory.MakeFromPaths(
-                    c_filesystem,
-                    paths,
-                    c_format,
-                    c_options
-                )
+            if isinstance(paths_or_selector[0], FileInfo):
+                finfos = unwrap_finfos(paths_or_selector)
+                with nogil:
+                    result = CFileSystemDatasetFactory.MakeFromFileInfos(
+                        c_filesystem,
+                        finfos,
+                        c_format,
+                        c_options
+                    )
+            else:
+                paths = [tobytes(s) for s in paths_or_selector]
+                with nogil:
+                    result = CFileSystemDatasetFactory.MakeFromPaths(
+                        c_filesystem,
+                        paths,
+                        c_format,
+                        c_options
+                    )
         else:
             raise TypeError('Must pass either paths or a FileSelector, but '
                             'passed {}'.format(type(paths_or_selector)))
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index f83753ac57d..0fc9780d272 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -456,11 +456,22 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     -------
     FileSystemDataset
     """
+    from pyarrow.fs import LocalFileSystem, _ensure_filesystem, FileInfo
+
     format = _ensure_format(format or 'parquet')
     partitioning = _ensure_partitioning(partitioning)
 
     if isinstance(source, (list, tuple)):
-        fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
+        if isinstance(source[0], FileInfo):
+            if filesystem is None:
+                # fall back to local file system as the default
+                fs = LocalFileSystem()
+            else:
+                # construct a filesystem if it is a valid URI
+                fs = _ensure_filesystem(filesystem)
+            paths_or_selector = source
+        else:
+            fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
     else:
         fs, paths_or_selector = _ensure_single_source(source, filesystem)
 
@@ -767,6 +778,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
     ...     dataset("local/path/to/data", format="ipc")
     ... ]) # doctest: +SKIP
     """
+    from pyarrow.fs import FileInfo
     # collect the keyword arguments for later reuse
     kwargs = dict(
         schema=schema,
@@ -781,7 +793,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
     if _is_path_like(source):
         return _filesystem_dataset(source, **kwargs)
     elif isinstance(source, (tuple, list)):
-        if all(_is_path_like(elem) for elem in source):
+        if all(_is_path_like(elem) or isinstance(elem, FileInfo) for elem in source):
             return _filesystem_dataset(source, **kwargs)
         elif all(isinstance(elem, Dataset) for elem in source):
             return _union_dataset(source, **kwargs)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 4566cb5004a..fe96705a54b 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -403,3 +403,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             shared_ptr[CFileFormat] format,
             CFileSystemFactoryOptions options
         )
+
+        @staticmethod
+        CResult[shared_ptr[CDatasetFactory]] MakeFromFileInfos "Make"(
+            shared_ptr[CFileSystem] filesystem,
+            vector[CFileInfo] files,
+            shared_ptr[CFileFormat] format,
+            CFileSystemFactoryOptions options
+        )
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 0da757a4bc5..b3c96cb32b9 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -204,7 +204,8 @@ def minio_server_health_check(address):
     env = os.environ.copy()
     env.update({
         'MINIO_ACCESS_KEY': access_key,
-        'MINIO_SECRET_KEY': secret_key
+        'MINIO_SECRET_KEY': secret_key,
+        'MINIO_PROMETHEUS_AUTH_TYPE': 'public',
     })
 
     args = ['minio', '--compat', 'server', '--quiet', '--address',
@@ -221,7 +222,8 @@ def minio_server_health_check(address):
         yield {
             'connection': s3_connection,
             'process': proc,
-            'tempdir': tmpdir
+            'tempdir': tmpdir,
+            'address': address,
         }
     finally:
         if proc is not None:
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index a9054f0b174..b70de9f6174 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2725,6 +2725,25 @@ def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
     assert dataset_reader.to_table(dataset).equals(table)
 
 
+@pytest.mark.parquet
+@pytest.mark.s3
+def test_dataset_from_fileinfos(s3_example_simple, dataset_reader, s3_server):
+    table, path, filesystem, uri, _, _, _, _ = s3_example_simple
+
+    selector = fs.FileSelector("mybucket")
+    finfos = filesystem.get_file_info(selector)
+    dataset = ds.dataset(finfos, format="parquet", filesystem=filesystem)
+    assert dataset_reader.to_table(dataset).equals(table)
+    import urllib.request
+
+    resp = urllib.request.urlopen(
+        f"http://{s3_server['address']}/minio/v2/metrics/cluster"
+    )
+    assert resp.status == 200
+    # minio_s3_requests_total{api="headobject",...} 2
+    assert "headobject" not in resp.read().decode("utf-8")
+
+
 @pytest.mark.parquet
 @pytest.mark.s3  # still needed to create the data
 def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):

From dbdc9e2db927108be824c198eedcbe0f5e8f6b0d Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 20 Feb 2024 10:42:31 +0100
Subject: [PATCH 2/4] handle empty list

---
 python/pyarrow/_dataset.pyx | 2 +-
 python/pyarrow/dataset.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 679adfe639b..8b9e62d6288 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3192,7 +3192,7 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                     c_options
                 )
         elif isinstance(paths_or_selector, (list, tuple)):
-            if isinstance(paths_or_selector[0], FileInfo):
+            if len(paths_or_selector) > 0 and isinstance(paths_or_selector[0], FileInfo):
                 finfos = unwrap_finfos(paths_or_selector)
                 with nogil:
                     result = CFileSystemDatasetFactory.MakeFromFileInfos(
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 0fc9780d272..1efbfe1665a 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -462,7 +462,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     partitioning = _ensure_partitioning(partitioning)
 
     if isinstance(source, (list, tuple)):
-        if isinstance(source[0], FileInfo):
+        if source and isinstance(source[0], FileInfo):
             if filesystem is None:
                 # fall back to local file system as the default
                 fs = LocalFileSystem()

From ac216d5319f5344554488c659ef8e76e347fc656 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 20 Feb 2024 14:41:29 +0100
Subject: [PATCH 3/4] revert prometheus check

---
 python/pyarrow/tests/conftest.py     |  4 +---
 python/pyarrow/tests/test_dataset.py | 11 +----------
 2 files changed, 2 insertions(+), 13 deletions(-)

diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index b3c96cb32b9..2cac6506d74 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -204,8 +204,7 @@ def minio_server_health_check(address):
     env = os.environ.copy()
     env.update({
         'MINIO_ACCESS_KEY': access_key,
-        'MINIO_SECRET_KEY': secret_key,
-        'MINIO_PROMETHEUS_AUTH_TYPE': 'public',
+        'MINIO_SECRET_KEY': secret_key
     })
 
     args = ['minio', '--compat', 'server', '--quiet', '--address',
@@ -223,7 +222,6 @@ def minio_server_health_check(address):
             'connection': s3_connection,
             'process': proc,
             'tempdir': tmpdir,
-            'address': address,
         }
     finally:
         if proc is not None:
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b70de9f6174..de429e5cfd8 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2727,21 +2727,12 @@ def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
 
 @pytest.mark.parquet
 @pytest.mark.s3
-def test_dataset_from_fileinfos(s3_example_simple, dataset_reader, s3_server):
+def test_dataset_from_fileinfos(s3_example_simple, dataset_reader):
     table, path, filesystem, uri, _, _, _, _ = s3_example_simple
-
     selector = fs.FileSelector("mybucket")
     finfos = filesystem.get_file_info(selector)
     dataset = ds.dataset(finfos, format="parquet", filesystem=filesystem)
     assert dataset_reader.to_table(dataset).equals(table)
-    import urllib.request
-
-    resp = urllib.request.urlopen(
-        f"http://{s3_server['address']}/minio/v2/metrics/cluster"
-    )
-    assert resp.status == 200
-    # minio_s3_requests_total{api="headobject",...} 2
-    assert "headobject" not in resp.read().decode("utf-8")
 
 
 @pytest.mark.parquet

From 373b9cb07d2ab56d41087d2873180f6a0d598e6c Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Tue, 27 Feb 2024 13:57:53 +0100
Subject: [PATCH 4/4] Apply suggestions from code review

---
 python/pyarrow/tests/conftest.py     | 2 +-
 python/pyarrow/tests/test_dataset.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 2cac6506d74..0da757a4bc5 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -221,7 +221,7 @@ def minio_server_health_check(address):
         yield {
             'connection': s3_connection,
             'process': proc,
-            'tempdir': tmpdir,
+            'tempdir': tmpdir
         }
     finally:
         if proc is not None:
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index de429e5cfd8..8e203903858 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2727,7 +2727,7 @@ def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
 
 @pytest.mark.parquet
 @pytest.mark.s3
-def test_dataset_from_fileinfos(s3_example_simple, dataset_reader):
+def test_open_dataset_from_fileinfos(s3_example_simple, dataset_reader):
     table, path, filesystem, uri, _, _, _, _ = s3_example_simple
     selector = fs.FileSelector("mybucket")
     finfos = filesystem.get_file_info(selector)
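
Usage sketch (illustrative, not part of the patch series): taken together, the
four patches let pyarrow.dataset.dataset() accept a list of pyarrow.fs.FileInfo
objects in place of path strings, so metadata gathered by a single listing call
can be reused during dataset discovery instead of being re-fetched with one
HEAD request per file (the behaviour the reverted MinIO metrics check was
probing for). A minimal sketch against the patched API; the bucket name and
region below are hypothetical:

    import pyarrow.dataset as ds
    from pyarrow import fs

    # Hypothetical S3 filesystem; any pyarrow FileSystem works the same way.
    s3 = fs.S3FileSystem(region="us-east-1")

    # One recursive listing returns FileInfo objects that already carry the
    # type and size metadata the dataset factory needs.
    infos = s3.get_file_info(fs.FileSelector("my-bucket", recursive=True))

    # Pass the FileInfo objects (files only) instead of plain path strings.
    dataset = ds.dataset(
        [info for info in infos if info.is_file],
        format="parquet",
        filesystem=s3,
    )
    table = dataset.to_table()

Per patch 2, an empty list falls through to the pre-existing path-based branch,
so ds.dataset([]) keeps its old behaviour.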