From 51e6d6727aca4f123616f5d591c3f97ba050cf3e Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Fri, 26 Jan 2024 16:59:35 +0100 Subject: [PATCH 01/21] Remove old docs --- docs/source/python/filesystems_deprecated.rst | 88 ------------------- 1 file changed, 88 deletions(-) delete mode 100644 docs/source/python/filesystems_deprecated.rst diff --git a/docs/source/python/filesystems_deprecated.rst b/docs/source/python/filesystems_deprecated.rst deleted file mode 100644 index c51245341b4..00000000000 --- a/docs/source/python/filesystems_deprecated.rst +++ /dev/null @@ -1,88 +0,0 @@ -.. Licensed to the Apache Software Foundation (ASF) under one -.. or more contributor license agreements. See the NOTICE file -.. distributed with this work for additional information -.. regarding copyright ownership. The ASF licenses this file -.. to you under the Apache License, Version 2.0 (the -.. "License"); you may not use this file except in compliance -.. with the License. You may obtain a copy of the License at - -.. http://www.apache.org/licenses/LICENSE-2.0 - -.. Unless required by applicable law or agreed to in writing, -.. software distributed under the License is distributed on an -.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -.. KIND, either express or implied. See the License for the -.. specific language governing permissions and limitations -.. under the License. - -Filesystem Interface (legacy) -============================= - -.. warning:: - This section documents the deprecated filesystem layer. You should - use the :ref:`new filesystem layer ` instead. - -.. _hdfs: - -Hadoop File System (HDFS) -------------------------- - -PyArrow comes with bindings to a C++-based interface to the Hadoop File -System. You connect like so: - -.. code-block:: python - - import pyarrow as pa - fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path) - with fs.open(path, 'rb') as f: - # Do something with f - -By default, ``pyarrow.hdfs.HadoopFileSystem`` uses libhdfs, a JNI-based -interface to the Java Hadoop client. This library is loaded **at runtime** -(rather than at link / library load time, since the library may not be in your -LD_LIBRARY_PATH), and relies on some environment variables. - -* ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has - `lib/native/libhdfs.so`. - -* ``JAVA_HOME``: the location of your Java SDK installation. - -* ``ARROW_LIBHDFS_DIR`` (optional): explicit location of ``libhdfs.so`` if it is - installed somewhere other than ``$HADOOP_HOME/lib/native``. - -* ``CLASSPATH``: must contain the Hadoop jars. You can set these using: - -.. code-block:: shell - - export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` - -If ``CLASSPATH`` is not set, then it will be set automatically if the -``hadoop`` executable is in your system path, or if ``HADOOP_HOME`` is set. - -HDFS API -~~~~~~~~ - -.. currentmodule:: pyarrow - -.. 
autosummary:: - :toctree: generated/ - - hdfs.connect - HadoopFileSystem.cat - HadoopFileSystem.chmod - HadoopFileSystem.chown - HadoopFileSystem.delete - HadoopFileSystem.df - HadoopFileSystem.disk_usage - HadoopFileSystem.download - HadoopFileSystem.exists - HadoopFileSystem.get_capacity - HadoopFileSystem.get_space_used - HadoopFileSystem.info - HadoopFileSystem.ls - HadoopFileSystem.mkdir - HadoopFileSystem.open - HadoopFileSystem.rename - HadoopFileSystem.rm - HadoopFileSystem.upload - HdfsFile From e25bf11e9918cdc3171ed6d27400da9b8ed80f41 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Fri, 26 Jan 2024 17:23:00 +0100 Subject: [PATCH 02/21] Remove hdfs python and hdfsio code --- python/CMakeLists.txt | 1 - python/pyarrow/__init__.py | 1 - python/pyarrow/_hdfsio.pyx | 478 ------------------------------ python/pyarrow/hdfs.py | 240 --------------- python/pyarrow/tests/test_hdfs.py | 451 ---------------------------- python/setup.py | 1 - 6 files changed, 1172 deletions(-) delete mode 100644 python/pyarrow/_hdfsio.pyx delete mode 100644 python/pyarrow/hdfs.py delete mode 100644 python/pyarrow/tests/test_hdfs.py diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 1d6524373a7..c3a1c578689 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -545,7 +545,6 @@ set(CYTHON_EXTENSIONS _csv _feather _fs - _hdfsio _json _pyarrow_cpp_tests) set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 2ee97ddb662..0096547b946 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -257,7 +257,6 @@ def print_entry(label, value): create_memory_map, MockOutputStream, input_stream, output_stream) -from pyarrow._hdfsio import HdfsFile, have_libhdfs from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, concat_arrays, concat_tables, TableGroupBy, diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx deleted file mode 100644 index cbcc5d28ca9..00000000000 --- a/python/pyarrow/_hdfsio.pyx +++ /dev/null @@ -1,478 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
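The bindings removed in this patch were superseded by ``pyarrow.fs.HadoopFileSystem``, which the deprecation notices above point to. For reference, a minimal sketch of the replacement API (host, port, user and paths below are placeholders, not values taken from this patch; libhdfs and the ``HADOOP_HOME``/``JAVA_HOME``/``CLASSPATH`` environment variables are still required at runtime):

    from pyarrow import fs

    # Connect through the new filesystem layer.
    hdfs = fs.HadoopFileSystem(host="default", port=8020, user="hadoop")

    # Read and write files via the generic FileSystem interface.
    with hdfs.open_input_stream("/tmp/example.bin") as f:
        data = f.read()
    with hdfs.open_output_stream("/tmp/copy.bin") as f:
        f.write(data)

    # List a directory tree (replaces the old ls()/walk() helpers).
    infos = hdfs.get_file_info(fs.FileSelector("/tmp", recursive=True))

    # Or resolve a filesystem and path from a URI.
    hdfs2, path = fs.FileSystem.from_uri("hdfs://namenode:8020/tmp/example.bin")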
- -# ---------------------------------------------------------------------- -# HDFS IO implementation - -# cython: language_level = 3 - -import re - -from pyarrow.lib cimport check_status, _Weakrefable, NativeFile -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport * -from pyarrow.includes.libarrow_fs cimport * -from pyarrow.lib import frombytes, tobytes, ArrowIOError - - -_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)') - - -def have_libhdfs(): - try: - with nogil: - check_status(HaveLibHdfs()) - return True - except Exception: - return False - - -def strip_hdfs_abspath(path): - m = _HDFS_PATH_RE.match(path) - if m: - return m.group(3) - else: - return path - - -cdef class HadoopFileSystem(_Weakrefable): - cdef: - shared_ptr[CIOHadoopFileSystem] client - - cdef readonly: - bint is_open - object host - object user - object kerb_ticket - int port - dict extra_conf - - def _connect(self, host, port, user, kerb_ticket, extra_conf): - cdef HdfsConnectionConfig conf - - if host is not None: - conf.host = tobytes(host) - self.host = host - - conf.port = port - self.port = port - - if user is not None: - conf.user = tobytes(user) - self.user = user - - if kerb_ticket is not None: - conf.kerb_ticket = tobytes(kerb_ticket) - self.kerb_ticket = kerb_ticket - - with nogil: - check_status(HaveLibHdfs()) - - if extra_conf is not None and isinstance(extra_conf, dict): - conf.extra_conf = {tobytes(k): tobytes(v) - for k, v in extra_conf.items()} - self.extra_conf = extra_conf - - with nogil: - check_status(CIOHadoopFileSystem.Connect(&conf, &self.client)) - self.is_open = True - - @classmethod - def connect(cls, *args, **kwargs): - return cls(*args, **kwargs) - - def __dealloc__(self): - if self.is_open: - self.close() - - def close(self): - """ - Disconnect from the HDFS cluster - """ - self._ensure_client() - with nogil: - check_status(self.client.get().Disconnect()) - self.is_open = False - - cdef _ensure_client(self): - if self.client.get() == NULL: - raise IOError('HDFS client improperly initialized') - elif not self.is_open: - raise IOError('HDFS client is closed') - - def exists(self, path): - """ - Returns True if the path is known to the cluster, False if it does not - (or there is an RPC error) - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - cdef c_bool result - with nogil: - result = self.client.get().Exists(c_path) - return result - - def isdir(self, path): - cdef HdfsPathInfo info - try: - self._path_info(path, &info) - except ArrowIOError: - return False - return info.kind == ObjectType_DIRECTORY - - def isfile(self, path): - cdef HdfsPathInfo info - try: - self._path_info(path, &info) - except ArrowIOError: - return False - return info.kind == ObjectType_FILE - - def get_capacity(self): - """ - Get reported total capacity of file system - - Returns - ------- - capacity : int - """ - cdef int64_t capacity = 0 - with nogil: - check_status(self.client.get().GetCapacity(&capacity)) - return capacity - - def get_space_used(self): - """ - Get space used on file system - - Returns - ------- - space_used : int - """ - cdef int64_t space_used = 0 - with nogil: - check_status(self.client.get().GetUsed(&space_used)) - return space_used - - def df(self): - """ - Return free space on disk, like the UNIX df command - - Returns - ------- - space : int - """ - return self.get_capacity() - self.get_space_used() - - def rename(self, path, new_path): - cdef c_string c_path = tobytes(path) - cdef c_string c_new_path = tobytes(new_path) - with nogil: - 
check_status(self.client.get().Rename(c_path, c_new_path)) - - def info(self, path): - """ - Return detailed HDFS information for path - - Parameters - ---------- - path : string - Path to file or directory - - Returns - ------- - path_info : dict - """ - cdef HdfsPathInfo info - self._path_info(path, &info) - return { - 'path': frombytes(info.name), - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'size': info.size, - 'block_size': info.block_size, - 'last_modified': info.last_modified_time, - 'last_accessed': info.last_access_time, - 'replication': info.replication, - 'permissions': info.permissions, - 'kind': ('directory' if info.kind == ObjectType_DIRECTORY - else 'file') - } - - def stat(self, path): - """ - Return basic file system statistics about path - - Parameters - ---------- - path : string - Path to file or directory - - Returns - ------- - stat : dict - """ - cdef FileStatistics info - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Stat(c_path, &info)) - return { - 'size': info.size, - 'kind': ('directory' if info.kind == ObjectType_DIRECTORY - else 'file') - } - - cdef _path_info(self, path, HdfsPathInfo* info): - cdef c_string c_path = tobytes(path) - - with nogil: - check_status(self.client.get() - .GetPathInfo(c_path, info)) - - def ls(self, path, bint full_info): - cdef: - c_string c_path = tobytes(path) - vector[HdfsPathInfo] listing - list results = [] - int i - - self._ensure_client() - - with nogil: - check_status(self.client.get() - .ListDirectory(c_path, &listing)) - - cdef const HdfsPathInfo* info - for i in range( listing.size()): - info = &listing[i] - - # Try to trim off the hdfs://HOST:PORT piece - name = strip_hdfs_abspath(frombytes(info.name)) - - if full_info: - kind = ('file' if info.kind == ObjectType_FILE - else 'directory') - - results.append({ - 'kind': kind, - 'name': name, - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'last_modified_time': info.last_modified_time, - 'last_access_time': info.last_access_time, - 'size': info.size, - 'replication': info.replication, - 'block_size': info.block_size, - 'permissions': info.permissions - }) - else: - results.append(name) - - return results - - def chmod(self, path, mode): - """ - Change file permissions - - Parameters - ---------- - path : string - absolute path to file or directory - mode : int - POSIX-like bitmask - """ - self._ensure_client() - cdef c_string c_path = tobytes(path) - cdef int c_mode = mode - with nogil: - check_status(self.client.get() - .Chmod(c_path, c_mode)) - - def chown(self, path, owner=None, group=None): - """ - Change file permissions - - Parameters - ---------- - path : string - absolute path to file or directory - owner : string, default None - New owner, None for no change - group : string, default None - New group, None for no change - """ - cdef: - c_string c_path - c_string c_owner - c_string c_group - const char* c_owner_ptr = NULL - const char* c_group_ptr = NULL - - self._ensure_client() - - c_path = tobytes(path) - if owner is not None: - c_owner = tobytes(owner) - c_owner_ptr = c_owner.c_str() - - if group is not None: - c_group = tobytes(group) - c_group_ptr = c_group.c_str() - - with nogil: - check_status(self.client.get() - .Chown(c_path, c_owner_ptr, c_group_ptr)) - - def mkdir(self, path): - """ - Create indicated directory and any necessary parent directories - """ - self._ensure_client() - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - 
.MakeDirectory(c_path)) - - def delete(self, path, bint recursive=False): - """ - Delete the indicated file or directory - - Parameters - ---------- - path : string - recursive : boolean, default False - If True, also delete child paths for directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Delete(c_path, recursive == 1)) - - def open(self, path, mode='rb', buffer_size=None, replication=None, - default_block_size=None): - """ - Open HDFS file for reading or writing - - Parameters - ---------- - mode : string - Must be one of 'rb', 'wb', 'ab' - - Returns - ------- - handle : HdfsFile - """ - self._ensure_client() - - cdef HdfsFile out = HdfsFile() - - if mode not in ('rb', 'wb', 'ab'): - raise Exception("Mode must be 'rb' (read), " - "'wb' (write, new file), or 'ab' (append)") - - cdef c_string c_path = tobytes(path) - cdef c_bool append = False - - # 0 in libhdfs means "use the default" - cdef int32_t c_buffer_size = buffer_size or 0 - cdef int16_t c_replication = replication or 0 - cdef int64_t c_default_block_size = default_block_size or 0 - - cdef shared_ptr[HdfsOutputStream] wr_handle - cdef shared_ptr[HdfsReadableFile] rd_handle - - if mode in ('wb', 'ab'): - if mode == 'ab': - append = True - - with nogil: - check_status( - self.client.get() - .OpenWritable(c_path, append, c_buffer_size, - c_replication, c_default_block_size, - &wr_handle)) - - out.set_output_stream( wr_handle) - out.is_writable = True - else: - with nogil: - check_status(self.client.get() - .OpenReadable(c_path, &rd_handle)) - - out.set_random_access_file( - rd_handle) - out.is_readable = True - - assert not out.closed - - if c_buffer_size == 0: - c_buffer_size = 2 ** 16 - - out.mode = mode - out.buffer_size = c_buffer_size - out.parent = _HdfsFileNanny(self, out) - out.own_file = True - - return out - - def download(self, path, stream, buffer_size=None): - with self.open(path, 'rb') as f: - f.download(stream, buffer_size=buffer_size) - - def upload(self, path, stream, buffer_size=None): - """ - Upload file-like object to HDFS path - """ - with self.open(path, 'wb') as f: - f.upload(stream, buffer_size=buffer_size) - - -# ARROW-404: Helper class to ensure that files are closed before the -# client. During deallocation of the extension class, the attributes are -# decref'd which can cause the client to get closed first if the file has the -# last remaining reference -cdef class _HdfsFileNanny(_Weakrefable): - cdef: - object client - object file_handle_ref - - def __cinit__(self, client, file_handle): - import weakref - self.client = client - self.file_handle_ref = weakref.ref(file_handle) - - def __dealloc__(self): - fh = self.file_handle_ref() - if fh: - fh.close() - # avoid cyclic GC - self.file_handle_ref = None - self.client = None - - -cdef class HdfsFile(NativeFile): - cdef readonly: - int32_t buffer_size - object mode - object parent - - def __dealloc__(self): - self.parent = None diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py deleted file mode 100644 index 2e6c387a8fd..00000000000 --- a/python/pyarrow/hdfs.py +++ /dev/null @@ -1,240 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -import os -import posixpath -import sys -import warnings - -from pyarrow.util import doc, _DEPR_MSG -from pyarrow.filesystem import FileSystem -import pyarrow._hdfsio as _hdfsio - - -class HadoopFileSystem(_hdfsio.HadoopFileSystem, FileSystem): - """ - DEPRECATED: FileSystem interface for HDFS cluster. - - See pyarrow.hdfs.connect for full connection details - - .. deprecated:: 2.0 - ``pyarrow.hdfs.HadoopFileSystem`` is deprecated, - please use ``pyarrow.fs.HadoopFileSystem`` instead. - """ - - def __init__(self, host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs', extra_conf=None): - warnings.warn( - _DEPR_MSG.format( - "hdfs.HadoopFileSystem", "2.0.0", "fs.HadoopFileSystem"), - FutureWarning, stacklevel=2) - if driver == 'libhdfs': - _maybe_set_hadoop_classpath() - - self._connect(host, port, user, kerb_ticket, extra_conf) - - def __reduce__(self): - return (HadoopFileSystem, (self.host, self.port, self.user, - self.kerb_ticket, self.extra_conf)) - - def _isfilestore(self): - """ - Return True if this is a Unix-style file store with directories. - """ - return True - - @doc(FileSystem.isdir) - def isdir(self, path): - return super().isdir(path) - - @doc(FileSystem.isfile) - def isfile(self, path): - return super().isfile(path) - - @doc(FileSystem.delete) - def delete(self, path, recursive=False): - return super().delete(path, recursive) - - def mkdir(self, path, **kwargs): - """ - Create directory in HDFS. - - Parameters - ---------- - path : str - Directory path to create, including any parent directories. - - Notes - ----- - libhdfs does not support create_parents=False, so we ignore this here - """ - return super().mkdir(path) - - @doc(FileSystem.rename) - def rename(self, path, new_path): - return super().rename(path, new_path) - - @doc(FileSystem.exists) - def exists(self, path): - return super().exists(path) - - def ls(self, path, detail=False): - """ - Retrieve directory contents and metadata, if requested. - - Parameters - ---------- - path : str - HDFS path to retrieve contents of. - detail : bool, default False - If False, only return list of paths. - - Returns - ------- - result : list of dicts (detail=True) or strings (detail=False) - """ - return super().ls(path, detail) - - def walk(self, top_path): - """ - Directory tree generator for HDFS, like os.walk. - - Parameters - ---------- - top_path : str - Root directory for tree traversal. 
- - Returns - ------- - Generator yielding 3-tuple (dirpath, dirnames, filename) - """ - contents = self.ls(top_path, detail=True) - - directories, files = _libhdfs_walk_files_dirs(top_path, contents) - yield top_path, directories, files - for dirname in directories: - yield from self.walk(self._path_join(top_path, dirname)) - - -def _maybe_set_hadoop_classpath(): - import re - - if re.search(r'hadoop-common[^/]+.jar', os.environ.get('CLASSPATH', '')): - return - - if 'HADOOP_HOME' in os.environ: - if sys.platform != 'win32': - classpath = _derive_hadoop_classpath() - else: - hadoop_bin = '{}/bin/hadoop'.format(os.environ['HADOOP_HOME']) - classpath = _hadoop_classpath_glob(hadoop_bin) - else: - classpath = _hadoop_classpath_glob('hadoop') - - os.environ['CLASSPATH'] = classpath.decode('utf-8') - - -def _derive_hadoop_classpath(): - import subprocess - - find_args = ('find', '-L', os.environ['HADOOP_HOME'], '-name', '*.jar') - find = subprocess.Popen(find_args, stdout=subprocess.PIPE) - xargs_echo = subprocess.Popen(('xargs', 'echo'), - stdin=find.stdout, - stdout=subprocess.PIPE) - jars = subprocess.check_output(('tr', "' '", "':'"), - stdin=xargs_echo.stdout) - hadoop_conf = os.environ["HADOOP_CONF_DIR"] \ - if "HADOOP_CONF_DIR" in os.environ \ - else os.environ["HADOOP_HOME"] + "/etc/hadoop" - return (hadoop_conf + ":").encode("utf-8") + jars - - -def _hadoop_classpath_glob(hadoop_bin): - import subprocess - - hadoop_classpath_args = (hadoop_bin, 'classpath', '--glob') - return subprocess.check_output(hadoop_classpath_args) - - -def _libhdfs_walk_files_dirs(top_path, contents): - files = [] - directories = [] - for c in contents: - scrubbed_name = posixpath.split(c['name'])[1] - if c['kind'] == 'file': - files.append(scrubbed_name) - else: - directories.append(scrubbed_name) - - return directories, files - - -def connect(host="default", port=0, user=None, kerb_ticket=None, - extra_conf=None): - """ - DEPRECATED: Connect to an HDFS cluster. - - All parameters are optional and should only be set if the defaults need - to be overridden. - - Authentication should be automatic if the HDFS cluster uses Kerberos. - However, if a username is specified, then the ticket cache will likely - be required. - - .. deprecated:: 2.0 - ``pyarrow.hdfs.connect`` is deprecated, - please use ``pyarrow.fs.HadoopFileSystem`` instead. - - Parameters - ---------- - host : NameNode. Set to "default" for fs.defaultFS from core-site.xml. - port : NameNode's port. Set to 0 for default or logical (HA) nodes. - user : Username when connecting to HDFS; None implies login user. - kerb_ticket : Path to Kerberos ticket cache. - extra_conf : dict, default None - extra Key/Value pairs for config; Will override any - hdfs-site.xml properties - - Notes - ----- - The first time you call this method, it will take longer than usual due - to JNI spin-up time. 
- - Returns - ------- - filesystem : HadoopFileSystem - """ - warnings.warn( - _DEPR_MSG.format("hdfs.connect", "2.0.0", "fs.HadoopFileSystem"), - FutureWarning, stacklevel=2 - ) - return _connect( - host=host, port=port, user=user, kerb_ticket=kerb_ticket, - extra_conf=extra_conf - ) - - -def _connect(host="default", port=0, user=None, kerb_ticket=None, - extra_conf=None): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - fs = HadoopFileSystem(host=host, port=port, user=user, - kerb_ticket=kerb_ticket, - extra_conf=extra_conf) - return fs diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py deleted file mode 100644 index 5b94c200f35..00000000000 --- a/python/pyarrow/tests/test_hdfs.py +++ /dev/null @@ -1,451 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import os -import random -from io import BytesIO -from os.path import join as pjoin - -import numpy as np -import pytest - -import pyarrow as pa -from pyarrow.tests import util -from pyarrow.tests.parquet.common import _test_dataframe -from pyarrow.tests.parquet.test_dataset import ( - _test_write_to_dataset_with_partitions, - _test_write_to_dataset_no_partitions -) -from pyarrow.util import guid - -try: - from pandas.testing import assert_frame_equal -except ImportError: - pass - - -# ---------------------------------------------------------------------- -# HDFS tests - - -def check_libhdfs_present(): - if not pa.have_libhdfs(): - message = 'No libhdfs available on system' - if os.environ.get('PYARROW_HDFS_TEST_LIBHDFS_REQUIRE'): - pytest.fail(message) - else: - pytest.skip(message) - - -def hdfs_test_client(): - host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default') - user = os.environ.get('ARROW_HDFS_TEST_USER', None) - try: - port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0)) - except ValueError: - raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' - 'an integer') - - with pytest.warns(FutureWarning): - return pa.hdfs.connect(host, port, user) - - -@pytest.mark.hdfs -class HdfsTestCases: - - def _make_test_file(self, hdfs, test_name, test_path, test_data): - base_path = pjoin(self.tmp_path, test_name) - hdfs.mkdir(base_path) - - full_path = pjoin(base_path, test_path) - - with hdfs.open(full_path, 'wb') as f: - f.write(test_data) - - return full_path - - @classmethod - def setup_class(cls): - cls.check_driver() - cls.hdfs = hdfs_test_client() - cls.tmp_path = '/tmp/pyarrow-test-{}'.format(random.randint(0, 1000)) - cls.hdfs.mkdir(cls.tmp_path) - - @classmethod - def teardown_class(cls): - cls.hdfs.delete(cls.tmp_path, recursive=True) - cls.hdfs.close() - - def test_pickle(self, pickle_module): - s = pickle_module.dumps(self.hdfs) - h2 = pickle_module.loads(s) - assert h2.is_open - assert h2.host == self.hdfs.host - assert 
h2.port == self.hdfs.port - assert h2.user == self.hdfs.user - assert h2.kerb_ticket == self.hdfs.kerb_ticket - # smoketest unpickled client works - h2.ls(self.tmp_path) - - def test_cat(self): - path = pjoin(self.tmp_path, 'cat-test') - - data = b'foobarbaz' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - contents = self.hdfs.cat(path) - assert contents == data - - def test_capacity_space(self): - capacity = self.hdfs.get_capacity() - space_used = self.hdfs.get_space_used() - disk_free = self.hdfs.df() - - assert capacity > 0 - assert capacity > space_used - assert disk_free == (capacity - space_used) - - def test_close(self): - client = hdfs_test_client() - assert client.is_open - client.close() - assert not client.is_open - - with pytest.raises(Exception): - client.ls('/') - - def test_mkdir(self): - path = pjoin(self.tmp_path, 'test-dir/test-dir') - parent_path = pjoin(self.tmp_path, 'test-dir') - - self.hdfs.mkdir(path) - assert self.hdfs.exists(path) - - self.hdfs.delete(parent_path, recursive=True) - assert not self.hdfs.exists(path) - - def test_mv_rename(self): - path = pjoin(self.tmp_path, 'mv-test') - new_path = pjoin(self.tmp_path, 'mv-new-test') - - data = b'foobarbaz' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - assert self.hdfs.exists(path) - self.hdfs.mv(path, new_path) - assert not self.hdfs.exists(path) - assert self.hdfs.exists(new_path) - - assert self.hdfs.cat(new_path) == data - - self.hdfs.rename(new_path, path) - assert self.hdfs.cat(path) == data - - def test_info(self): - path = pjoin(self.tmp_path, 'info-base') - file_path = pjoin(path, 'ex') - self.hdfs.mkdir(path) - - data = b'foobarbaz' - with self.hdfs.open(file_path, 'wb') as f: - f.write(data) - - path_info = self.hdfs.info(path) - file_path_info = self.hdfs.info(file_path) - - assert path_info['kind'] == 'directory' - - assert file_path_info['kind'] == 'file' - assert file_path_info['size'] == len(data) - - def test_exists_isdir_isfile(self): - dir_path = pjoin(self.tmp_path, 'info-base') - file_path = pjoin(dir_path, 'ex') - missing_path = pjoin(dir_path, 'this-path-is-missing') - - self.hdfs.mkdir(dir_path) - with self.hdfs.open(file_path, 'wb') as f: - f.write(b'foobarbaz') - - assert self.hdfs.exists(dir_path) - assert self.hdfs.exists(file_path) - assert not self.hdfs.exists(missing_path) - - assert self.hdfs.isdir(dir_path) - assert not self.hdfs.isdir(file_path) - assert not self.hdfs.isdir(missing_path) - - assert not self.hdfs.isfile(dir_path) - assert self.hdfs.isfile(file_path) - assert not self.hdfs.isfile(missing_path) - - def test_disk_usage(self): - path = pjoin(self.tmp_path, 'disk-usage-base') - p1 = pjoin(path, 'p1') - p2 = pjoin(path, 'p2') - - subdir = pjoin(path, 'subdir') - p3 = pjoin(subdir, 'p3') - - if self.hdfs.exists(path): - self.hdfs.delete(path, True) - - self.hdfs.mkdir(path) - self.hdfs.mkdir(subdir) - - data = b'foobarbaz' - - for file_path in [p1, p2, p3]: - with self.hdfs.open(file_path, 'wb') as f: - f.write(data) - - assert self.hdfs.disk_usage(path) == len(data) * 3 - - def test_ls(self): - base_path = pjoin(self.tmp_path, 'ls-test') - self.hdfs.mkdir(base_path) - - dir_path = pjoin(base_path, 'a-dir') - f1_path = pjoin(base_path, 'a-file-1') - - self.hdfs.mkdir(dir_path) - - f = self.hdfs.open(f1_path, 'wb') - f.write(b'a' * 10) - - contents = sorted(self.hdfs.ls(base_path, False)) - assert contents == [dir_path, f1_path] - - def test_chmod_chown(self): - path = pjoin(self.tmp_path, 'chmod-test') - with self.hdfs.open(path, 'wb') as f: - 
f.write(b'a' * 10) - - def test_download_upload(self): - base_path = pjoin(self.tmp_path, 'upload-test') - - data = b'foobarbaz' - buf = BytesIO(data) - buf.seek(0) - - self.hdfs.upload(base_path, buf) - - out_buf = BytesIO() - self.hdfs.download(base_path, out_buf) - out_buf.seek(0) - assert out_buf.getvalue() == data - - def test_file_context_manager(self): - path = pjoin(self.tmp_path, 'ctx-manager') - - data = b'foo' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - with self.hdfs.open(path, 'rb') as f: - assert f.size() == 3 - result = f.read(10) - assert result == data - - def test_open_not_exist(self): - path = pjoin(self.tmp_path, 'does-not-exist-123') - - with pytest.raises(FileNotFoundError): - self.hdfs.open(path) - - def test_open_write_error(self): - with pytest.raises((FileExistsError, IsADirectoryError)): - self.hdfs.open('/', 'wb') - - def test_read_whole_file(self): - path = pjoin(self.tmp_path, 'read-whole-file') - - data = b'foo' * 1000 - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - with self.hdfs.open(path, 'rb') as f: - result = f.read() - - assert result == data - - def _write_multiple_hdfs_pq_files(self, tmpdir): - import pyarrow.parquet as pq - nfiles = 10 - size = 5 - test_data = [] - for i in range(nfiles): - df = _test_dataframe(size, seed=i) - - df['index'] = np.arange(i * size, (i + 1) * size) - - # Hack so that we don't have a dtype cast in v1 files - df['uint32'] = df['uint32'].astype(np.int64) - - path = pjoin(tmpdir, '{}.parquet'.format(i)) - - table = pa.Table.from_pandas(df, preserve_index=False) - with self.hdfs.open(path, 'wb') as f: - pq.write_table(table, f) - - test_data.append(table) - - expected = pa.concat_tables(test_data) - return expected - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_multiple_parquet_files(self): - - tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid()) - - self.hdfs.mkdir(tmpdir) - - expected = self._write_multiple_hdfs_pq_files(tmpdir) - result = self.hdfs.read_parquet(tmpdir) - - assert_frame_equal( - result.to_pandas().sort_values(by='index').reset_index(drop=True), - expected.to_pandas() - ) - - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_multiple_parquet_files_with_uri(self): - import pyarrow.parquet as pq - - tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid()) - - self.hdfs.mkdir(tmpdir) - - expected = self._write_multiple_hdfs_pq_files(tmpdir) - path = _get_hdfs_uri(tmpdir) - result = pq.read_table(path) - - assert_frame_equal( - result.to_pandas().sort_values(by='index').reset_index(drop=True), - expected.to_pandas() - ) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_write_parquet_files_with_uri(self): - import pyarrow.parquet as pq - - tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid()) - self.hdfs.mkdir(tmpdir) - path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet')) - - size = 5 - df = _test_dataframe(size, seed=0) - # Hack so that we don't have a dtype cast in v1 files - df['uint32'] = df['uint32'].astype(np.int64) - table = pa.Table.from_pandas(df, preserve_index=False) - - pq.write_table(table, path, filesystem=self.hdfs) - - result = pq.read_table(path, filesystem=self.hdfs).to_pandas() - - assert_frame_equal(result, 
df) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.parquet - @pytest.mark.pandas - def test_write_to_dataset_with_partitions(self): - tmpdir = pjoin(self.tmp_path, 'write-partitions-' + guid()) - self.hdfs.mkdir(tmpdir) - _test_write_to_dataset_with_partitions( - tmpdir, filesystem=self.hdfs) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.parquet - @pytest.mark.pandas - def test_write_to_dataset_no_partitions(self): - tmpdir = pjoin(self.tmp_path, 'write-no_partitions-' + guid()) - self.hdfs.mkdir(tmpdir) - _test_write_to_dataset_no_partitions( - tmpdir, filesystem=self.hdfs) - - -class TestLibHdfs(HdfsTestCases): - - @classmethod - def check_driver(cls): - check_libhdfs_present() - - def test_orphaned_file(self): - hdfs = hdfs_test_client() - file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname', - b'foobarbaz') - - f = hdfs.open(file_path) - hdfs = None - f = None # noqa - - -def _get_hdfs_uri(path): - host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') - try: - port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0)) - except ValueError: - raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' - 'an integer') - uri = "hdfs://{}:{}{}".format(host, port, path) - - return uri - - -@pytest.mark.hdfs -@pytest.mark.pandas -@pytest.mark.parquet -@pytest.mark.fastparquet -def test_fastparquet_read_with_hdfs(): - check_libhdfs_present() - try: - import snappy # noqa - except ImportError: - pytest.skip('fastparquet test requires snappy') - - import pyarrow.parquet as pq - fastparquet = pytest.importorskip('fastparquet') - - fs = hdfs_test_client() - - df = util.make_dataframe() - - table = pa.Table.from_pandas(df) - - path = '/tmp/testing.parquet' - with fs.open(path, 'wb') as f: - pq.write_table(table, f) - - parquet_file = fastparquet.ParquetFile(path, open_with=fs.open) - - result = parquet_file.to_pandas() - assert_frame_equal(result, df) diff --git a/python/setup.py b/python/setup.py index 423de708e88..798bd6b05fd 100755 --- a/python/setup.py +++ b/python/setup.py @@ -211,7 +211,6 @@ def initialize_options(self): '_s3fs', '_substrait', '_hdfs', - '_hdfsio', 'gandiva'] def _run_cmake(self): From 4ba269525767f38bf652cad08fe246a6493db5aa Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Fri, 26 Jan 2024 17:47:21 +0100 Subject: [PATCH 03/21] Remove legacy filesystem code --- python/pyarrow/__init__.py | 42 -- python/pyarrow/filesystem.py | 511 ------------------ python/pyarrow/fs.py | 9 - python/pyarrow/tests/parquet/test_basic.py | 5 +- python/pyarrow/tests/parquet/test_dataset.py | 47 +- .../tests/parquet/test_parquet_writer.py | 43 -- python/pyarrow/tests/test_filesystem.py | 75 --- 7 files changed, 15 insertions(+), 717 deletions(-) delete mode 100644 python/pyarrow/filesystem.py delete mode 100644 python/pyarrow/tests/test_filesystem.py diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 0096547b946..f3cbcb847f1 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -275,54 +275,12 @@ def print_entry(label, value): ArrowTypeError, ArrowSerializationError) -import pyarrow.hdfs as hdfs - from pyarrow.ipc import serialize_pandas, deserialize_pandas import pyarrow.ipc as ipc import pyarrow.types as types -# deprecated top-level access - - -from pyarrow.filesystem import 
FileSystem as _FileSystem -from pyarrow.filesystem import LocalFileSystem as _LocalFileSystem -from pyarrow.hdfs import HadoopFileSystem as _HadoopFileSystem - - -_localfs = _LocalFileSystem._get_instance() - - -_msg = ( - "pyarrow.{0} is deprecated as of 2.0.0, please use pyarrow.fs.{1} instead." -) - -_serialization_msg = ( - "'pyarrow.{0}' is deprecated and will be removed in a future version. " - "Use pickle or the pyarrow IPC functionality instead." -) - -_deprecated = { - "localfs": (_localfs, "LocalFileSystem"), - "FileSystem": (_FileSystem, "FileSystem"), - "LocalFileSystem": (_LocalFileSystem, "LocalFileSystem"), - "HadoopFileSystem": (_HadoopFileSystem, "HadoopFileSystem"), -} - - -def __getattr__(name): - if name in _deprecated: - obj, new_name = _deprecated[name] - _warnings.warn(_msg.format(name, new_name), - FutureWarning, stacklevel=2) - return obj - - raise AttributeError( - "module 'pyarrow' has no attribute '{0}'".format(name) - ) - - # ---------------------------------------------------------------------- # Deprecations diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py deleted file mode 100644 index c1e70a1ee69..00000000000 --- a/python/pyarrow/filesystem.py +++ /dev/null @@ -1,511 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -import os -import posixpath -import sys -import urllib.parse -import warnings - -from os.path import join as pjoin - -import pyarrow as pa -from pyarrow.util import doc, _stringify_path, _is_path_like, _DEPR_MSG - - -_FS_DEPR_MSG = _DEPR_MSG.format( - "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem" -) - - -class FileSystem: - """ - Abstract filesystem interface. - """ - - def cat(self, path): - """ - Return contents of file as a bytes object. - - Parameters - ---------- - path : str - File path to read content from. - - Returns - ------- - contents : bytes - """ - with self.open(path, 'rb') as f: - return f.read() - - def ls(self, path): - """ - Return list of file paths. - - Parameters - ---------- - path : str - Directory to list contents from. - """ - raise NotImplementedError - - def delete(self, path, recursive=False): - """ - Delete the indicated file or directory. - - Parameters - ---------- - path : str - Path to delete. - recursive : bool, default False - If True, also delete child paths for directories. - """ - raise NotImplementedError - - def disk_usage(self, path): - """ - Compute bytes used by all contents under indicated path in file tree. - - Parameters - ---------- - path : str - Can be a file path or directory. 
- - Returns - ------- - usage : int - """ - path = _stringify_path(path) - path_info = self.stat(path) - if path_info['kind'] == 'file': - return path_info['size'] - - total = 0 - for root, directories, files in self.walk(path): - for child_path in files: - abspath = self._path_join(root, child_path) - total += self.stat(abspath)['size'] - - return total - - def _path_join(self, *args): - return self.pathsep.join(args) - - def stat(self, path): - """ - Information about a filesystem entry. - - Returns - ------- - stat : dict - """ - raise NotImplementedError('FileSystem.stat') - - def rm(self, path, recursive=False): - """ - Alias for FileSystem.delete. - """ - return self.delete(path, recursive=recursive) - - def mv(self, path, new_path): - """ - Alias for FileSystem.rename. - """ - return self.rename(path, new_path) - - def rename(self, path, new_path): - """ - Rename file, like UNIX mv command. - - Parameters - ---------- - path : str - Path to alter. - new_path : str - Path to move to. - """ - raise NotImplementedError('FileSystem.rename') - - def mkdir(self, path, create_parents=True): - """ - Create a directory. - - Parameters - ---------- - path : str - Path to the directory. - create_parents : bool, default True - If the parent directories don't exists create them as well. - """ - raise NotImplementedError - - def exists(self, path): - """ - Return True if path exists. - - Parameters - ---------- - path : str - Path to check. - """ - raise NotImplementedError - - def isdir(self, path): - """ - Return True if path is a directory. - - Parameters - ---------- - path : str - Path to check. - """ - raise NotImplementedError - - def isfile(self, path): - """ - Return True if path is a file. - - Parameters - ---------- - path : str - Path to check. - """ - raise NotImplementedError - - def _isfilestore(self): - """ - Returns True if this FileSystem is a unix-style file store with - directories. - """ - raise NotImplementedError - - def read_parquet(self, path, columns=None, metadata=None, schema=None, - use_threads=True, use_pandas_metadata=False): - """ - Read Parquet data from path in file system. Can read from a single file - or a directory of files. - - Parameters - ---------- - path : str - Single file path or directory - columns : List[str], optional - Subset of columns to read. - metadata : pyarrow.parquet.FileMetaData - Known metadata to validate files against. - schema : pyarrow.parquet.Schema - Known schema to validate files against. Alternative to metadata - argument. - use_threads : bool, default True - Perform multi-threaded column reads. - use_pandas_metadata : bool, default False - If True and file has custom pandas schema metadata, ensure that - index columns are also loaded. - - Returns - ------- - table : pyarrow.Table - """ - from pyarrow.parquet import ParquetDataset - dataset = ParquetDataset(path, schema=schema, metadata=metadata, - filesystem=self) - return dataset.read(columns=columns, use_threads=use_threads, - use_pandas_metadata=use_pandas_metadata) - - def open(self, path, mode='rb'): - """ - Open file for reading or writing. 
- """ - raise NotImplementedError - - @property - def pathsep(self): - return '/' - - -class LocalFileSystem(FileSystem): - - _instance = None - - def __init__(self): - warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2) - super().__init__() - - @classmethod - def _get_instance(cls): - if cls._instance is None: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - cls._instance = LocalFileSystem() - return cls._instance - - @classmethod - def get_instance(cls): - warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2) - return cls._get_instance() - - @doc(FileSystem.ls) - def ls(self, path): - path = _stringify_path(path) - return sorted(pjoin(path, x) for x in os.listdir(path)) - - @doc(FileSystem.mkdir) - def mkdir(self, path, create_parents=True): - path = _stringify_path(path) - if create_parents: - os.makedirs(path) - else: - os.mkdir(path) - - @doc(FileSystem.isdir) - def isdir(self, path): - path = _stringify_path(path) - return os.path.isdir(path) - - @doc(FileSystem.isfile) - def isfile(self, path): - path = _stringify_path(path) - return os.path.isfile(path) - - @doc(FileSystem._isfilestore) - def _isfilestore(self): - return True - - @doc(FileSystem.exists) - def exists(self, path): - path = _stringify_path(path) - return os.path.exists(path) - - @doc(FileSystem.open) - def open(self, path, mode='rb'): - """ - Open file for reading or writing. - """ - path = _stringify_path(path) - return open(path, mode=mode) - - @property - def pathsep(self): - return os.path.sep - - def walk(self, path): - """ - Directory tree generator, see os.walk. - """ - path = _stringify_path(path) - return os.walk(path) - - -class DaskFileSystem(FileSystem): - """ - Wraps s3fs Dask filesystem implementation like s3fs, gcsfs, etc. - """ - - def __init__(self, fs): - warnings.warn( - "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated " - "as of pyarrow 3.0.0, and will be removed in a future version.", - FutureWarning, stacklevel=2) - self.fs = fs - - @doc(FileSystem.isdir) - def isdir(self, path): - raise NotImplementedError("Unsupported file system API") - - @doc(FileSystem.isfile) - def isfile(self, path): - raise NotImplementedError("Unsupported file system API") - - @doc(FileSystem._isfilestore) - def _isfilestore(self): - """ - Object Stores like S3 and GCSFS are based on key lookups, not true - file-paths. - """ - return False - - @doc(FileSystem.delete) - def delete(self, path, recursive=False): - path = _stringify_path(path) - return self.fs.rm(path, recursive=recursive) - - @doc(FileSystem.exists) - def exists(self, path): - path = _stringify_path(path) - return self.fs.exists(path) - - @doc(FileSystem.mkdir) - def mkdir(self, path, create_parents=True): - path = _stringify_path(path) - if create_parents: - return self.fs.mkdirs(path) - else: - return self.fs.mkdir(path) - - @doc(FileSystem.open) - def open(self, path, mode='rb'): - """ - Open file for reading or writing. - """ - path = _stringify_path(path) - return self.fs.open(path, mode=mode) - - def ls(self, path, detail=False): - path = _stringify_path(path) - return self.fs.ls(path, detail=detail) - - def walk(self, path): - """ - Directory tree generator, like os.walk. 
- """ - path = _stringify_path(path) - return self.fs.walk(path) - - -class S3FSWrapper(DaskFileSystem): - - @doc(FileSystem.isdir) - def isdir(self, path): - path = _sanitize_s3(_stringify_path(path)) - try: - contents = self.fs.ls(path) - if len(contents) == 1 and contents[0] == path: - return False - else: - return True - except OSError: - return False - - @doc(FileSystem.isfile) - def isfile(self, path): - path = _sanitize_s3(_stringify_path(path)) - try: - contents = self.fs.ls(path) - return len(contents) == 1 and contents[0] == path - except OSError: - return False - - def walk(self, path, refresh=False): - """ - Directory tree generator, like os.walk. - - Generator version of what is in s3fs, which yields a flattened list of - files. - """ - path = _sanitize_s3(_stringify_path(path)) - directories = set() - files = set() - - for key in list(self.fs._ls(path, refresh=refresh)): - path = key['Key'] - if key['StorageClass'] == 'DIRECTORY': - directories.add(path) - elif key['StorageClass'] == 'BUCKET': - pass - else: - files.add(path) - - # s3fs creates duplicate 'DIRECTORY' entries - files = sorted([posixpath.split(f)[1] for f in files - if f not in directories]) - directories = sorted([posixpath.split(x)[1] - for x in directories]) - - yield path, directories, files - - for directory in directories: - yield from self.walk(directory, refresh=refresh) - - -def _sanitize_s3(path): - if path.startswith('s3://'): - return path.replace('s3://', '') - else: - return path - - -def _ensure_filesystem(fs): - fs_type = type(fs) - - # If the arrow filesystem was subclassed, assume it supports the full - # interface and return it - if not issubclass(fs_type, FileSystem): - if "fsspec" in sys.modules: - fsspec = sys.modules["fsspec"] - if isinstance(fs, fsspec.AbstractFileSystem): - # for recent fsspec versions that stop inheriting from - # pyarrow.filesystem.FileSystem, still allow fsspec - # filesystems (which should be compatible with our legacy fs) - return fs - - raise OSError('Unrecognized filesystem: {}'.format(fs_type)) - else: - return fs - - -def resolve_filesystem_and_path(where, filesystem=None): - """ - Return filesystem from path which could be an HDFS URI, a local URI, - or a plain filesystem path. 
- """ - if not _is_path_like(where): - if filesystem is not None: - raise ValueError("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") - return filesystem, where - - if filesystem is not None: - filesystem = _ensure_filesystem(filesystem) - if isinstance(filesystem, LocalFileSystem): - path = _stringify_path(where) - elif not isinstance(where, str): - raise TypeError( - "Expected string path; path-like objects are only allowed " - "with a local filesystem" - ) - else: - path = where - return filesystem, path - - path = _stringify_path(where) - - parsed_uri = urllib.parse.urlparse(path) - if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs': - # Input is hdfs URI such as hdfs://host:port/myfile.parquet - netloc_split = parsed_uri.netloc.split(':') - host = netloc_split[0] - if host == '': - host = 'default' - else: - host = parsed_uri.scheme + "://" + host - port = 0 - if len(netloc_split) == 2 and netloc_split[1].isnumeric(): - port = int(netloc_split[1]) - fs = pa.hdfs._connect(host=host, port=port) - fs_path = parsed_uri.path - elif parsed_uri.scheme == 'file': - # Input is local URI such as file:///home/user/myfile.parquet - fs = LocalFileSystem._get_instance() - fs_path = parsed_uri.path - else: - # Input is local path such as /home/user/myfile.parquet - fs = LocalFileSystem._get_instance() - fs_path = path - - return fs, fs_path diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index ead750ca44e..bafea5d67ee 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -123,15 +123,6 @@ def _ensure_filesystem( return LocalFileSystem(use_mmap=use_mmap) return PyFileSystem(FSSpecHandler(filesystem)) - # map old filesystems to new ones - import pyarrow.filesystem as legacyfs - - if isinstance(filesystem, legacyfs.LocalFileSystem): - return LocalFileSystem(use_mmap=use_mmap) - # TODO handle HDFS? - if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem): - return filesystem - raise TypeError( "Unrecognized filesystem: {}. 
`filesystem` argument must be a " "FileSystem instance or a valid file system URI'".format( diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 3c867776ac0..bc21d709ec2 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -25,7 +25,6 @@ import pyarrow as pa from pyarrow import fs -from pyarrow.filesystem import LocalFileSystem, FileSystem from pyarrow.tests import util from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table, _test_dataframe) @@ -259,11 +258,11 @@ def test_fspath(tempdir): # combined with non-local filesystem raises with pytest.raises(TypeError): - _read_table(fs_protocol_obj, filesystem=FileSystem()) + _read_table(fs_protocol_obj, filesystem=fs.FileSystem()) @pytest.mark.parametrize("filesystem", [ - None, fs.LocalFileSystem(), LocalFileSystem._get_instance() + None, fs.LocalFileSystem() ]) @pytest.mark.parametrize("name", ("data.parquet", "δΎ‹.parquet")) def test_relative_paths(tempdir, filesystem, name): diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index b6e351bdef9..0ffcb66689f 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -27,7 +27,6 @@ import pyarrow as pa import pyarrow.compute as pc from pyarrow import fs -from pyarrow.filesystem import LocalFileSystem from pyarrow.tests import util from pyarrow.util import guid from pyarrow.vendored.version import Version @@ -74,7 +73,7 @@ def test_filesystem_uri(tempdir): @pytest.mark.pandas def test_read_partitioned_directory(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() _partition_test_for_filesystem(fs, tempdir) @@ -82,7 +81,7 @@ def test_read_partitioned_directory(tempdir): def test_read_partitioned_columns_selection(tempdir): # ARROW-3861 - do not include partition columns in resulting table when # `columns` keyword was passed without those columns - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir _partition_test_for_filesystem(fs, base_path) @@ -93,7 +92,7 @@ def test_read_partitioned_columns_selection(tempdir): @pytest.mark.pandas def test_filters_equivalency(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -164,7 +163,7 @@ def test_filters_equivalency(tempdir): @pytest.mark.pandas def test_filters_cutoff_exclusive_integer(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -204,7 +203,7 @@ def test_filters_cutoff_exclusive_integer(tempdir): ) @pytest.mark.pandas def test_filters_cutoff_exclusive_datetime(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir date_keys = [ @@ -264,7 +263,7 @@ def test_filters_inclusive_datetime(tempdir): @pytest.mark.pandas def test_filters_inclusive_integer(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -298,7 +297,7 @@ def test_filters_inclusive_integer(tempdir): @pytest.mark.pandas def test_filters_inclusive_set(tempdir): - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -345,7 +344,7 @@ def test_filters_inclusive_set(tempdir): @pytest.mark.pandas def test_filters_invalid_pred_op(tempdir): - fs = LocalFileSystem._get_instance() + fs = 
fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -388,7 +387,7 @@ def test_filters_invalid_pred_op(tempdir): def test_filters_invalid_column(tempdir): # ARROW-5572 - raise error on invalid name in filter specification # works with new dataset - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -419,7 +418,7 @@ def test_filters_invalid_column(tempdir): def test_filters_read_table(tempdir, filters, read_method): read = getattr(pq, read_method) # test that filters keyword is passed through in read_table - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -445,7 +444,7 @@ def test_filters_read_table(tempdir, filters, read_method): @pytest.mark.pandas def test_partition_keys_with_underscores(tempdir): # ARROW-5666 - partition field values with underscores preserve underscores - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() base_path = tempdir string_keys = ["2019_2", "2019_3"] @@ -499,26 +498,6 @@ def test_read_single_file_list(tempdir): assert result.equals(table) -@pytest.mark.pandas -@pytest.mark.s3 -def test_read_partitioned_directory_s3fs_wrapper(s3_example_s3fs): - import s3fs - - from pyarrow.filesystem import S3FSWrapper - - if Version(s3fs.__version__) >= Version("0.5"): - pytest.skip("S3FSWrapper no longer working for s3fs 0.5+") - - fs, path = s3_example_s3fs - with pytest.warns(FutureWarning): - wrapper = S3FSWrapper(fs) - _partition_test_for_filesystem(wrapper, path) - - # Check that we can auto-wrap - dataset = pq.ParquetDataset(path, filesystem=fs) - dataset.read() - - @pytest.mark.pandas @pytest.mark.s3 def test_read_partitioned_directory_s3fs(s3_example_s3fs): @@ -1009,7 +988,7 @@ def _test_write_to_dataset_no_partitions(base_path, output_table = pa.Table.from_pandas(output_df) if filesystem is None: - filesystem = LocalFileSystem._get_instance() + filesystem = fs.LocalFileSystem() # Without partitions, append files to root_path n = 5 @@ -1110,7 +1089,7 @@ def test_write_to_dataset_filesystem(tempdir): def _make_dataset_for_pickling(tempdir, N=100): path = tempdir / 'data.parquet' - fs = LocalFileSystem._get_instance() + fs = fs.LocalFileSystem() df = pd.DataFrame({ 'index': np.arange(N), diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index 16584684f5c..f4ee7529ae8 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -19,7 +19,6 @@ import pyarrow as pa from pyarrow import fs -from pyarrow.filesystem import FileSystem, LocalFileSystem try: import pyarrow.parquet as pq @@ -161,7 +160,6 @@ def test_parquet_writer_context_obj_with_exception(tempdir): @pytest.mark.pandas @pytest.mark.parametrize("filesystem", [ None, - LocalFileSystem._get_instance(), fs.LocalFileSystem(), ]) def test_parquet_writer_write_wrappers(tempdir, filesystem): @@ -250,7 +248,6 @@ def check_chunk_size(data_size, chunk_size, expect_num_chunks): @pytest.mark.pandas @pytest.mark.parametrize("filesystem", [ None, - LocalFileSystem._get_instance(), fs.LocalFileSystem(), ]) def test_parquet_writer_filesystem_local(tempdir, filesystem): @@ -330,46 +327,6 @@ def test_parquet_writer_filesystem_buffer_raises(): ) -@pytest.mark.pandas -def test_parquet_writer_with_caller_provided_filesystem(): - out = pa.BufferOutputStream() - - class CustomFS(FileSystem): - def __init__(self): - self.path = None 
- self.mode = None - - def open(self, path, mode='rb'): - self.path = path - self.mode = mode - return out - - fs = CustomFS() - fname = 'expected_fname.parquet' - df = _test_dataframe(100) - table = pa.Table.from_pandas(df, preserve_index=False) - - with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \ - as writer: - writer.write_table(table) - - assert fs.path == fname - assert fs.mode == 'wb' - assert out.closed - - buf = out.getvalue() - table_read = _read_table(pa.BufferReader(buf)) - df_read = table_read.to_pandas() - tm.assert_frame_equal(df_read, df) - - # Should raise ValueError when filesystem is passed with file-like object - with pytest.raises(ValueError) as err_info: - pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs) - expected_msg = ("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") - assert str(err_info) == expected_msg - - def test_parquet_writer_store_schema(tempdir): table = pa.table({'a': [1, 2, 3]}) diff --git a/python/pyarrow/tests/test_filesystem.py b/python/pyarrow/tests/test_filesystem.py deleted file mode 100644 index 9862c5990d7..00000000000 --- a/python/pyarrow/tests/test_filesystem.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import pyarrow as pa -from pyarrow import filesystem - -import os -import pytest - - -def test_filesystem_deprecated(): - with pytest.warns(FutureWarning): - filesystem.LocalFileSystem() - - with pytest.warns(FutureWarning): - filesystem.LocalFileSystem.get_instance() - - -def test_filesystem_deprecated_toplevel(): - with pytest.warns(FutureWarning): - pa.localfs - - with pytest.warns(FutureWarning): - pa.FileSystem - - with pytest.warns(FutureWarning): - pa.LocalFileSystem - - with pytest.warns(FutureWarning): - pa.HadoopFileSystem - - -def test_resolve_uri(): - uri = "file:///home/user/myfile.parquet" - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == "/home/user/myfile.parquet" - - -def test_resolve_local_path(): - for uri in ['/home/user/myfile.parquet', - 'myfile.parquet', - 'my # file ? 
parquet', - 'C:/Windows/myfile.parquet', - r'C:\\Windows\\myfile.parquet', - ]: - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == uri - - -@pytest.mark.filterwarnings("ignore:pyarrow.filesystem.LocalFileSystem") -def test_resolve_home_directory(): - uri = '~/myfile.parquet' - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == os.path.expanduser(uri) - - local_fs = filesystem.LocalFileSystem() - fs, path = filesystem.resolve_filesystem_and_path(uri, local_fs) - assert path == os.path.expanduser(uri) From 6334f5a34994a5c2f13a5f34f1daaeb16b15bda9 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Mon, 29 Jan 2024 12:25:35 +0100 Subject: [PATCH 04/21] Remove left import and update test_dataset.py --- python/pyarrow/parquet/core.py | 16 +-- python/pyarrow/tests/parquet/test_dataset.py | 132 +++++++++++-------- 2 files changed, 84 insertions(+), 64 deletions(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 98a4b2a1138..30ccca487c9 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -47,7 +47,6 @@ SortingColumn) from pyarrow.fs import (LocalFileSystem, FileSystem, FileType, _resolve_filesystem_and_path, _ensure_filesystem) -from pyarrow import filesystem as legacyfs from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api @@ -993,16 +992,11 @@ def __init__(self, where, schema, filesystem=None, where, filesystem, allow_legacy_filesystem=True ) if filesystem is not None: - if isinstance(filesystem, legacyfs.FileSystem): - # legacy filesystem (eg custom subclass) - # TODO deprecate - sink = self.file_handle = filesystem.open(path, 'wb') - else: - # ARROW-10480: do not auto-detect compression. While - # a filename like foo.parquet.gz is nonconforming, it - # shouldn't implicitly apply compression. - sink = self.file_handle = filesystem.open_output_stream( - path, compression=None) + # ARROW-10480: do not auto-detect compression. While + # a filename like foo.parquet.gz is nonconforming, it + # shouldn't implicitly apply compression. 
+ sink = self.file_handle = filesystem.open_output_stream( + path, compression=None) else: sink = where self._metadata_collector = options.pop('metadata_collector', None) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 0ffcb66689f..8ac08aa35c0 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -31,6 +31,7 @@ from pyarrow.util import guid from pyarrow.vendored.version import Version +from pyarrow.fs import (FileSelector, LocalFileSystem) try: import pyarrow.parquet as pq from pyarrow.tests.parquet.common import ( @@ -62,7 +63,7 @@ def test_filesystem_uri(tempdir): # filesystem object result = pq.read_table( - path, filesystem=fs.LocalFileSystem()) + path, filesystem=LocalFileSystem()) assert result.equals(table) # filesystem URI @@ -73,17 +74,17 @@ def test_filesystem_uri(tempdir): @pytest.mark.pandas def test_read_partitioned_directory(tempdir): - fs = fs.LocalFileSystem() - _partition_test_for_filesystem(fs, tempdir) + local = LocalFileSystem() + _partition_test_for_filesystem(local, tempdir) @pytest.mark.pandas def test_read_partitioned_columns_selection(tempdir): # ARROW-3861 - do not include partition columns in resulting table when # `columns` keyword was passed without those columns - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir - _partition_test_for_filesystem(fs, base_path) + _partition_test_for_filesystem(local, base_path) dataset = pq.ParquetDataset(base_path) result = dataset.read(columns=["values"]) @@ -92,7 +93,7 @@ def test_read_partitioned_columns_selection(tempdir): @pytest.mark.pandas def test_filters_equivalency(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -111,12 +112,12 @@ def test_filters_equivalency(tempdir): 3), }, columns=['integer', 'string', 'boolean']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) # Old filters syntax: # integer == 1 AND string != b AND boolean == True dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('integer', '=', 1), ('string', '!=', 'b'), ('boolean', '==', 'True')], ) @@ -140,7 +141,7 @@ def test_filters_equivalency(tempdir): [('integer', '=', 0), ('boolean', '==', 'False')] ] dataset = pq.ParquetDataset( - base_path, filesystem=fs, filters=filters) + base_path, filesystem=local, filters=filters) table = dataset.read() result_df = table.to_pandas().reset_index(drop=True) @@ -157,13 +158,13 @@ def test_filters_equivalency(tempdir): for filters in [[[('string', '==', b'1\0a')]], [[('string', '==', '1\0a')]]]: dataset = pq.ParquetDataset( - base_path, filesystem=fs, filters=filters) + base_path, filesystem=local, filters=filters) assert dataset.read().num_rows == 0 @pytest.mark.pandas def test_filters_cutoff_exclusive_integer(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -177,10 +178,10 @@ def test_filters_cutoff_exclusive_integer(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[ ('integers', '<', 4), ('integers', '>', 1), @@ -203,7 +204,7 
@@ def test_filters_cutoff_exclusive_integer(tempdir): ) @pytest.mark.pandas def test_filters_cutoff_exclusive_datetime(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir date_keys = [ @@ -223,10 +224,10 @@ def test_filters_cutoff_exclusive_datetime(tempdir): 'dates': np.array(date_keys, dtype='datetime64'), }, columns=['index', 'dates']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[ ('dates', '<', "2018-04-12"), ('dates', '>', "2018-04-10") @@ -263,7 +264,7 @@ def test_filters_inclusive_datetime(tempdir): @pytest.mark.pandas def test_filters_inclusive_integer(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -277,10 +278,10 @@ def test_filters_inclusive_integer(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[ ('integers', '<=', 3), ('integers', '>=', 2), @@ -297,7 +298,7 @@ def test_filters_inclusive_integer(tempdir): @pytest.mark.pandas def test_filters_inclusive_set(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -316,10 +317,10 @@ def test_filters_inclusive_set(tempdir): 3), }, columns=['integer', 'string', 'boolean']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('string', 'in', 'ab')], ) table = dataset.read() @@ -330,7 +331,7 @@ def test_filters_inclusive_set(tempdir): assert 'c' not in result_df['string'].values dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')), ('boolean', 'not in', {'False'})], ) @@ -344,7 +345,7 @@ def test_filters_inclusive_set(tempdir): @pytest.mark.pandas def test_filters_invalid_pred_op(tempdir): - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -358,26 +359,26 @@ def test_filters_invalid_pred_op(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) with pytest.raises(TypeError): pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', 'in', 3), ]) with pytest.raises(ValueError): pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', '=<', 3), ]) # Dataset API returns empty table dataset = pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', 'in', set()), ]) assert dataset.read().num_rows == 0 dataset = pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', '!=', {3})]) with pytest.raises(NotImplementedError): assert dataset.read().num_rows == 0 @@ -387,7 +388,7 @@ def test_filters_invalid_pred_op(tempdir): def test_filters_invalid_column(tempdir): # ARROW-5572 - raise error 
on invalid name in filter specification # works with new dataset - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -399,11 +400,11 @@ def test_filters_invalid_column(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) msg = r"No match for FieldRef.Name\(non_existent_column\)" with pytest.raises(ValueError, match=msg): - pq.ParquetDataset(base_path, filesystem=fs, + pq.ParquetDataset(base_path, filesystem=local, filters=[('non_existent_column', '<', 3), ]).read() @@ -418,7 +419,7 @@ def test_filters_invalid_column(tempdir): def test_filters_read_table(tempdir, filters, read_method): read = getattr(pq, read_method) # test that filters keyword is passed through in read_table - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -433,9 +434,9 @@ def test_filters_read_table(tempdir, filters, read_method): 'nested': np.array([{'a': i, 'b': str(i)} for i in range(N)]) }) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) - kwargs = dict(filesystem=fs, filters=filters) + kwargs = dict(filesystem=local, filters=filters) table = read(base_path, **kwargs) assert table.num_rows == 3 @@ -444,7 +445,7 @@ def test_filters_read_table(tempdir, filters, read_method): @pytest.mark.pandas def test_partition_keys_with_underscores(tempdir): # ARROW-5666 - partition field values with underscores preserve underscores - fs = fs.LocalFileSystem() + local = LocalFileSystem() base_path = tempdir string_keys = ["2019_2", "2019_3"] @@ -458,7 +459,7 @@ def test_partition_keys_with_underscores(tempdir): 'year_week': np.array(string_keys, dtype='object'), }, columns=['index', 'year_week']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset(base_path) result = dataset.read() @@ -561,25 +562,41 @@ def _visit_level(base_dir, level, part_keys): str(base_dir), '{}={}'.format(name, value) ]) - fs.mkdir(level_dir) + try: + fs.create_dir(level_dir) + except AttributeError: + fs.mkdir(level_dir) if level == DEPTH - 1: # Generate example data file_path = pathsep.join([level_dir, guid()]) filtered_df = _filter_partition(df, this_part_keys) part_table = pa.Table.from_pandas(filtered_df) - with fs.open(file_path, 'wb') as f: - _write_table(part_table, f) - assert fs.exists(file_path) + try: + with fs.open_output_stream(file_path) as f: + _write_table(part_table, f) + assert os.path.exists(file_path) + except AttributeError: + with fs.open(file_path, 'wb') as f: + _write_table(part_table, f) + assert fs.exists(file_path) file_success = pathsep.join([level_dir, '_SUCCESS']) - with fs.open(file_success, 'wb') as f: - pass + try: + with fs.open_output_stream(file_success) as f: + pass + except AttributeError: + with fs.open(file_success, 'wb') as f: + pass else: _visit_level(level_dir, level + 1, this_part_keys) file_success = pathsep.join([level_dir, '_SUCCESS']) - with fs.open(file_success, 'wb') as f: - pass + try: + with fs.open_output_stream(file_success) as f: + pass + except AttributeError: + with fs.open(file_success, 'wb') as f: + pass _visit_level(base_dir, 0, []) @@ -988,15 +1005,24 @@ def 
_test_write_to_dataset_no_partitions(base_path, output_table = pa.Table.from_pandas(output_df) if filesystem is None: - filesystem = fs.LocalFileSystem() + filesystem = LocalFileSystem() # Without partitions, append files to root_path n = 5 for i in range(n): pq.write_to_dataset(output_table, base_path, filesystem=filesystem) - output_files = [file for file in filesystem.ls(str(base_path)) - if file.endswith(".parquet")] + + try: + output_files = [file for file in filesystem.ls(str(base_path)) + if file.endswith(".parquet")] + except AttributeError: + selector = FileSelector(str(base_path), allow_not_found=False, + recursive=True) + assert selector.base_dir == str(base_path) + + infos = filesystem.get_file_info(selector) + output_files = [info for info in infos if (info.path.endswith(".parquet"))] assert len(output_files) == n # Deduplicated incoming DataFrame should match @@ -1082,14 +1108,14 @@ def test_write_to_dataset_filesystem(tempdir): table = pa.Table.from_pandas(df) path = str(tempdir) - pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem()) + pq.write_to_dataset(table, path, filesystem=LocalFileSystem()) result = pq.read_table(path) assert result.equals(table) def _make_dataset_for_pickling(tempdir, N=100): path = tempdir / 'data.parquet' - fs = fs.LocalFileSystem() + local = LocalFileSystem() df = pd.DataFrame({ 'index': np.arange(N), @@ -1106,11 +1132,11 @@ def _make_dataset_for_pickling(tempdir, N=100): assert reader.metadata.num_row_groups == num_groups metadata_path = tempdir / '_metadata' - with fs.open(metadata_path, 'wb') as f: + with local.open_output_stream(str(metadata_path)) as f: pq.write_metadata(table.schema, f) dataset = pq.ParquetDataset( - tempdir, filesystem=fs) + tempdir, filesystem=local) return dataset @@ -1228,7 +1254,7 @@ def test_parquet_dataset_new_filesystem(tempdir): # Ensure we can pass new FileSystem object to ParquetDataset table = pa.table({'a': [1, 2, 3]}) pq.write_table(table, tempdir / 'data.parquet') - filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem()) + filesystem = fs.SubTreeFileSystem(str(tempdir), LocalFileSystem()) dataset = pq.ParquetDataset('.', filesystem=filesystem) result = dataset.read() assert result.equals(table) From e9ea6aec388026511313ab8afdbe8fd869d11539 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Mon, 29 Jan 2024 12:38:51 +0100 Subject: [PATCH 05/21] Fix linter --- python/pyarrow/parquet/core.py | 4 +--- python/pyarrow/tests/parquet/test_dataset.py | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 30ccca487c9..9ddf5683c7c 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -1119,9 +1119,7 @@ def _get_pandas_index_columns(keyvalues): def _is_local_file_system(fs): - return isinstance(fs, LocalFileSystem) or isinstance( - fs, legacyfs.LocalFileSystem - ) + return isinstance(fs, LocalFileSystem) _read_docstring_common = """\ diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 8ac08aa35c0..0c07b6e59de 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -29,7 +29,6 @@ from pyarrow import fs from pyarrow.tests import util from pyarrow.util import guid -from pyarrow.vendored.version import Version from pyarrow.fs import (FileSelector, LocalFileSystem) try: From 8d3818aec214172a4f73988b731182c97f4f2df5 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Mon, 29 Jan 2024 
14:57:56 +0100 Subject: [PATCH 06/21] Revert change on _hdfsio and have_libhdfs() --- python/CMakeLists.txt | 1 + python/pyarrow/__init__.py | 1 + python/pyarrow/_hdfsio.pyx | 33 +++++++++++++++++++++++++++++++++ python/setup.py | 1 + 4 files changed, 36 insertions(+) create mode 100644 python/pyarrow/_hdfsio.pyx diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index c3a1c578689..1d6524373a7 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -545,6 +545,7 @@ set(CYTHON_EXTENSIONS _csv _feather _fs + _hdfsio _json _pyarrow_cpp_tests) set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index f3cbcb847f1..0d2591bd69f 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -257,6 +257,7 @@ def print_entry(label, value): create_memory_map, MockOutputStream, input_stream, output_stream) +from pyarrow._hdfsio import have_libhdfs from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, concat_arrays, concat_tables, TableGroupBy, diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx new file mode 100644 index 00000000000..55238e2f624 --- /dev/null +++ b/python/pyarrow/_hdfsio.pyx @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# ---------------------------------------------------------------------- +# HDFS IO implementation + +# cython: language_level = 3 + +from pyarrow.lib cimport check_status +from pyarrow.includes.libarrow_fs cimport * + + +def have_libhdfs(): + try: + with nogil: + check_status(HaveLibHdfs()) + return True + except Exception: + return False diff --git a/python/setup.py b/python/setup.py index 798bd6b05fd..423de708e88 100755 --- a/python/setup.py +++ b/python/setup.py @@ -211,6 +211,7 @@ def initialize_options(self): '_s3fs', '_substrait', '_hdfs', + '_hdfsio', 'gandiva'] def _run_cmake(self): From 05227946ce63041310c8835d0ae8a8033c00544c Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Tue, 30 Jan 2024 10:50:00 +0100 Subject: [PATCH 07/21] Remove _hdfsio and move have_libhdfs to io.pxi --- python/CMakeLists.txt | 1 - python/pyarrow/__init__.py | 5 ++--- python/pyarrow/_hdfsio.pyx | 33 --------------------------------- python/pyarrow/io.pxi | 10 ++++++++++ python/setup.py | 1 - 5 files changed, 12 insertions(+), 38 deletions(-) delete mode 100644 python/pyarrow/_hdfsio.pyx diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 1d6524373a7..c3a1c578689 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -545,7 +545,6 @@ set(CYTHON_EXTENSIONS _csv _feather _fs - _hdfsio _json _pyarrow_cpp_tests) set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 0d2591bd69f..7ede69da665 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -255,9 +255,8 @@ def print_entry(label, value): BufferReader, BufferOutputStream, OSFile, MemoryMappedFile, memory_map, create_memory_map, MockOutputStream, - input_stream, output_stream) - -from pyarrow._hdfsio import have_libhdfs + input_stream, output_stream, + have_libhdfs) from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, concat_arrays, concat_tables, TableGroupBy, diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx deleted file mode 100644 index 55238e2f624..00000000000 --- a/python/pyarrow/_hdfsio.pyx +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# ---------------------------------------------------------------------- -# HDFS IO implementation - -# cython: language_level = 3 - -from pyarrow.lib cimport check_status -from pyarrow.includes.libarrow_fs cimport * - - -def have_libhdfs(): - try: - with nogil: - check_status(HaveLibHdfs()) - return True - except Exception: - return False diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index b57980b3d68..3c36820cf6a 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -30,6 +30,7 @@ import warnings from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation from queue import Queue, Empty as QueueEmpty +from pyarrow.lib cimport check_status, HaveLibHdfs from pyarrow.util import _is_path_like, _stringify_path @@ -46,6 +47,15 @@ cdef extern from "Python.h": bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len) +def have_libhdfs(): + try: + with nogil: + check_status(HaveLibHdfs()) + return True + except Exception: + return False + + def io_thread_count(): """ Return the number of threads to use for I/O operations. diff --git a/python/setup.py b/python/setup.py index 423de708e88..798bd6b05fd 100755 --- a/python/setup.py +++ b/python/setup.py @@ -211,7 +211,6 @@ def initialize_options(self): '_s3fs', '_substrait', '_hdfs', - '_hdfsio', 'gandiva'] def _run_cmake(self): From 3bd32d8dc9235a011c85ffab90c9996b4f8ad250 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Tue, 30 Jan 2024 10:54:18 +0100 Subject: [PATCH 08/21] Remove the use of allow_legacy_filesystem in fs.py --- python/pyarrow/fs.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index bafea5d67ee..6751a397b19 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -98,9 +98,7 @@ def _filesystem_from_str(uri): return filesystem -def _ensure_filesystem( - filesystem, use_mmap=False, allow_legacy_filesystem=False -): +def _ensure_filesystem(filesystem, use_mmap=False): if isinstance(filesystem, FileSystem): return filesystem elif isinstance(filesystem, str): @@ -130,9 +128,7 @@ def _ensure_filesystem( ) -def _resolve_filesystem_and_path( - path, filesystem=None, allow_legacy_filesystem=False, memory_map=False -): +def _resolve_filesystem_and_path(path, filesystem=None, memory_map=False): """ Return filesystem/path from path which could be an URI or a plain filesystem path. 
@@ -146,10 +142,7 @@ def _resolve_filesystem_and_path( return filesystem, path if filesystem is not None: - filesystem = _ensure_filesystem( - filesystem, use_mmap=memory_map, - allow_legacy_filesystem=allow_legacy_filesystem - ) + filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map) if isinstance(filesystem, LocalFileSystem): path = _stringify_path(path) elif not isinstance(path, str): @@ -157,8 +150,7 @@ def _resolve_filesystem_and_path( "Expected string path; path-like objects are only allowed " "with a local filesystem" ) - if not allow_legacy_filesystem: - path = filesystem.normalize_path(path) + path = filesystem.normalize_path(path) return filesystem, path path = _stringify_path(path) From 05489857936bcdd3099eda6b57c9eed319cc6a8f Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Tue, 30 Jan 2024 10:55:49 +0100 Subject: [PATCH 09/21] Remove _is_local_file_system --- python/pyarrow/parquet/core.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 9ddf5683c7c..6dc1c5e1172 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -1118,10 +1118,6 @@ def _get_pandas_index_columns(keyvalues): EXCLUDED_PARQUET_PATHS = {'_SUCCESS'} -def _is_local_file_system(fs): - return isinstance(fs, LocalFileSystem) - - _read_docstring_common = """\ read_dictionary : list, default None List of names or column paths (for nested types) to read directly @@ -1298,7 +1294,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None, if ( hasattr(path_or_paths, "__fspath__") and filesystem is not None and - not _is_local_file_system(filesystem) + not isinstance(filesystem, LocalFileSystem) ): raise TypeError( "Path-like objects with __fspath__ must only be used with " From 64a22c8e66047a940ab4ce3f0850bff05f296861 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Tue, 30 Jan 2024 11:17:22 +0100 Subject: [PATCH 10/21] Update ParquetFile --- python/pyarrow/parquet/core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 6dc1c5e1172..e287b64f0f2 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -988,9 +988,7 @@ def __init__(self, where, schema, filesystem=None, # sure to close it when `self.close` is called. self.file_handle = None - filesystem, path = _resolve_filesystem_and_path( - where, filesystem, allow_legacy_filesystem=True - ) + filesystem, path = _resolve_filesystem_and_path(where, filesystem) if filesystem is not None: # ARROW-10480: do not auto-detect compression. 
While # a filename like foo.parquet.gz is nonconforming, it From 142af852c9caa821061bdf650a241c377319d57d Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Wed, 28 Feb 2024 12:16:43 +0100 Subject: [PATCH 11/21] Updated s3_example_s3fs and s3_example_fs fixtures --- python/pyarrow/tests/parquet/conftest.py | 18 ++++----- python/pyarrow/tests/parquet/test_dataset.py | 37 ++++++------------- python/pyarrow/tests/parquet/test_metadata.py | 6 ++- 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index 461c24af22a..36dfb7c763d 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -53,23 +53,22 @@ def s3_bucket(s3_server): @pytest.fixture def s3_example_s3fs(s3_server, s3_bucket): - s3fs = pytest.importorskip('s3fs') + from pyarrow.fs import S3FileSystem host, port, access_key, secret_key = s3_server['connection'] - fs = s3fs.S3FileSystem( - key=access_key, - secret=secret_key, - client_kwargs={ - 'endpoint_url': 'http://{}:{}'.format(host, port) - } + fs = S3FileSystem( + access_key=access_key, + secret_key=secret_key, + endpoint_override='{}:{}'.format(host, port), + scheme='http' ) test_path = '{}/{}'.format(s3_bucket, guid()) - fs.mkdir(test_path) + fs.create_dir(test_path) yield fs, test_path try: - fs.rm(test_path, recursive=True) + fs.delete_dir_contents(test_path, missing_dir_ok=True) except FileNotFoundError: pass @@ -81,6 +80,7 @@ def s3_example_fs(s3_server): host, port, access_key, secret_key = s3_server['connection'] uri = ( "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" + "&allow_bucket_creation=True" .format(access_key, secret_key, host, port) ) fs, path = FileSystem.from_uri(uri) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 0c07b6e59de..4013011287b 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -561,41 +561,28 @@ def _visit_level(base_dir, level, part_keys): str(base_dir), '{}={}'.format(name, value) ]) - try: - fs.create_dir(level_dir) - except AttributeError: - fs.mkdir(level_dir) + fs.create_dir(level_dir) if level == DEPTH - 1: # Generate example data + from pyarrow.fs import FileType + file_path = pathsep.join([level_dir, guid()]) filtered_df = _filter_partition(df, this_part_keys) part_table = pa.Table.from_pandas(filtered_df) - try: - with fs.open_output_stream(file_path) as f: - _write_table(part_table, f) - assert os.path.exists(file_path) - except AttributeError: - with fs.open(file_path, 'wb') as f: - _write_table(part_table, f) - assert fs.exists(file_path) + with fs.open_output_stream(file_path) as f: + _write_table(part_table, f) + assert fs.get_file_info(file_path).type != FileType.NotFound + assert fs.get_file_info(file_path).type == FileType.File file_success = pathsep.join([level_dir, '_SUCCESS']) - try: - with fs.open_output_stream(file_success) as f: - pass - except AttributeError: - with fs.open(file_success, 'wb') as f: - pass + with fs.open_output_stream(file_success) as f: + pass else: _visit_level(level_dir, level + 1, this_part_keys) file_success = pathsep.join([level_dir, '_SUCCESS']) - try: - with fs.open_output_stream(file_success) as f: - pass - except AttributeError: - with fs.open(file_success, 'wb') as f: - pass + with fs.open_output_stream(file_success) as f: + pass _visit_level(base_dir, 0, []) @@ -953,7 +940,7 @@ def 
_test_write_to_dataset_with_partitions(base_path, metadata_path = os.path.join(str(base_path), '_common_metadata') if filesystem is not None: - with filesystem.open(metadata_path, 'wb') as f: + with filesystem.open_output_stream(metadata_path) as f: pq.write_metadata(output_table.schema, f) else: pq.write_metadata(output_table.schema, metadata_path) diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index bf186bd923c..34a309d36a9 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -770,5 +770,7 @@ def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs): pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs) assert meta1.read_bytes() == meta2.read_bytes() \ - == meta3.read_bytes() == meta4.read_bytes() \ - == s3_fs.open(meta5).read() + == meta3.read_bytes() == meta4.read_bytes() + + with s3_fs.open_input_file(meta5) as f: + assert f.readall() == meta1.read_bytes() From 4ef45c1369861fea304c01273813282a76736aad Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Wed, 28 Feb 2024 17:37:57 +0100 Subject: [PATCH 12/21] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- python/pyarrow/fs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 6751a397b19..a256cc540f7 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -98,7 +98,7 @@ def _filesystem_from_str(uri): return filesystem -def _ensure_filesystem(filesystem, use_mmap=False): +def _ensure_filesystem(filesystem, *, use_mmap=False): if isinstance(filesystem, FileSystem): return filesystem elif isinstance(filesystem, str): @@ -128,7 +128,7 @@ def _ensure_filesystem(filesystem, use_mmap=False): ) -def _resolve_filesystem_and_path(path, filesystem=None, memory_map=False): +def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False): """ Return filesystem/path from path which could be an URI or a plain filesystem path. 
From 34c96adfca35a64865d65120e68649295213f43a Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Wed, 28 Feb 2024 17:41:08 +0100 Subject: [PATCH 13/21] Update python/pyarrow/tests/parquet/test_dataset.py Co-authored-by: Antoine Pitrou --- python/pyarrow/tests/parquet/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 4013011287b..de2edd21c35 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1008,7 +1008,7 @@ def _test_write_to_dataset_no_partitions(base_path, assert selector.base_dir == str(base_path) infos = filesystem.get_file_info(selector) - output_files = [info for info in infos if (info.path.endswith(".parquet"))] + output_files = [info for info in infos if info.path.endswith(".parquet")] assert len(output_files) == n # Deduplicated incoming DataFrame should match From d31a2b224d2d40599f8dae864bffd2d904b15432 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Thu, 29 Feb 2024 13:11:48 +0100 Subject: [PATCH 14/21] Redo change in s3_example_s3fs fixture and add a warpping of s3fs in _generate_partition_directories --- python/pyarrow/tests/parquet/conftest.py | 17 +++++++++-------- python/pyarrow/tests/parquet/test_dataset.py | 8 ++++++-- python/pyarrow/tests/parquet/test_metadata.py | 6 ++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index 36dfb7c763d..767e7f6b69d 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -53,22 +53,23 @@ def s3_bucket(s3_server): @pytest.fixture def s3_example_s3fs(s3_server, s3_bucket): - from pyarrow.fs import S3FileSystem + s3fs = pytest.importorskip('s3fs') host, port, access_key, secret_key = s3_server['connection'] - fs = S3FileSystem( - access_key=access_key, - secret_key=secret_key, - endpoint_override='{}:{}'.format(host, port), - scheme='http' + fs = s3fs.S3FileSystem( + key=access_key, + secret=secret_key, + client_kwargs={ + 'endpoint_url': 'http://{}:{}'.format(host, port) + } ) test_path = '{}/{}'.format(s3_bucket, guid()) - fs.create_dir(test_path) + fs.mkdir(test_path) yield fs, test_path try: - fs.delete_dir_contents(test_path, missing_dir_ok=True) + fs.rm(test_path, recursive=True) except FileNotFoundError: pass diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index de2edd21c35..ff55b262da8 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -30,7 +30,8 @@ from pyarrow.tests import util from pyarrow.util import guid -from pyarrow.fs import (FileSelector, LocalFileSystem) +from pyarrow.fs import (FileSelector, FileSystem, + LocalFileSystem, PyFileSystem, FSSpecHandler) try: import pyarrow.parquet as pq from pyarrow.tests.parquet.common import ( @@ -548,6 +549,9 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df): # partition_spec : list of lists, e.g. 
[['foo', [0, 1, 2], # ['bar', ['a', 'b', 'c']] # part_table : a pyarrow.Table to write to each partition + if (not isinstance(fs, FileSystem)): + fs = PyFileSystem(FSSpecHandler(fs)) + DEPTH = len(partition_spec) pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/")) @@ -940,7 +944,7 @@ def _test_write_to_dataset_with_partitions(base_path, metadata_path = os.path.join(str(base_path), '_common_metadata') if filesystem is not None: - with filesystem.open_output_stream(metadata_path) as f: + with filesystem.open(metadata_path, 'wb') as f: pq.write_metadata(output_table.schema, f) else: pq.write_metadata(output_table.schema, metadata_path) diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 34a309d36a9..bf186bd923c 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -770,7 +770,5 @@ def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs): pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs) assert meta1.read_bytes() == meta2.read_bytes() \ - == meta3.read_bytes() == meta4.read_bytes() - - with s3_fs.open_input_file(meta5) as f: - assert f.readall() == meta1.read_bytes() + == meta3.read_bytes() == meta4.read_bytes() \ + == s3_fs.open(meta5).read() From 0d68482c9f33db6ba53c298c39bc01fbc7426ce2 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Thu, 29 Feb 2024 13:27:04 +0100 Subject: [PATCH 15/21] Rearrange imports in test_dataset.py --- python/pyarrow/tests/parquet/test_dataset.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index ff55b262da8..e780c4cfb33 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -26,12 +26,11 @@ import pyarrow as pa import pyarrow.compute as pc -from pyarrow import fs +from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem, + PyFileSystem, SubTreeFileSystem, FSSpecHandler) from pyarrow.tests import util from pyarrow.util import guid -from pyarrow.fs import (FileSelector, FileSystem, - LocalFileSystem, PyFileSystem, FSSpecHandler) try: import pyarrow.parquet as pq from pyarrow.tests.parquet.common import ( @@ -1244,7 +1243,7 @@ def test_parquet_dataset_new_filesystem(tempdir): # Ensure we can pass new FileSystem object to ParquetDataset table = pa.table({'a': [1, 2, 3]}) pq.write_table(table, tempdir / 'data.parquet') - filesystem = fs.SubTreeFileSystem(str(tempdir), LocalFileSystem()) + filesystem = SubTreeFileSystem(str(tempdir), LocalFileSystem()) dataset = pq.ParquetDataset('.', filesystem=filesystem) result = dataset.read() assert result.equals(table) From 228391ef05dc4a6ea61f689e9fdbc759f4097711 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Thu, 29 Feb 2024 13:34:55 +0100 Subject: [PATCH 16/21] Update _test_write_to_dataset_no_partitions --- python/pyarrow/tests/parquet/test_dataset.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index e780c4cfb33..ee316155b71 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -995,6 +995,8 @@ def _test_write_to_dataset_no_partitions(base_path, if filesystem is None: filesystem = LocalFileSystem() + elif (not isinstance(filesystem, FileSystem)): + filesystem = PyFileSystem(FSSpecHandler(filesystem)) # Without 
partitions, append files to root_path n = 5 @@ -1002,16 +1004,12 @@ def _test_write_to_dataset_no_partitions(base_path, pq.write_to_dataset(output_table, base_path, filesystem=filesystem) - try: - output_files = [file for file in filesystem.ls(str(base_path)) - if file.endswith(".parquet")] - except AttributeError: - selector = FileSelector(str(base_path), allow_not_found=False, - recursive=True) - assert selector.base_dir == str(base_path) + selector = FileSelector(str(base_path), allow_not_found=False, + recursive=True) + assert selector.base_dir == str(base_path) - infos = filesystem.get_file_info(selector) - output_files = [info for info in infos if info.path.endswith(".parquet")] + infos = filesystem.get_file_info(selector) + output_files = [info for info in infos if info.path.endswith(".parquet")] assert len(output_files) == n # Deduplicated incoming DataFrame should match From 6cfeeee748186adfa6bbd6754e7f808391382d1b Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Thu, 29 Feb 2024 13:36:41 +0100 Subject: [PATCH 17/21] Remove filesystems_deprecated from index.rst --- docs/source/python/index.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/source/python/index.rst b/docs/source/python/index.rst index 08939bc760d..7acff940ba2 100644 --- a/docs/source/python/index.rst +++ b/docs/source/python/index.rst @@ -49,7 +49,6 @@ files into Arrow structures. memory ipc filesystems - filesystems_deprecated numpy pandas interchange_protocol From e22bc028ee2307e72ccf4b69de64a5e7f37e2fb8 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Thu, 29 Feb 2024 13:58:43 +0100 Subject: [PATCH 18/21] Add docstrings to have_libhdfs --- python/pyarrow/io.pxi | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 3c36820cf6a..7890bf4b2dd 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -48,6 +48,9 @@ cdef extern from "Python.h": def have_libhdfs(): + """ + Return true if HDFS (HadoopFileSystem) library is set up correctly. + """ try: with nogil: check_status(HaveLibHdfs()) From 721cf147d6bca46e8511ec0e2f675dcc45abaad4 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 29 Feb 2024 14:21:40 +0100 Subject: [PATCH 19/21] Apply suggestions from code review Co-authored-by: Joris Van den Bossche --- python/pyarrow/tests/parquet/test_dataset.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index ee316155b71..30dae05124f 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -548,7 +548,7 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df): # partition_spec : list of lists, e.g. 
[['foo', [0, 1, 2], # ['bar', ['a', 'b', 'c']] # part_table : a pyarrow.Table to write to each partition - if (not isinstance(fs, FileSystem)): + if not isinstance(fs, FileSystem): fs = PyFileSystem(FSSpecHandler(fs)) DEPTH = len(partition_spec) @@ -995,7 +995,7 @@ def _test_write_to_dataset_no_partitions(base_path, if filesystem is None: filesystem = LocalFileSystem() - elif (not isinstance(filesystem, FileSystem)): + elif not isinstance(filesystem, FileSystem): filesystem = PyFileSystem(FSSpecHandler(filesystem)) # Without partitions, append files to root_path @@ -1006,7 +1006,6 @@ def _test_write_to_dataset_no_partitions(base_path, selector = FileSelector(str(base_path), allow_not_found=False, recursive=True) - assert selector.base_dir == str(base_path) infos = filesystem.get_file_info(selector) output_files = [info for info in infos if info.path.endswith(".parquet")] From ca7358cdd6ac3f67e2588ba15561935eab2234cf Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Mon, 4 Mar 2024 11:39:20 +0100 Subject: [PATCH 20/21] Remove allow_bucket_creation=True from s3_example_fs --- python/pyarrow/tests/parquet/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index 767e7f6b69d..461c24af22a 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -81,7 +81,6 @@ def s3_example_fs(s3_server): host, port, access_key, secret_key = s3_server['connection'] uri = ( "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" - "&allow_bucket_creation=True" .format(access_key, secret_key, host, port) ) fs, path = FileSystem.from_uri(uri) From 8e7e38501e4b12c7f7490a80f2c3649bdcd0d9c9 Mon Sep 17 00:00:00 2001 From: AlenkaF Date: Mon, 4 Mar 2024 11:55:47 +0100 Subject: [PATCH 21/21] Fix the use of _resolve_filesystem_and_path --- python/pyarrow/parquet/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index e287b64f0f2..69a1c9d19aa 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -308,7 +308,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None, self._close_source = getattr(source, 'closed', True) filesystem, source = _resolve_filesystem_and_path( - source, filesystem, memory_map) + source, filesystem, memory_map=memory_map) if filesystem is not None: source = filesystem.open_input_file(source) self._close_source = True # We opened it here, ensure we close it.