diff --git a/docs/source/python/filesystems_deprecated.rst b/docs/source/python/filesystems_deprecated.rst deleted file mode 100644 index c51245341b4..00000000000 --- a/docs/source/python/filesystems_deprecated.rst +++ /dev/null @@ -1,88 +0,0 @@ -.. Licensed to the Apache Software Foundation (ASF) under one -.. or more contributor license agreements. See the NOTICE file -.. distributed with this work for additional information -.. regarding copyright ownership. The ASF licenses this file -.. to you under the Apache License, Version 2.0 (the -.. "License"); you may not use this file except in compliance -.. with the License. You may obtain a copy of the License at - -.. http://www.apache.org/licenses/LICENSE-2.0 - -.. Unless required by applicable law or agreed to in writing, -.. software distributed under the License is distributed on an -.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -.. KIND, either express or implied. See the License for the -.. specific language governing permissions and limitations -.. under the License. - -Filesystem Interface (legacy) -============================= - -.. warning:: - This section documents the deprecated filesystem layer. You should - use the :ref:`new filesystem layer ` instead. - -.. _hdfs: - -Hadoop File System (HDFS) -------------------------- - -PyArrow comes with bindings to a C++-based interface to the Hadoop File -System. You connect like so: - -.. code-block:: python - - import pyarrow as pa - fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path) - with fs.open(path, 'rb') as f: - # Do something with f - -By default, ``pyarrow.hdfs.HadoopFileSystem`` uses libhdfs, a JNI-based -interface to the Java Hadoop client. This library is loaded **at runtime** -(rather than at link / library load time, since the library may not be in your -LD_LIBRARY_PATH), and relies on some environment variables. - -* ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has - `lib/native/libhdfs.so`. - -* ``JAVA_HOME``: the location of your Java SDK installation. - -* ``ARROW_LIBHDFS_DIR`` (optional): explicit location of ``libhdfs.so`` if it is - installed somewhere other than ``$HADOOP_HOME/lib/native``. - -* ``CLASSPATH``: must contain the Hadoop jars. You can set these using: - -.. code-block:: shell - - export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` - -If ``CLASSPATH`` is not set, then it will be set automatically if the -``hadoop`` executable is in your system path, or if ``HADOOP_HOME`` is set. - -HDFS API -~~~~~~~~ - -.. currentmodule:: pyarrow - -.. autosummary:: - :toctree: generated/ - - hdfs.connect - HadoopFileSystem.cat - HadoopFileSystem.chmod - HadoopFileSystem.chown - HadoopFileSystem.delete - HadoopFileSystem.df - HadoopFileSystem.disk_usage - HadoopFileSystem.download - HadoopFileSystem.exists - HadoopFileSystem.get_capacity - HadoopFileSystem.get_space_used - HadoopFileSystem.info - HadoopFileSystem.ls - HadoopFileSystem.mkdir - HadoopFileSystem.open - HadoopFileSystem.rename - HadoopFileSystem.rm - HadoopFileSystem.upload - HdfsFile diff --git a/docs/source/python/index.rst b/docs/source/python/index.rst index 08939bc760d..7acff940ba2 100644 --- a/docs/source/python/index.rst +++ b/docs/source/python/index.rst @@ -49,7 +49,6 @@ files into Arrow structures. 
memory ipc filesystems - filesystems_deprecated numpy pandas interchange_protocol diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 1d6524373a7..c3a1c578689 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -545,7 +545,6 @@ set(CYTHON_EXTENSIONS _csv _feather _fs - _hdfsio _json _pyarrow_cpp_tests) set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 2ee97ddb662..7ede69da665 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -255,9 +255,8 @@ def print_entry(label, value): BufferReader, BufferOutputStream, OSFile, MemoryMappedFile, memory_map, create_memory_map, MockOutputStream, - input_stream, output_stream) - -from pyarrow._hdfsio import HdfsFile, have_libhdfs + input_stream, output_stream, + have_libhdfs) from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, concat_arrays, concat_tables, TableGroupBy, @@ -276,54 +275,12 @@ def print_entry(label, value): ArrowTypeError, ArrowSerializationError) -import pyarrow.hdfs as hdfs - from pyarrow.ipc import serialize_pandas, deserialize_pandas import pyarrow.ipc as ipc import pyarrow.types as types -# deprecated top-level access - - -from pyarrow.filesystem import FileSystem as _FileSystem -from pyarrow.filesystem import LocalFileSystem as _LocalFileSystem -from pyarrow.hdfs import HadoopFileSystem as _HadoopFileSystem - - -_localfs = _LocalFileSystem._get_instance() - - -_msg = ( - "pyarrow.{0} is deprecated as of 2.0.0, please use pyarrow.fs.{1} instead." -) - -_serialization_msg = ( - "'pyarrow.{0}' is deprecated and will be removed in a future version. " - "Use pickle or the pyarrow IPC functionality instead." -) - -_deprecated = { - "localfs": (_localfs, "LocalFileSystem"), - "FileSystem": (_FileSystem, "FileSystem"), - "LocalFileSystem": (_LocalFileSystem, "LocalFileSystem"), - "HadoopFileSystem": (_HadoopFileSystem, "HadoopFileSystem"), -} - - -def __getattr__(name): - if name in _deprecated: - obj, new_name = _deprecated[name] - _warnings.warn(_msg.format(name, new_name), - FutureWarning, stacklevel=2) - return obj - - raise AttributeError( - "module 'pyarrow' has no attribute '{0}'".format(name) - ) - - # ---------------------------------------------------------------------- # Deprecations diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx deleted file mode 100644 index cbcc5d28ca9..00000000000 --- a/python/pyarrow/_hdfsio.pyx +++ /dev/null @@ -1,478 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
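The `__init__.py` hunk above removes `pyarrow.hdfs` and the deprecated top-level `localfs`/`FileSystem`/`LocalFileSystem`/`HadoopFileSystem` accessors, with `have_libhdfs` now exposed from `pyarrow.lib`. A minimal sketch of the replacement path through `pyarrow.fs`; the host, port, user and file paths below are placeholders:

```python
# Minimal sketch of the pyarrow.fs replacement for the removed pa.hdfs.connect()
# and pa.localfs shims; host, port, user and paths are placeholders.
import pyarrow as pa
from pyarrow import fs

# Replacement for the removed pyarrow.localfs singleton.
local = fs.LocalFileSystem()
print(local.get_file_info("/tmp"))

# HDFS replacement; libhdfs is still loaded via JNI, so the same environment
# variables (HADOOP_HOME, JAVA_HOME, CLASSPATH, ARROW_LIBHDFS_DIR) apply.
if pa.have_libhdfs():
    hdfs = fs.HadoopFileSystem("namenode", port=8020, user="hadoop")
    with hdfs.open_input_stream("/data/example.bin") as f:
        payload = f.read()
```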
- -# ---------------------------------------------------------------------- -# HDFS IO implementation - -# cython: language_level = 3 - -import re - -from pyarrow.lib cimport check_status, _Weakrefable, NativeFile -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport * -from pyarrow.includes.libarrow_fs cimport * -from pyarrow.lib import frombytes, tobytes, ArrowIOError - - -_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)') - - -def have_libhdfs(): - try: - with nogil: - check_status(HaveLibHdfs()) - return True - except Exception: - return False - - -def strip_hdfs_abspath(path): - m = _HDFS_PATH_RE.match(path) - if m: - return m.group(3) - else: - return path - - -cdef class HadoopFileSystem(_Weakrefable): - cdef: - shared_ptr[CIOHadoopFileSystem] client - - cdef readonly: - bint is_open - object host - object user - object kerb_ticket - int port - dict extra_conf - - def _connect(self, host, port, user, kerb_ticket, extra_conf): - cdef HdfsConnectionConfig conf - - if host is not None: - conf.host = tobytes(host) - self.host = host - - conf.port = port - self.port = port - - if user is not None: - conf.user = tobytes(user) - self.user = user - - if kerb_ticket is not None: - conf.kerb_ticket = tobytes(kerb_ticket) - self.kerb_ticket = kerb_ticket - - with nogil: - check_status(HaveLibHdfs()) - - if extra_conf is not None and isinstance(extra_conf, dict): - conf.extra_conf = {tobytes(k): tobytes(v) - for k, v in extra_conf.items()} - self.extra_conf = extra_conf - - with nogil: - check_status(CIOHadoopFileSystem.Connect(&conf, &self.client)) - self.is_open = True - - @classmethod - def connect(cls, *args, **kwargs): - return cls(*args, **kwargs) - - def __dealloc__(self): - if self.is_open: - self.close() - - def close(self): - """ - Disconnect from the HDFS cluster - """ - self._ensure_client() - with nogil: - check_status(self.client.get().Disconnect()) - self.is_open = False - - cdef _ensure_client(self): - if self.client.get() == NULL: - raise IOError('HDFS client improperly initialized') - elif not self.is_open: - raise IOError('HDFS client is closed') - - def exists(self, path): - """ - Returns True if the path is known to the cluster, False if it does not - (or there is an RPC error) - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - cdef c_bool result - with nogil: - result = self.client.get().Exists(c_path) - return result - - def isdir(self, path): - cdef HdfsPathInfo info - try: - self._path_info(path, &info) - except ArrowIOError: - return False - return info.kind == ObjectType_DIRECTORY - - def isfile(self, path): - cdef HdfsPathInfo info - try: - self._path_info(path, &info) - except ArrowIOError: - return False - return info.kind == ObjectType_FILE - - def get_capacity(self): - """ - Get reported total capacity of file system - - Returns - ------- - capacity : int - """ - cdef int64_t capacity = 0 - with nogil: - check_status(self.client.get().GetCapacity(&capacity)) - return capacity - - def get_space_used(self): - """ - Get space used on file system - - Returns - ------- - space_used : int - """ - cdef int64_t space_used = 0 - with nogil: - check_status(self.client.get().GetUsed(&space_used)) - return space_used - - def df(self): - """ - Return free space on disk, like the UNIX df command - - Returns - ------- - space : int - """ - return self.get_capacity() - self.get_space_used() - - def rename(self, path, new_path): - cdef c_string c_path = tobytes(path) - cdef c_string c_new_path = tobytes(new_path) - with nogil: - 
check_status(self.client.get().Rename(c_path, c_new_path)) - - def info(self, path): - """ - Return detailed HDFS information for path - - Parameters - ---------- - path : string - Path to file or directory - - Returns - ------- - path_info : dict - """ - cdef HdfsPathInfo info - self._path_info(path, &info) - return { - 'path': frombytes(info.name), - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'size': info.size, - 'block_size': info.block_size, - 'last_modified': info.last_modified_time, - 'last_accessed': info.last_access_time, - 'replication': info.replication, - 'permissions': info.permissions, - 'kind': ('directory' if info.kind == ObjectType_DIRECTORY - else 'file') - } - - def stat(self, path): - """ - Return basic file system statistics about path - - Parameters - ---------- - path : string - Path to file or directory - - Returns - ------- - stat : dict - """ - cdef FileStatistics info - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Stat(c_path, &info)) - return { - 'size': info.size, - 'kind': ('directory' if info.kind == ObjectType_DIRECTORY - else 'file') - } - - cdef _path_info(self, path, HdfsPathInfo* info): - cdef c_string c_path = tobytes(path) - - with nogil: - check_status(self.client.get() - .GetPathInfo(c_path, info)) - - def ls(self, path, bint full_info): - cdef: - c_string c_path = tobytes(path) - vector[HdfsPathInfo] listing - list results = [] - int i - - self._ensure_client() - - with nogil: - check_status(self.client.get() - .ListDirectory(c_path, &listing)) - - cdef const HdfsPathInfo* info - for i in range( listing.size()): - info = &listing[i] - - # Try to trim off the hdfs://HOST:PORT piece - name = strip_hdfs_abspath(frombytes(info.name)) - - if full_info: - kind = ('file' if info.kind == ObjectType_FILE - else 'directory') - - results.append({ - 'kind': kind, - 'name': name, - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'last_modified_time': info.last_modified_time, - 'last_access_time': info.last_access_time, - 'size': info.size, - 'replication': info.replication, - 'block_size': info.block_size, - 'permissions': info.permissions - }) - else: - results.append(name) - - return results - - def chmod(self, path, mode): - """ - Change file permissions - - Parameters - ---------- - path : string - absolute path to file or directory - mode : int - POSIX-like bitmask - """ - self._ensure_client() - cdef c_string c_path = tobytes(path) - cdef int c_mode = mode - with nogil: - check_status(self.client.get() - .Chmod(c_path, c_mode)) - - def chown(self, path, owner=None, group=None): - """ - Change file permissions - - Parameters - ---------- - path : string - absolute path to file or directory - owner : string, default None - New owner, None for no change - group : string, default None - New group, None for no change - """ - cdef: - c_string c_path - c_string c_owner - c_string c_group - const char* c_owner_ptr = NULL - const char* c_group_ptr = NULL - - self._ensure_client() - - c_path = tobytes(path) - if owner is not None: - c_owner = tobytes(owner) - c_owner_ptr = c_owner.c_str() - - if group is not None: - c_group = tobytes(group) - c_group_ptr = c_group.c_str() - - with nogil: - check_status(self.client.get() - .Chown(c_path, c_owner_ptr, c_group_ptr)) - - def mkdir(self, path): - """ - Create indicated directory and any necessary parent directories - """ - self._ensure_client() - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - 
.MakeDirectory(c_path)) - - def delete(self, path, bint recursive=False): - """ - Delete the indicated file or directory - - Parameters - ---------- - path : string - recursive : boolean, default False - If True, also delete child paths for directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Delete(c_path, recursive == 1)) - - def open(self, path, mode='rb', buffer_size=None, replication=None, - default_block_size=None): - """ - Open HDFS file for reading or writing - - Parameters - ---------- - mode : string - Must be one of 'rb', 'wb', 'ab' - - Returns - ------- - handle : HdfsFile - """ - self._ensure_client() - - cdef HdfsFile out = HdfsFile() - - if mode not in ('rb', 'wb', 'ab'): - raise Exception("Mode must be 'rb' (read), " - "'wb' (write, new file), or 'ab' (append)") - - cdef c_string c_path = tobytes(path) - cdef c_bool append = False - - # 0 in libhdfs means "use the default" - cdef int32_t c_buffer_size = buffer_size or 0 - cdef int16_t c_replication = replication or 0 - cdef int64_t c_default_block_size = default_block_size or 0 - - cdef shared_ptr[HdfsOutputStream] wr_handle - cdef shared_ptr[HdfsReadableFile] rd_handle - - if mode in ('wb', 'ab'): - if mode == 'ab': - append = True - - with nogil: - check_status( - self.client.get() - .OpenWritable(c_path, append, c_buffer_size, - c_replication, c_default_block_size, - &wr_handle)) - - out.set_output_stream( wr_handle) - out.is_writable = True - else: - with nogil: - check_status(self.client.get() - .OpenReadable(c_path, &rd_handle)) - - out.set_random_access_file( - rd_handle) - out.is_readable = True - - assert not out.closed - - if c_buffer_size == 0: - c_buffer_size = 2 ** 16 - - out.mode = mode - out.buffer_size = c_buffer_size - out.parent = _HdfsFileNanny(self, out) - out.own_file = True - - return out - - def download(self, path, stream, buffer_size=None): - with self.open(path, 'rb') as f: - f.download(stream, buffer_size=buffer_size) - - def upload(self, path, stream, buffer_size=None): - """ - Upload file-like object to HDFS path - """ - with self.open(path, 'wb') as f: - f.upload(stream, buffer_size=buffer_size) - - -# ARROW-404: Helper class to ensure that files are closed before the -# client. During deallocation of the extension class, the attributes are -# decref'd which can cause the client to get closed first if the file has the -# last remaining reference -cdef class _HdfsFileNanny(_Weakrefable): - cdef: - object client - object file_handle_ref - - def __cinit__(self, client, file_handle): - import weakref - self.client = client - self.file_handle_ref = weakref.ref(file_handle) - - def __dealloc__(self): - fh = self.file_handle_ref() - if fh: - fh.close() - # avoid cyclic GC - self.file_handle_ref = None - self.client = None - - -cdef class HdfsFile(NativeFile): - cdef readonly: - int32_t buffer_size - object mode - object parent - - def __dealloc__(self): - self.parent = None diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py deleted file mode 100644 index c1e70a1ee69..00000000000 --- a/python/pyarrow/filesystem.py +++ /dev/null @@ -1,511 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -import os -import posixpath -import sys -import urllib.parse -import warnings - -from os.path import join as pjoin - -import pyarrow as pa -from pyarrow.util import doc, _stringify_path, _is_path_like, _DEPR_MSG - - -_FS_DEPR_MSG = _DEPR_MSG.format( - "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem" -) - - -class FileSystem: - """ - Abstract filesystem interface. - """ - - def cat(self, path): - """ - Return contents of file as a bytes object. - - Parameters - ---------- - path : str - File path to read content from. - - Returns - ------- - contents : bytes - """ - with self.open(path, 'rb') as f: - return f.read() - - def ls(self, path): - """ - Return list of file paths. - - Parameters - ---------- - path : str - Directory to list contents from. - """ - raise NotImplementedError - - def delete(self, path, recursive=False): - """ - Delete the indicated file or directory. - - Parameters - ---------- - path : str - Path to delete. - recursive : bool, default False - If True, also delete child paths for directories. - """ - raise NotImplementedError - - def disk_usage(self, path): - """ - Compute bytes used by all contents under indicated path in file tree. - - Parameters - ---------- - path : str - Can be a file path or directory. - - Returns - ------- - usage : int - """ - path = _stringify_path(path) - path_info = self.stat(path) - if path_info['kind'] == 'file': - return path_info['size'] - - total = 0 - for root, directories, files in self.walk(path): - for child_path in files: - abspath = self._path_join(root, child_path) - total += self.stat(abspath)['size'] - - return total - - def _path_join(self, *args): - return self.pathsep.join(args) - - def stat(self, path): - """ - Information about a filesystem entry. - - Returns - ------- - stat : dict - """ - raise NotImplementedError('FileSystem.stat') - - def rm(self, path, recursive=False): - """ - Alias for FileSystem.delete. - """ - return self.delete(path, recursive=recursive) - - def mv(self, path, new_path): - """ - Alias for FileSystem.rename. - """ - return self.rename(path, new_path) - - def rename(self, path, new_path): - """ - Rename file, like UNIX mv command. - - Parameters - ---------- - path : str - Path to alter. - new_path : str - Path to move to. - """ - raise NotImplementedError('FileSystem.rename') - - def mkdir(self, path, create_parents=True): - """ - Create a directory. - - Parameters - ---------- - path : str - Path to the directory. - create_parents : bool, default True - If the parent directories don't exists create them as well. - """ - raise NotImplementedError - - def exists(self, path): - """ - Return True if path exists. - - Parameters - ---------- - path : str - Path to check. - """ - raise NotImplementedError - - def isdir(self, path): - """ - Return True if path is a directory. - - Parameters - ---------- - path : str - Path to check. 
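The abstract `FileSystem` interface being deleted in this file maps fairly directly onto `pyarrow.fs.FileSystem`. A rough, self-contained mapping under a throwaway temporary directory (the legacy method each call replaces is noted in the comments):

```python
# Rough mapping from the legacy FileSystem methods to pyarrow.fs equivalents;
# a throwaway temp directory keeps the sketch runnable on its own.
import tempfile
from pyarrow import fs

local = fs.LocalFileSystem()
base = tempfile.mkdtemp()

local.create_dir(f"{base}/sub", recursive=True)            # mkdir()
with local.open_output_stream(f"{base}/sub/a.bin") as f:   # open(path, 'wb')
    f.write(b"foobarbaz")

with local.open_input_stream(f"{base}/sub/a.bin") as f:    # cat()
    assert f.read() == b"foobarbaz"

selector = fs.FileSelector(base, recursive=True)           # ls() / walk()
print([info.path for info in local.get_file_info(selector)])

local.move(f"{base}/sub/a.bin", f"{base}/sub/b.bin")       # rename() / mv()
local.delete_file(f"{base}/sub/b.bin")                     # delete() / rm()
local.delete_dir_contents(base)                            # delete(recursive=True)
```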
- """ - raise NotImplementedError - - def isfile(self, path): - """ - Return True if path is a file. - - Parameters - ---------- - path : str - Path to check. - """ - raise NotImplementedError - - def _isfilestore(self): - """ - Returns True if this FileSystem is a unix-style file store with - directories. - """ - raise NotImplementedError - - def read_parquet(self, path, columns=None, metadata=None, schema=None, - use_threads=True, use_pandas_metadata=False): - """ - Read Parquet data from path in file system. Can read from a single file - or a directory of files. - - Parameters - ---------- - path : str - Single file path or directory - columns : List[str], optional - Subset of columns to read. - metadata : pyarrow.parquet.FileMetaData - Known metadata to validate files against. - schema : pyarrow.parquet.Schema - Known schema to validate files against. Alternative to metadata - argument. - use_threads : bool, default True - Perform multi-threaded column reads. - use_pandas_metadata : bool, default False - If True and file has custom pandas schema metadata, ensure that - index columns are also loaded. - - Returns - ------- - table : pyarrow.Table - """ - from pyarrow.parquet import ParquetDataset - dataset = ParquetDataset(path, schema=schema, metadata=metadata, - filesystem=self) - return dataset.read(columns=columns, use_threads=use_threads, - use_pandas_metadata=use_pandas_metadata) - - def open(self, path, mode='rb'): - """ - Open file for reading or writing. - """ - raise NotImplementedError - - @property - def pathsep(self): - return '/' - - -class LocalFileSystem(FileSystem): - - _instance = None - - def __init__(self): - warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2) - super().__init__() - - @classmethod - def _get_instance(cls): - if cls._instance is None: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - cls._instance = LocalFileSystem() - return cls._instance - - @classmethod - def get_instance(cls): - warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2) - return cls._get_instance() - - @doc(FileSystem.ls) - def ls(self, path): - path = _stringify_path(path) - return sorted(pjoin(path, x) for x in os.listdir(path)) - - @doc(FileSystem.mkdir) - def mkdir(self, path, create_parents=True): - path = _stringify_path(path) - if create_parents: - os.makedirs(path) - else: - os.mkdir(path) - - @doc(FileSystem.isdir) - def isdir(self, path): - path = _stringify_path(path) - return os.path.isdir(path) - - @doc(FileSystem.isfile) - def isfile(self, path): - path = _stringify_path(path) - return os.path.isfile(path) - - @doc(FileSystem._isfilestore) - def _isfilestore(self): - return True - - @doc(FileSystem.exists) - def exists(self, path): - path = _stringify_path(path) - return os.path.exists(path) - - @doc(FileSystem.open) - def open(self, path, mode='rb'): - """ - Open file for reading or writing. - """ - path = _stringify_path(path) - return open(path, mode=mode) - - @property - def pathsep(self): - return os.path.sep - - def walk(self, path): - """ - Directory tree generator, see os.walk. - """ - path = _stringify_path(path) - return os.walk(path) - - -class DaskFileSystem(FileSystem): - """ - Wraps s3fs Dask filesystem implementation like s3fs, gcsfs, etc. 
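The `DaskFileSystem`/`S3FSWrapper` shims whose removal starts here are superseded by wrapping any fsspec filesystem in `PyFileSystem(FSSpecHandler(...))`, the same pattern the updated parquet tests use further down. A sketch using fsspec's in-memory filesystem so nothing external is needed:

```python
# Wrap an arbitrary fsspec filesystem for use with pyarrow; the in-memory
# implementation stands in for s3fs/gcsfs here.
import fsspec
from pyarrow.fs import FSSpecHandler, PyFileSystem

fsspec_fs = fsspec.filesystem("memory")
fsspec_fs.makedirs("demo", exist_ok=True)
arrow_fs = PyFileSystem(FSSpecHandler(fsspec_fs))

with arrow_fs.open_output_stream("demo/hello.txt") as out:
    out.write(b"hello")

print(arrow_fs.get_file_info("demo/hello.txt"))
```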
- """ - - def __init__(self, fs): - warnings.warn( - "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated " - "as of pyarrow 3.0.0, and will be removed in a future version.", - FutureWarning, stacklevel=2) - self.fs = fs - - @doc(FileSystem.isdir) - def isdir(self, path): - raise NotImplementedError("Unsupported file system API") - - @doc(FileSystem.isfile) - def isfile(self, path): - raise NotImplementedError("Unsupported file system API") - - @doc(FileSystem._isfilestore) - def _isfilestore(self): - """ - Object Stores like S3 and GCSFS are based on key lookups, not true - file-paths. - """ - return False - - @doc(FileSystem.delete) - def delete(self, path, recursive=False): - path = _stringify_path(path) - return self.fs.rm(path, recursive=recursive) - - @doc(FileSystem.exists) - def exists(self, path): - path = _stringify_path(path) - return self.fs.exists(path) - - @doc(FileSystem.mkdir) - def mkdir(self, path, create_parents=True): - path = _stringify_path(path) - if create_parents: - return self.fs.mkdirs(path) - else: - return self.fs.mkdir(path) - - @doc(FileSystem.open) - def open(self, path, mode='rb'): - """ - Open file for reading or writing. - """ - path = _stringify_path(path) - return self.fs.open(path, mode=mode) - - def ls(self, path, detail=False): - path = _stringify_path(path) - return self.fs.ls(path, detail=detail) - - def walk(self, path): - """ - Directory tree generator, like os.walk. - """ - path = _stringify_path(path) - return self.fs.walk(path) - - -class S3FSWrapper(DaskFileSystem): - - @doc(FileSystem.isdir) - def isdir(self, path): - path = _sanitize_s3(_stringify_path(path)) - try: - contents = self.fs.ls(path) - if len(contents) == 1 and contents[0] == path: - return False - else: - return True - except OSError: - return False - - @doc(FileSystem.isfile) - def isfile(self, path): - path = _sanitize_s3(_stringify_path(path)) - try: - contents = self.fs.ls(path) - return len(contents) == 1 and contents[0] == path - except OSError: - return False - - def walk(self, path, refresh=False): - """ - Directory tree generator, like os.walk. - - Generator version of what is in s3fs, which yields a flattened list of - files. 
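For S3 specifically, the `isdir()`/`isfile()`/`walk()` helpers on the wrapper being deleted have native counterparts on `pyarrow.fs.S3FileSystem`. A sketch only: the bucket, prefix and region are placeholders, and real credentials plus network access are required.

```python
# Listing an S3 prefix with the native filesystem instead of S3FSWrapper.walk();
# bucket/prefix/region are placeholders, credentials come from the environment.
from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-1")
selector = fs.FileSelector("example-bucket/some/prefix", recursive=True)

for info in s3.get_file_info(selector):
    # FileType.Directory / FileType.File replace the isdir()/isfile() checks.
    print(info.path, info.type, info.size)
```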
- """ - path = _sanitize_s3(_stringify_path(path)) - directories = set() - files = set() - - for key in list(self.fs._ls(path, refresh=refresh)): - path = key['Key'] - if key['StorageClass'] == 'DIRECTORY': - directories.add(path) - elif key['StorageClass'] == 'BUCKET': - pass - else: - files.add(path) - - # s3fs creates duplicate 'DIRECTORY' entries - files = sorted([posixpath.split(f)[1] for f in files - if f not in directories]) - directories = sorted([posixpath.split(x)[1] - for x in directories]) - - yield path, directories, files - - for directory in directories: - yield from self.walk(directory, refresh=refresh) - - -def _sanitize_s3(path): - if path.startswith('s3://'): - return path.replace('s3://', '') - else: - return path - - -def _ensure_filesystem(fs): - fs_type = type(fs) - - # If the arrow filesystem was subclassed, assume it supports the full - # interface and return it - if not issubclass(fs_type, FileSystem): - if "fsspec" in sys.modules: - fsspec = sys.modules["fsspec"] - if isinstance(fs, fsspec.AbstractFileSystem): - # for recent fsspec versions that stop inheriting from - # pyarrow.filesystem.FileSystem, still allow fsspec - # filesystems (which should be compatible with our legacy fs) - return fs - - raise OSError('Unrecognized filesystem: {}'.format(fs_type)) - else: - return fs - - -def resolve_filesystem_and_path(where, filesystem=None): - """ - Return filesystem from path which could be an HDFS URI, a local URI, - or a plain filesystem path. - """ - if not _is_path_like(where): - if filesystem is not None: - raise ValueError("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") - return filesystem, where - - if filesystem is not None: - filesystem = _ensure_filesystem(filesystem) - if isinstance(filesystem, LocalFileSystem): - path = _stringify_path(where) - elif not isinstance(where, str): - raise TypeError( - "Expected string path; path-like objects are only allowed " - "with a local filesystem" - ) - else: - path = where - return filesystem, path - - path = _stringify_path(where) - - parsed_uri = urllib.parse.urlparse(path) - if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs': - # Input is hdfs URI such as hdfs://host:port/myfile.parquet - netloc_split = parsed_uri.netloc.split(':') - host = netloc_split[0] - if host == '': - host = 'default' - else: - host = parsed_uri.scheme + "://" + host - port = 0 - if len(netloc_split) == 2 and netloc_split[1].isnumeric(): - port = int(netloc_split[1]) - fs = pa.hdfs._connect(host=host, port=port) - fs_path = parsed_uri.path - elif parsed_uri.scheme == 'file': - # Input is local URI such as file:///home/user/myfile.parquet - fs = LocalFileSystem._get_instance() - fs_path = parsed_uri.path - else: - # Input is local path such as /home/user/myfile.parquet - fs = LocalFileSystem._get_instance() - fs_path = path - - return fs, fs_path diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index ead750ca44e..a256cc540f7 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -98,9 +98,7 @@ def _filesystem_from_str(uri): return filesystem -def _ensure_filesystem( - filesystem, use_mmap=False, allow_legacy_filesystem=False -): +def _ensure_filesystem(filesystem, *, use_mmap=False): if isinstance(filesystem, FileSystem): return filesystem elif isinstance(filesystem, str): @@ -123,15 +121,6 @@ def _ensure_filesystem( return LocalFileSystem(use_mmap=use_mmap) return PyFileSystem(FSSpecHandler(filesystem)) - # map old filesystems to new ones - import 
pyarrow.filesystem as legacyfs - - if isinstance(filesystem, legacyfs.LocalFileSystem): - return LocalFileSystem(use_mmap=use_mmap) - # TODO handle HDFS? - if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem): - return filesystem - raise TypeError( "Unrecognized filesystem: {}. `filesystem` argument must be a " "FileSystem instance or a valid file system URI'".format( @@ -139,9 +128,7 @@ def _ensure_filesystem( ) -def _resolve_filesystem_and_path( - path, filesystem=None, allow_legacy_filesystem=False, memory_map=False -): +def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False): """ Return filesystem/path from path which could be an URI or a plain filesystem path. @@ -155,10 +142,7 @@ def _resolve_filesystem_and_path( return filesystem, path if filesystem is not None: - filesystem = _ensure_filesystem( - filesystem, use_mmap=memory_map, - allow_legacy_filesystem=allow_legacy_filesystem - ) + filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map) if isinstance(filesystem, LocalFileSystem): path = _stringify_path(path) elif not isinstance(path, str): @@ -166,8 +150,7 @@ def _resolve_filesystem_and_path( "Expected string path; path-like objects are only allowed " "with a local filesystem" ) - if not allow_legacy_filesystem: - path = filesystem.normalize_path(path) + path = filesystem.normalize_path(path) return filesystem, path path = _stringify_path(path) diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py deleted file mode 100644 index 2e6c387a8fd..00000000000 --- a/python/pyarrow/hdfs.py +++ /dev/null @@ -1,240 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -import os -import posixpath -import sys -import warnings - -from pyarrow.util import doc, _DEPR_MSG -from pyarrow.filesystem import FileSystem -import pyarrow._hdfsio as _hdfsio - - -class HadoopFileSystem(_hdfsio.HadoopFileSystem, FileSystem): - """ - DEPRECATED: FileSystem interface for HDFS cluster. - - See pyarrow.hdfs.connect for full connection details - - .. deprecated:: 2.0 - ``pyarrow.hdfs.HadoopFileSystem`` is deprecated, - please use ``pyarrow.fs.HadoopFileSystem`` instead. - """ - - def __init__(self, host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs', extra_conf=None): - warnings.warn( - _DEPR_MSG.format( - "hdfs.HadoopFileSystem", "2.0.0", "fs.HadoopFileSystem"), - FutureWarning, stacklevel=2) - if driver == 'libhdfs': - _maybe_set_hadoop_classpath() - - self._connect(host, port, user, kerb_ticket, extra_conf) - - def __reduce__(self): - return (HadoopFileSystem, (self.host, self.port, self.user, - self.kerb_ticket, self.extra_conf)) - - def _isfilestore(self): - """ - Return True if this is a Unix-style file store with directories. 
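With the `allow_legacy_filesystem` escape hatch removed above, path and URI resolution runs entirely through `pyarrow.fs`, and `normalize_path()` is applied unconditionally. A small sketch with placeholder paths:

```python
# URI/path resolution via pyarrow.fs, as _resolve_filesystem_and_path now
# assumes; the example paths are placeholders.
from pyarrow import fs

filesystem, path = fs.FileSystem.from_uri("file:///tmp/example.parquet")
print(type(filesystem).__name__, path)   # LocalFileSystem /tmp/example.parquet

# normalize_path() is called unconditionally in _resolve_filesystem_and_path.
print(filesystem.normalize_path("/tmp//example.parquet"))
```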
- """ - return True - - @doc(FileSystem.isdir) - def isdir(self, path): - return super().isdir(path) - - @doc(FileSystem.isfile) - def isfile(self, path): - return super().isfile(path) - - @doc(FileSystem.delete) - def delete(self, path, recursive=False): - return super().delete(path, recursive) - - def mkdir(self, path, **kwargs): - """ - Create directory in HDFS. - - Parameters - ---------- - path : str - Directory path to create, including any parent directories. - - Notes - ----- - libhdfs does not support create_parents=False, so we ignore this here - """ - return super().mkdir(path) - - @doc(FileSystem.rename) - def rename(self, path, new_path): - return super().rename(path, new_path) - - @doc(FileSystem.exists) - def exists(self, path): - return super().exists(path) - - def ls(self, path, detail=False): - """ - Retrieve directory contents and metadata, if requested. - - Parameters - ---------- - path : str - HDFS path to retrieve contents of. - detail : bool, default False - If False, only return list of paths. - - Returns - ------- - result : list of dicts (detail=True) or strings (detail=False) - """ - return super().ls(path, detail) - - def walk(self, top_path): - """ - Directory tree generator for HDFS, like os.walk. - - Parameters - ---------- - top_path : str - Root directory for tree traversal. - - Returns - ------- - Generator yielding 3-tuple (dirpath, dirnames, filename) - """ - contents = self.ls(top_path, detail=True) - - directories, files = _libhdfs_walk_files_dirs(top_path, contents) - yield top_path, directories, files - for dirname in directories: - yield from self.walk(self._path_join(top_path, dirname)) - - -def _maybe_set_hadoop_classpath(): - import re - - if re.search(r'hadoop-common[^/]+.jar', os.environ.get('CLASSPATH', '')): - return - - if 'HADOOP_HOME' in os.environ: - if sys.platform != 'win32': - classpath = _derive_hadoop_classpath() - else: - hadoop_bin = '{}/bin/hadoop'.format(os.environ['HADOOP_HOME']) - classpath = _hadoop_classpath_glob(hadoop_bin) - else: - classpath = _hadoop_classpath_glob('hadoop') - - os.environ['CLASSPATH'] = classpath.decode('utf-8') - - -def _derive_hadoop_classpath(): - import subprocess - - find_args = ('find', '-L', os.environ['HADOOP_HOME'], '-name', '*.jar') - find = subprocess.Popen(find_args, stdout=subprocess.PIPE) - xargs_echo = subprocess.Popen(('xargs', 'echo'), - stdin=find.stdout, - stdout=subprocess.PIPE) - jars = subprocess.check_output(('tr', "' '", "':'"), - stdin=xargs_echo.stdout) - hadoop_conf = os.environ["HADOOP_CONF_DIR"] \ - if "HADOOP_CONF_DIR" in os.environ \ - else os.environ["HADOOP_HOME"] + "/etc/hadoop" - return (hadoop_conf + ":").encode("utf-8") + jars - - -def _hadoop_classpath_glob(hadoop_bin): - import subprocess - - hadoop_classpath_args = (hadoop_bin, 'classpath', '--glob') - return subprocess.check_output(hadoop_classpath_args) - - -def _libhdfs_walk_files_dirs(top_path, contents): - files = [] - directories = [] - for c in contents: - scrubbed_name = posixpath.split(c['name'])[1] - if c['kind'] == 'file': - files.append(scrubbed_name) - else: - directories.append(scrubbed_name) - - return directories, files - - -def connect(host="default", port=0, user=None, kerb_ticket=None, - extra_conf=None): - """ - DEPRECATED: Connect to an HDFS cluster. - - All parameters are optional and should only be set if the defaults need - to be overridden. - - Authentication should be automatic if the HDFS cluster uses Kerberos. 
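The deleted `hdfs.py` also bootstrapped `CLASSPATH` before connecting. If an environment does not already provide the Hadoop jars on `CLASSPATH`, an equivalent sketch of that helper (assuming the `hadoop` launcher is on `PATH`) is:

```python
# Mirror of the removed _maybe_set_hadoop_classpath()/_hadoop_classpath_glob()
# helpers; assumes the `hadoop` executable is available on PATH.
import os
import re
import subprocess

if not re.search(r"hadoop-common[^/]+\.jar", os.environ.get("CLASSPATH", "")):
    glob_cp = subprocess.check_output(["hadoop", "classpath", "--glob"])
    os.environ["CLASSPATH"] = glob_cp.decode("utf-8").strip()
```

After that, `pyarrow.fs.HadoopFileSystem` accepts the same `host`, `port`, `user`, `kerb_ticket` and `extra_conf` values the removed `connect()` took.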
- However, if a username is specified, then the ticket cache will likely - be required. - - .. deprecated:: 2.0 - ``pyarrow.hdfs.connect`` is deprecated, - please use ``pyarrow.fs.HadoopFileSystem`` instead. - - Parameters - ---------- - host : NameNode. Set to "default" for fs.defaultFS from core-site.xml. - port : NameNode's port. Set to 0 for default or logical (HA) nodes. - user : Username when connecting to HDFS; None implies login user. - kerb_ticket : Path to Kerberos ticket cache. - extra_conf : dict, default None - extra Key/Value pairs for config; Will override any - hdfs-site.xml properties - - Notes - ----- - The first time you call this method, it will take longer than usual due - to JNI spin-up time. - - Returns - ------- - filesystem : HadoopFileSystem - """ - warnings.warn( - _DEPR_MSG.format("hdfs.connect", "2.0.0", "fs.HadoopFileSystem"), - FutureWarning, stacklevel=2 - ) - return _connect( - host=host, port=port, user=user, kerb_ticket=kerb_ticket, - extra_conf=extra_conf - ) - - -def _connect(host="default", port=0, user=None, kerb_ticket=None, - extra_conf=None): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - fs = HadoopFileSystem(host=host, port=port, user=user, - kerb_ticket=kerb_ticket, - extra_conf=extra_conf) - return fs diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index b57980b3d68..7890bf4b2dd 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -30,6 +30,7 @@ import warnings from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation from queue import Queue, Empty as QueueEmpty +from pyarrow.lib cimport check_status, HaveLibHdfs from pyarrow.util import _is_path_like, _stringify_path @@ -46,6 +47,18 @@ cdef extern from "Python.h": bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len) +def have_libhdfs(): + """ + Return true if HDFS (HadoopFileSystem) library is set up correctly. + """ + try: + with nogil: + check_status(HaveLibHdfs()) + return True + except Exception: + return False + + def io_thread_count(): """ Return the number of threads to use for I/O operations. diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 98a4b2a1138..69a1c9d19aa 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -47,7 +47,6 @@ SortingColumn) from pyarrow.fs import (LocalFileSystem, FileSystem, FileType, _resolve_filesystem_and_path, _ensure_filesystem) -from pyarrow import filesystem as legacyfs from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api @@ -309,7 +308,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None, self._close_source = getattr(source, 'closed', True) filesystem, source = _resolve_filesystem_and_path( - source, filesystem, memory_map) + source, filesystem, memory_map=memory_map) if filesystem is not None: source = filesystem.open_input_file(source) self._close_source = True # We opened it here, ensure we close it. @@ -989,20 +988,13 @@ def __init__(self, where, schema, filesystem=None, # sure to close it when `self.close` is called. self.file_handle = None - filesystem, path = _resolve_filesystem_and_path( - where, filesystem, allow_legacy_filesystem=True - ) + filesystem, path = _resolve_filesystem_and_path(where, filesystem) if filesystem is not None: - if isinstance(filesystem, legacyfs.FileSystem): - # legacy filesystem (eg custom subclass) - # TODO deprecate - sink = self.file_handle = filesystem.open(path, 'wb') - else: - # ARROW-10480: do not auto-detect compression. 
While - # a filename like foo.parquet.gz is nonconforming, it - # shouldn't implicitly apply compression. - sink = self.file_handle = filesystem.open_output_stream( - path, compression=None) + # ARROW-10480: do not auto-detect compression. While + # a filename like foo.parquet.gz is nonconforming, it + # shouldn't implicitly apply compression. + sink = self.file_handle = filesystem.open_output_stream( + path, compression=None) else: sink = where self._metadata_collector = options.pop('metadata_collector', None) @@ -1124,12 +1116,6 @@ def _get_pandas_index_columns(keyvalues): EXCLUDED_PARQUET_PATHS = {'_SUCCESS'} -def _is_local_file_system(fs): - return isinstance(fs, LocalFileSystem) or isinstance( - fs, legacyfs.LocalFileSystem - ) - - _read_docstring_common = """\ read_dictionary : list, default None List of names or column paths (for nested types) to read directly @@ -1306,7 +1292,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None, if ( hasattr(path_or_paths, "__fspath__") and filesystem is not None and - not _is_local_file_system(filesystem) + not isinstance(filesystem, LocalFileSystem) ): raise TypeError( "Path-like objects with __fspath__ must only be used with " diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index 3c867776ac0..bc21d709ec2 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -25,7 +25,6 @@ import pyarrow as pa from pyarrow import fs -from pyarrow.filesystem import LocalFileSystem, FileSystem from pyarrow.tests import util from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table, _test_dataframe) @@ -259,11 +258,11 @@ def test_fspath(tempdir): # combined with non-local filesystem raises with pytest.raises(TypeError): - _read_table(fs_protocol_obj, filesystem=FileSystem()) + _read_table(fs_protocol_obj, filesystem=fs.FileSystem()) @pytest.mark.parametrize("filesystem", [ - None, fs.LocalFileSystem(), LocalFileSystem._get_instance() + None, fs.LocalFileSystem() ]) @pytest.mark.parametrize("name", ("data.parquet", "δΎ‹.parquet")) def test_relative_paths(tempdir, filesystem, name): diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index b6e351bdef9..30dae05124f 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -26,11 +26,10 @@ import pyarrow as pa import pyarrow.compute as pc -from pyarrow import fs -from pyarrow.filesystem import LocalFileSystem +from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem, + PyFileSystem, SubTreeFileSystem, FSSpecHandler) from pyarrow.tests import util from pyarrow.util import guid -from pyarrow.vendored.version import Version try: import pyarrow.parquet as pq @@ -63,7 +62,7 @@ def test_filesystem_uri(tempdir): # filesystem object result = pq.read_table( - path, filesystem=fs.LocalFileSystem()) + path, filesystem=LocalFileSystem()) assert result.equals(table) # filesystem URI @@ -74,17 +73,17 @@ def test_filesystem_uri(tempdir): @pytest.mark.pandas def test_read_partitioned_directory(tempdir): - fs = LocalFileSystem._get_instance() - _partition_test_for_filesystem(fs, tempdir) + local = LocalFileSystem() + _partition_test_for_filesystem(local, tempdir) @pytest.mark.pandas def test_read_partitioned_columns_selection(tempdir): # ARROW-3861 - do not include partition columns in resulting table when # `columns` keyword was passed without those columns - fs = 
LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir - _partition_test_for_filesystem(fs, base_path) + _partition_test_for_filesystem(local, base_path) dataset = pq.ParquetDataset(base_path) result = dataset.read(columns=["values"]) @@ -93,7 +92,7 @@ def test_read_partitioned_columns_selection(tempdir): @pytest.mark.pandas def test_filters_equivalency(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -112,12 +111,12 @@ def test_filters_equivalency(tempdir): 3), }, columns=['integer', 'string', 'boolean']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) # Old filters syntax: # integer == 1 AND string != b AND boolean == True dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('integer', '=', 1), ('string', '!=', 'b'), ('boolean', '==', 'True')], ) @@ -141,7 +140,7 @@ def test_filters_equivalency(tempdir): [('integer', '=', 0), ('boolean', '==', 'False')] ] dataset = pq.ParquetDataset( - base_path, filesystem=fs, filters=filters) + base_path, filesystem=local, filters=filters) table = dataset.read() result_df = table.to_pandas().reset_index(drop=True) @@ -158,13 +157,13 @@ def test_filters_equivalency(tempdir): for filters in [[[('string', '==', b'1\0a')]], [[('string', '==', '1\0a')]]]: dataset = pq.ParquetDataset( - base_path, filesystem=fs, filters=filters) + base_path, filesystem=local, filters=filters) assert dataset.read().num_rows == 0 @pytest.mark.pandas def test_filters_cutoff_exclusive_integer(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -178,10 +177,10 @@ def test_filters_cutoff_exclusive_integer(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[ ('integers', '<', 4), ('integers', '>', 1), @@ -204,7 +203,7 @@ def test_filters_cutoff_exclusive_integer(tempdir): ) @pytest.mark.pandas def test_filters_cutoff_exclusive_datetime(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir date_keys = [ @@ -224,10 +223,10 @@ def test_filters_cutoff_exclusive_datetime(tempdir): 'dates': np.array(date_keys, dtype='datetime64'), }, columns=['index', 'dates']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[ ('dates', '<', "2018-04-12"), ('dates', '>', "2018-04-10") @@ -264,7 +263,7 @@ def test_filters_inclusive_datetime(tempdir): @pytest.mark.pandas def test_filters_inclusive_integer(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -278,10 +277,10 @@ def test_filters_inclusive_integer(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, 
filesystem=local, filters=[ ('integers', '<=', 3), ('integers', '>=', 2), @@ -298,7 +297,7 @@ def test_filters_inclusive_integer(tempdir): @pytest.mark.pandas def test_filters_inclusive_set(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1] @@ -317,10 +316,10 @@ def test_filters_inclusive_set(tempdir): 3), }, columns=['integer', 'string', 'boolean']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('string', 'in', 'ab')], ) table = dataset.read() @@ -331,7 +330,7 @@ def test_filters_inclusive_set(tempdir): assert 'c' not in result_df['string'].values dataset = pq.ParquetDataset( - base_path, filesystem=fs, + base_path, filesystem=local, filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')), ('boolean', 'not in', {'False'})], ) @@ -345,7 +344,7 @@ def test_filters_inclusive_set(tempdir): @pytest.mark.pandas def test_filters_invalid_pred_op(tempdir): - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -359,26 +358,26 @@ def test_filters_invalid_pred_op(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) with pytest.raises(TypeError): pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', 'in', 3), ]) with pytest.raises(ValueError): pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', '=<', 3), ]) # Dataset API returns empty table dataset = pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', 'in', set()), ]) assert dataset.read().num_rows == 0 dataset = pq.ParquetDataset(base_path, - filesystem=fs, + filesystem=local, filters=[('integers', '!=', {3})]) with pytest.raises(NotImplementedError): assert dataset.read().num_rows == 0 @@ -388,7 +387,7 @@ def test_filters_invalid_pred_op(tempdir): def test_filters_invalid_column(tempdir): # ARROW-5572 - raise error on invalid name in filter specification # works with new dataset - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -400,11 +399,11 @@ def test_filters_invalid_column(tempdir): 'integers': np.array(integer_keys, dtype='i4'), }, columns=['index', 'integers']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) msg = r"No match for FieldRef.Name\(non_existent_column\)" with pytest.raises(ValueError, match=msg): - pq.ParquetDataset(base_path, filesystem=fs, + pq.ParquetDataset(base_path, filesystem=local, filters=[('non_existent_column', '<', 3), ]).read() @@ -419,7 +418,7 @@ def test_filters_invalid_column(tempdir): def test_filters_read_table(tempdir, filters, read_method): read = getattr(pq, read_method) # test that filters keyword is passed through in read_table - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir integer_keys = [0, 1, 2, 3, 4] @@ -434,9 +433,9 @@ def test_filters_read_table(tempdir, filters, read_method): 'nested': np.array([{'a': i, 'b': str(i)} for i in range(N)]) }) - _generate_partition_directories(fs, base_path, 
partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) - kwargs = dict(filesystem=fs, filters=filters) + kwargs = dict(filesystem=local, filters=filters) table = read(base_path, **kwargs) assert table.num_rows == 3 @@ -445,7 +444,7 @@ def test_filters_read_table(tempdir, filters, read_method): @pytest.mark.pandas def test_partition_keys_with_underscores(tempdir): # ARROW-5666 - partition field values with underscores preserve underscores - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() base_path = tempdir string_keys = ["2019_2", "2019_3"] @@ -459,7 +458,7 @@ def test_partition_keys_with_underscores(tempdir): 'year_week': np.array(string_keys, dtype='object'), }, columns=['index', 'year_week']) - _generate_partition_directories(fs, base_path, partition_spec, df) + _generate_partition_directories(local, base_path, partition_spec, df) dataset = pq.ParquetDataset(base_path) result = dataset.read() @@ -499,26 +498,6 @@ def test_read_single_file_list(tempdir): assert result.equals(table) -@pytest.mark.pandas -@pytest.mark.s3 -def test_read_partitioned_directory_s3fs_wrapper(s3_example_s3fs): - import s3fs - - from pyarrow.filesystem import S3FSWrapper - - if Version(s3fs.__version__) >= Version("0.5"): - pytest.skip("S3FSWrapper no longer working for s3fs 0.5+") - - fs, path = s3_example_s3fs - with pytest.warns(FutureWarning): - wrapper = S3FSWrapper(fs) - _partition_test_for_filesystem(wrapper, path) - - # Check that we can auto-wrap - dataset = pq.ParquetDataset(path, filesystem=fs) - dataset.read() - - @pytest.mark.pandas @pytest.mark.s3 def test_read_partitioned_directory_s3fs(s3_example_s3fs): @@ -569,6 +548,9 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df): # partition_spec : list of lists, e.g. 
[['foo', [0, 1, 2], # ['bar', ['a', 'b', 'c']] # part_table : a pyarrow.Table to write to each partition + if not isinstance(fs, FileSystem): + fs = PyFileSystem(FSSpecHandler(fs)) + DEPTH = len(partition_spec) pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/")) @@ -582,24 +564,27 @@ def _visit_level(base_dir, level, part_keys): str(base_dir), '{}={}'.format(name, value) ]) - fs.mkdir(level_dir) + fs.create_dir(level_dir) if level == DEPTH - 1: # Generate example data + from pyarrow.fs import FileType + file_path = pathsep.join([level_dir, guid()]) filtered_df = _filter_partition(df, this_part_keys) part_table = pa.Table.from_pandas(filtered_df) - with fs.open(file_path, 'wb') as f: + with fs.open_output_stream(file_path) as f: _write_table(part_table, f) - assert fs.exists(file_path) + assert fs.get_file_info(file_path).type != FileType.NotFound + assert fs.get_file_info(file_path).type == FileType.File file_success = pathsep.join([level_dir, '_SUCCESS']) - with fs.open(file_success, 'wb') as f: + with fs.open_output_stream(file_success) as f: pass else: _visit_level(level_dir, level + 1, this_part_keys) file_success = pathsep.join([level_dir, '_SUCCESS']) - with fs.open(file_success, 'wb') as f: + with fs.open_output_stream(file_success) as f: pass _visit_level(base_dir, 0, []) @@ -1009,15 +994,21 @@ def _test_write_to_dataset_no_partitions(base_path, output_table = pa.Table.from_pandas(output_df) if filesystem is None: - filesystem = LocalFileSystem._get_instance() + filesystem = LocalFileSystem() + elif not isinstance(filesystem, FileSystem): + filesystem = PyFileSystem(FSSpecHandler(filesystem)) # Without partitions, append files to root_path n = 5 for i in range(n): pq.write_to_dataset(output_table, base_path, filesystem=filesystem) - output_files = [file for file in filesystem.ls(str(base_path)) - if file.endswith(".parquet")] + + selector = FileSelector(str(base_path), allow_not_found=False, + recursive=True) + + infos = filesystem.get_file_info(selector) + output_files = [info for info in infos if info.path.endswith(".parquet")] assert len(output_files) == n # Deduplicated incoming DataFrame should match @@ -1103,14 +1094,14 @@ def test_write_to_dataset_filesystem(tempdir): table = pa.Table.from_pandas(df) path = str(tempdir) - pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem()) + pq.write_to_dataset(table, path, filesystem=LocalFileSystem()) result = pq.read_table(path) assert result.equals(table) def _make_dataset_for_pickling(tempdir, N=100): path = tempdir / 'data.parquet' - fs = LocalFileSystem._get_instance() + local = LocalFileSystem() df = pd.DataFrame({ 'index': np.arange(N), @@ -1127,11 +1118,11 @@ def _make_dataset_for_pickling(tempdir, N=100): assert reader.metadata.num_row_groups == num_groups metadata_path = tempdir / '_metadata' - with fs.open(metadata_path, 'wb') as f: + with local.open_output_stream(str(metadata_path)) as f: pq.write_metadata(table.schema, f) dataset = pq.ParquetDataset( - tempdir, filesystem=fs) + tempdir, filesystem=local) return dataset @@ -1249,7 +1240,7 @@ def test_parquet_dataset_new_filesystem(tempdir): # Ensure we can pass new FileSystem object to ParquetDataset table = pa.table({'a': [1, 2, 3]}) pq.write_table(table, tempdir / 'data.parquet') - filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem()) + filesystem = SubTreeFileSystem(str(tempdir), LocalFileSystem()) dataset = pq.ParquetDataset('.', filesystem=filesystem) result = dataset.read() assert result.equals(table) diff --git 
a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index 16584684f5c..f4ee7529ae8 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -19,7 +19,6 @@ import pyarrow as pa from pyarrow import fs -from pyarrow.filesystem import FileSystem, LocalFileSystem try: import pyarrow.parquet as pq @@ -161,7 +160,6 @@ def test_parquet_writer_context_obj_with_exception(tempdir): @pytest.mark.pandas @pytest.mark.parametrize("filesystem", [ None, - LocalFileSystem._get_instance(), fs.LocalFileSystem(), ]) def test_parquet_writer_write_wrappers(tempdir, filesystem): @@ -250,7 +248,6 @@ def check_chunk_size(data_size, chunk_size, expect_num_chunks): @pytest.mark.pandas @pytest.mark.parametrize("filesystem", [ None, - LocalFileSystem._get_instance(), fs.LocalFileSystem(), ]) def test_parquet_writer_filesystem_local(tempdir, filesystem): @@ -330,46 +327,6 @@ def test_parquet_writer_filesystem_buffer_raises(): ) -@pytest.mark.pandas -def test_parquet_writer_with_caller_provided_filesystem(): - out = pa.BufferOutputStream() - - class CustomFS(FileSystem): - def __init__(self): - self.path = None - self.mode = None - - def open(self, path, mode='rb'): - self.path = path - self.mode = mode - return out - - fs = CustomFS() - fname = 'expected_fname.parquet' - df = _test_dataframe(100) - table = pa.Table.from_pandas(df, preserve_index=False) - - with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \ - as writer: - writer.write_table(table) - - assert fs.path == fname - assert fs.mode == 'wb' - assert out.closed - - buf = out.getvalue() - table_read = _read_table(pa.BufferReader(buf)) - df_read = table_read.to_pandas() - tm.assert_frame_equal(df_read, df) - - # Should raise ValueError when filesystem is passed with file-like object - with pytest.raises(ValueError) as err_info: - pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs) - expected_msg = ("filesystem passed but where is file-like, so" - " there is nothing to open with filesystem.") - assert str(err_info) == expected_msg - - def test_parquet_writer_store_schema(tempdir): table = pa.table({'a': [1, 2, 3]}) diff --git a/python/pyarrow/tests/test_filesystem.py b/python/pyarrow/tests/test_filesystem.py deleted file mode 100644 index 9862c5990d7..00000000000 --- a/python/pyarrow/tests/test_filesystem.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
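With the legacy filesystems dropped from the parametrization above, the remaining writer tests exercise `ParquetWriter` with an explicit `pyarrow.fs` filesystem. A self-contained sketch of that pattern (the temporary path is created on the fly):

```python
# ParquetWriter with an explicit new-style filesystem, mirroring the kept
# test parametrization; nothing outside a temp directory is assumed.
import tempfile
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

table = pa.table({"a": [1, 2, 3]})
path = f"{tempfile.mkdtemp()}/data.parquet"

with pq.ParquetWriter(path, table.schema,
                      filesystem=fs.LocalFileSystem()) as writer:
    writer.write_table(table)

assert pq.read_table(path).equals(table)
```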
- -import pyarrow as pa -from pyarrow import filesystem - -import os -import pytest - - -def test_filesystem_deprecated(): - with pytest.warns(FutureWarning): - filesystem.LocalFileSystem() - - with pytest.warns(FutureWarning): - filesystem.LocalFileSystem.get_instance() - - -def test_filesystem_deprecated_toplevel(): - with pytest.warns(FutureWarning): - pa.localfs - - with pytest.warns(FutureWarning): - pa.FileSystem - - with pytest.warns(FutureWarning): - pa.LocalFileSystem - - with pytest.warns(FutureWarning): - pa.HadoopFileSystem - - -def test_resolve_uri(): - uri = "file:///home/user/myfile.parquet" - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == "/home/user/myfile.parquet" - - -def test_resolve_local_path(): - for uri in ['/home/user/myfile.parquet', - 'myfile.parquet', - 'my # file ? parquet', - 'C:/Windows/myfile.parquet', - r'C:\\Windows\\myfile.parquet', - ]: - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == uri - - -@pytest.mark.filterwarnings("ignore:pyarrow.filesystem.LocalFileSystem") -def test_resolve_home_directory(): - uri = '~/myfile.parquet' - fs, path = filesystem.resolve_filesystem_and_path(uri) - assert isinstance(fs, filesystem.LocalFileSystem) - assert path == os.path.expanduser(uri) - - local_fs = filesystem.LocalFileSystem() - fs, path = filesystem.resolve_filesystem_and_path(uri, local_fs) - assert path == os.path.expanduser(uri) diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py deleted file mode 100644 index 5b94c200f35..00000000000 --- a/python/pyarrow/tests/test_hdfs.py +++ /dev/null @@ -1,451 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -import os -import random -from io import BytesIO -from os.path import join as pjoin - -import numpy as np -import pytest - -import pyarrow as pa -from pyarrow.tests import util -from pyarrow.tests.parquet.common import _test_dataframe -from pyarrow.tests.parquet.test_dataset import ( - _test_write_to_dataset_with_partitions, - _test_write_to_dataset_no_partitions -) -from pyarrow.util import guid - -try: - from pandas.testing import assert_frame_equal -except ImportError: - pass - - -# ---------------------------------------------------------------------- -# HDFS tests - - -def check_libhdfs_present(): - if not pa.have_libhdfs(): - message = 'No libhdfs available on system' - if os.environ.get('PYARROW_HDFS_TEST_LIBHDFS_REQUIRE'): - pytest.fail(message) - else: - pytest.skip(message) - - -def hdfs_test_client(): - host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default') - user = os.environ.get('ARROW_HDFS_TEST_USER', None) - try: - port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0)) - except ValueError: - raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' - 'an integer') - - with pytest.warns(FutureWarning): - return pa.hdfs.connect(host, port, user) - - -@pytest.mark.hdfs -class HdfsTestCases: - - def _make_test_file(self, hdfs, test_name, test_path, test_data): - base_path = pjoin(self.tmp_path, test_name) - hdfs.mkdir(base_path) - - full_path = pjoin(base_path, test_path) - - with hdfs.open(full_path, 'wb') as f: - f.write(test_data) - - return full_path - - @classmethod - def setup_class(cls): - cls.check_driver() - cls.hdfs = hdfs_test_client() - cls.tmp_path = '/tmp/pyarrow-test-{}'.format(random.randint(0, 1000)) - cls.hdfs.mkdir(cls.tmp_path) - - @classmethod - def teardown_class(cls): - cls.hdfs.delete(cls.tmp_path, recursive=True) - cls.hdfs.close() - - def test_pickle(self, pickle_module): - s = pickle_module.dumps(self.hdfs) - h2 = pickle_module.loads(s) - assert h2.is_open - assert h2.host == self.hdfs.host - assert h2.port == self.hdfs.port - assert h2.user == self.hdfs.user - assert h2.kerb_ticket == self.hdfs.kerb_ticket - # smoketest unpickled client works - h2.ls(self.tmp_path) - - def test_cat(self): - path = pjoin(self.tmp_path, 'cat-test') - - data = b'foobarbaz' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - contents = self.hdfs.cat(path) - assert contents == data - - def test_capacity_space(self): - capacity = self.hdfs.get_capacity() - space_used = self.hdfs.get_space_used() - disk_free = self.hdfs.df() - - assert capacity > 0 - assert capacity > space_used - assert disk_free == (capacity - space_used) - - def test_close(self): - client = hdfs_test_client() - assert client.is_open - client.close() - assert not client.is_open - - with pytest.raises(Exception): - client.ls('/') - - def test_mkdir(self): - path = pjoin(self.tmp_path, 'test-dir/test-dir') - parent_path = pjoin(self.tmp_path, 'test-dir') - - self.hdfs.mkdir(path) - assert self.hdfs.exists(path) - - self.hdfs.delete(parent_path, recursive=True) - assert not self.hdfs.exists(path) - - def test_mv_rename(self): - path = pjoin(self.tmp_path, 'mv-test') - new_path = pjoin(self.tmp_path, 'mv-new-test') - - data = b'foobarbaz' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - assert self.hdfs.exists(path) - self.hdfs.mv(path, new_path) - assert not self.hdfs.exists(path) - assert self.hdfs.exists(new_path) - - assert self.hdfs.cat(new_path) == data - - self.hdfs.rename(new_path, path) - assert self.hdfs.cat(path) == data - - def test_info(self): - path = 
pjoin(self.tmp_path, 'info-base') - file_path = pjoin(path, 'ex') - self.hdfs.mkdir(path) - - data = b'foobarbaz' - with self.hdfs.open(file_path, 'wb') as f: - f.write(data) - - path_info = self.hdfs.info(path) - file_path_info = self.hdfs.info(file_path) - - assert path_info['kind'] == 'directory' - - assert file_path_info['kind'] == 'file' - assert file_path_info['size'] == len(data) - - def test_exists_isdir_isfile(self): - dir_path = pjoin(self.tmp_path, 'info-base') - file_path = pjoin(dir_path, 'ex') - missing_path = pjoin(dir_path, 'this-path-is-missing') - - self.hdfs.mkdir(dir_path) - with self.hdfs.open(file_path, 'wb') as f: - f.write(b'foobarbaz') - - assert self.hdfs.exists(dir_path) - assert self.hdfs.exists(file_path) - assert not self.hdfs.exists(missing_path) - - assert self.hdfs.isdir(dir_path) - assert not self.hdfs.isdir(file_path) - assert not self.hdfs.isdir(missing_path) - - assert not self.hdfs.isfile(dir_path) - assert self.hdfs.isfile(file_path) - assert not self.hdfs.isfile(missing_path) - - def test_disk_usage(self): - path = pjoin(self.tmp_path, 'disk-usage-base') - p1 = pjoin(path, 'p1') - p2 = pjoin(path, 'p2') - - subdir = pjoin(path, 'subdir') - p3 = pjoin(subdir, 'p3') - - if self.hdfs.exists(path): - self.hdfs.delete(path, True) - - self.hdfs.mkdir(path) - self.hdfs.mkdir(subdir) - - data = b'foobarbaz' - - for file_path in [p1, p2, p3]: - with self.hdfs.open(file_path, 'wb') as f: - f.write(data) - - assert self.hdfs.disk_usage(path) == len(data) * 3 - - def test_ls(self): - base_path = pjoin(self.tmp_path, 'ls-test') - self.hdfs.mkdir(base_path) - - dir_path = pjoin(base_path, 'a-dir') - f1_path = pjoin(base_path, 'a-file-1') - - self.hdfs.mkdir(dir_path) - - f = self.hdfs.open(f1_path, 'wb') - f.write(b'a' * 10) - - contents = sorted(self.hdfs.ls(base_path, False)) - assert contents == [dir_path, f1_path] - - def test_chmod_chown(self): - path = pjoin(self.tmp_path, 'chmod-test') - with self.hdfs.open(path, 'wb') as f: - f.write(b'a' * 10) - - def test_download_upload(self): - base_path = pjoin(self.tmp_path, 'upload-test') - - data = b'foobarbaz' - buf = BytesIO(data) - buf.seek(0) - - self.hdfs.upload(base_path, buf) - - out_buf = BytesIO() - self.hdfs.download(base_path, out_buf) - out_buf.seek(0) - assert out_buf.getvalue() == data - - def test_file_context_manager(self): - path = pjoin(self.tmp_path, 'ctx-manager') - - data = b'foo' - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - with self.hdfs.open(path, 'rb') as f: - assert f.size() == 3 - result = f.read(10) - assert result == data - - def test_open_not_exist(self): - path = pjoin(self.tmp_path, 'does-not-exist-123') - - with pytest.raises(FileNotFoundError): - self.hdfs.open(path) - - def test_open_write_error(self): - with pytest.raises((FileExistsError, IsADirectoryError)): - self.hdfs.open('/', 'wb') - - def test_read_whole_file(self): - path = pjoin(self.tmp_path, 'read-whole-file') - - data = b'foo' * 1000 - with self.hdfs.open(path, 'wb') as f: - f.write(data) - - with self.hdfs.open(path, 'rb') as f: - result = f.read() - - assert result == data - - def _write_multiple_hdfs_pq_files(self, tmpdir): - import pyarrow.parquet as pq - nfiles = 10 - size = 5 - test_data = [] - for i in range(nfiles): - df = _test_dataframe(size, seed=i) - - df['index'] = np.arange(i * size, (i + 1) * size) - - # Hack so that we don't have a dtype cast in v1 files - df['uint32'] = df['uint32'].astype(np.int64) - - path = pjoin(tmpdir, '{}.parquet'.format(i)) - - table = pa.Table.from_pandas(df, 
preserve_index=False) - with self.hdfs.open(path, 'wb') as f: - pq.write_table(table, f) - - test_data.append(table) - - expected = pa.concat_tables(test_data) - return expected - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_multiple_parquet_files(self): - - tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid()) - - self.hdfs.mkdir(tmpdir) - - expected = self._write_multiple_hdfs_pq_files(tmpdir) - result = self.hdfs.read_parquet(tmpdir) - - assert_frame_equal( - result.to_pandas().sort_values(by='index').reset_index(drop=True), - expected.to_pandas() - ) - - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_multiple_parquet_files_with_uri(self): - import pyarrow.parquet as pq - - tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid()) - - self.hdfs.mkdir(tmpdir) - - expected = self._write_multiple_hdfs_pq_files(tmpdir) - path = _get_hdfs_uri(tmpdir) - result = pq.read_table(path) - - assert_frame_equal( - result.to_pandas().sort_values(by='index').reset_index(drop=True), - expected.to_pandas() - ) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.pandas - @pytest.mark.parquet - def test_read_write_parquet_files_with_uri(self): - import pyarrow.parquet as pq - - tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid()) - self.hdfs.mkdir(tmpdir) - path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet')) - - size = 5 - df = _test_dataframe(size, seed=0) - # Hack so that we don't have a dtype cast in v1 files - df['uint32'] = df['uint32'].astype(np.int64) - table = pa.Table.from_pandas(df, preserve_index=False) - - pq.write_table(table, path, filesystem=self.hdfs) - - result = pq.read_table(path, filesystem=self.hdfs).to_pandas() - - assert_frame_equal(result, df) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.parquet - @pytest.mark.pandas - def test_write_to_dataset_with_partitions(self): - tmpdir = pjoin(self.tmp_path, 'write-partitions-' + guid()) - self.hdfs.mkdir(tmpdir) - _test_write_to_dataset_with_partitions( - tmpdir, filesystem=self.hdfs) - - @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset " - "due to legacy path being removed in PyArrow 15.0.0.", - raises=TypeError) - @pytest.mark.parquet - @pytest.mark.pandas - def test_write_to_dataset_no_partitions(self): - tmpdir = pjoin(self.tmp_path, 'write-no_partitions-' + guid()) - self.hdfs.mkdir(tmpdir) - _test_write_to_dataset_no_partitions( - tmpdir, filesystem=self.hdfs) - - -class TestLibHdfs(HdfsTestCases): - - @classmethod - def check_driver(cls): - check_libhdfs_present() - - def test_orphaned_file(self): - hdfs = hdfs_test_client() - file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname', - b'foobarbaz') - - f = hdfs.open(file_path) - hdfs = None - f = None # noqa - - -def _get_hdfs_uri(path): - host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') - try: - port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0)) - except ValueError: - raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' - 'an integer') - uri = "hdfs://{}:{}{}".format(host, port, path) - - return uri - - -@pytest.mark.hdfs -@pytest.mark.pandas -@pytest.mark.parquet 
-@pytest.mark.fastparquet -def test_fastparquet_read_with_hdfs(): - check_libhdfs_present() - try: - import snappy # noqa - except ImportError: - pytest.skip('fastparquet test requires snappy') - - import pyarrow.parquet as pq - fastparquet = pytest.importorskip('fastparquet') - - fs = hdfs_test_client() - - df = util.make_dataframe() - - table = pa.Table.from_pandas(df) - - path = '/tmp/testing.parquet' - with fs.open(path, 'wb') as f: - pq.write_table(table, f) - - parquet_file = fastparquet.ParquetFile(path, open_with=fs.open) - - result = parquet_file.to_pandas() - assert_frame_equal(result, df) diff --git a/python/setup.py b/python/setup.py index 423de708e88..798bd6b05fd 100755 --- a/python/setup.py +++ b/python/setup.py @@ -211,7 +211,6 @@ def initialize_options(self): '_s3fs', '_substrait', '_hdfs', - '_hdfsio', 'gandiva'] def _run_cmake(self):
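For code that still relies on the removed ``pyarrow.filesystem`` / ``pyarrow.hdfs`` layer, the pattern exercised by the updated tests above reduces to a handful of substitutions in ``pyarrow.fs``: ``LocalFileSystem()`` instead of ``LocalFileSystem._get_instance()``, ``create_dir`` / ``open_output_stream`` / ``get_file_info`` instead of ``mkdir`` / ``open`` / ``exists`` / ``ls``, ``PyFileSystem(FSSpecHandler(...))`` for fsspec filesystems, and ``HadoopFileSystem`` instead of ``pa.hdfs.connect``. The sketch below is illustrative only, not part of this patch: the directory and variable names are made up, it runs against a local temporary directory, and the HDFS and fsspec lines are left commented out because they assume a reachable cluster and an installed fsspec filesystem; check the exact keyword arguments against the pyarrow version in use.

.. code-block:: python

    # Minimal migration sketch from the removed legacy filesystem layer to pyarrow.fs.
    import tempfile

    import pyarrow as pa
    import pyarrow.parquet as pq
    from pyarrow.fs import (FileSelector, FileType, LocalFileSystem,
                            SubTreeFileSystem)

    base = tempfile.mkdtemp()   # hypothetical scratch directory for this sketch
    fs = LocalFileSystem()      # was: pyarrow.filesystem.LocalFileSystem._get_instance()

    # was: fs.mkdir(...) and fs.open(path, 'wb')
    fs.create_dir(base + "/part=a")
    with fs.open_output_stream(base + "/part=a/data.parquet") as f:
        pq.write_table(pa.table({"x": [1, 2, 3]}), f)

    # was: fs.exists(path)
    info = fs.get_file_info(base + "/part=a/data.parquet")
    assert info.type == FileType.File

    # was: fs.ls(base) plus endswith() filtering; FileSelector lists recursively
    selector = FileSelector(base, recursive=True)
    parquet_files = [i.path for i in fs.get_file_info(selector)
                     if i.path.endswith(".parquet")]
    assert len(parquet_files) == 1

    # Scoping reads to a directory, as in test_parquet_dataset_new_filesystem
    subtree = SubTreeFileSystem(base, fs)
    print(pq.ParquetDataset("part=a", filesystem=subtree).read())

    # was: pa.hdfs.connect(host, port, user) -- needs a reachable cluster, so only sketched:
    # from pyarrow.fs import HadoopFileSystem
    # hdfs = HadoopFileSystem(host, port, user=user, kerb_ticket=ticket_cache_path)

    # Arbitrary fsspec filesystems are wrapped explicitly, as the updated tests do:
    # from pyarrow.fs import PyFileSystem, FSSpecHandler
    # fs = PyFileSystem(FSSpecHandler(fsspec_filesystem))

The same wrapping step (``PyFileSystem(FSSpecHandler(fs))``) is what the updated ``_test_write_to_dataset_with_partitions`` and ``_test_write_to_dataset_no_partitions`` helpers perform when they receive a non-``pyarrow.fs.FileSystem`` object, so callers passing fsspec filesystems to those helpers keep working after this change.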