diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h new file mode 100644 index 00000000000..fd8f566a78e --- /dev/null +++ b/cpp/src/arrow/filesystem/api.h @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_FILESYSTEM_API_H +#define ARROW_FILESYSTEM_API_H + +#include "arrow/filesystem/filesystem.h" // IWYU pragma: export +#include "arrow/filesystem/localfs.h" // IWYU pragma: export +#include "arrow/filesystem/mockfs.h" // IWYU pragma: export +#include "arrow/filesystem/s3fs.h" // IWYU pragma: export + +#endif // ARROW_FILESYSTEM_API_H diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index a00fa1269ed..0be5adad92d 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -57,7 +57,7 @@ using TimePoint = std::chrono::time_point; /// \brief EXPERIMENTAL: FileSystem entry type -enum class ARROW_EXPORT FileType { +enum class ARROW_EXPORT FileType : int8_t { // Target does not exist NonExistent, // Target exists but its type is unknown (could be a special file such diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt index a89eba91c06..85a4aadf3f4 100644 --- a/cpp/src/arrow/python/CMakeLists.txt +++ b/cpp/src/arrow/python/CMakeLists.txt @@ -32,6 +32,7 @@ set(ARROW_PYTHON_SRCS benchmark.cc common.cc config.cc + datetime.cc decimal.cc deserialize.cc extension_type.cc diff --git a/cpp/src/arrow/python/api.h b/cpp/src/arrow/python/api.h index 6bbfcbfa34b..c9eb6e87fb5 100644 --- a/cpp/src/arrow/python/api.h +++ b/cpp/src/arrow/python/api.h @@ -20,6 +20,7 @@ #include "arrow/python/arrow_to_pandas.h" #include "arrow/python/common.h" +#include "arrow/python/datetime.h" #include "arrow/python/deserialize.h" #include "arrow/python/helpers.h" #include "arrow/python/inference.h" diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc index beabcebf9c3..539d82eae8e 100644 --- a/cpp/src/arrow/python/arrow_to_pandas.cc +++ b/cpp/src/arrow/python/arrow_to_pandas.cc @@ -46,13 +46,13 @@ #include "arrow/python/common.h" #include "arrow/python/config.h" +#include "arrow/python/datetime.h" #include "arrow/python/decimal.h" 
#include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/numpy_internal.h" #include "arrow/python/python_to_arrow.h" #include "arrow/python/type_traits.h" -#include "arrow/python/util/datetime.h" namespace arrow { @@ -654,7 +654,7 @@ static Status ConvertDates(const PandasOptions& options, const ChunkedArray& dat PyDateTime_IMPORT; } auto WrapValue = [](typename Type::c_type value, PyObject** out) { - RETURN_NOT_OK(PyDate_from_int(value, Type::UNIT, out)); + RETURN_NOT_OK(internal::PyDate_from_int(value, Type::UNIT, out)); RETURN_IF_PYERROR(); return Status::OK(); }; @@ -672,7 +672,7 @@ static Status ConvertTimes(const PandasOptions& options, const ChunkedArray& dat const TimeUnit::type unit = checked_cast(*data.type()).unit(); auto WrapValue = [unit](typename Type::c_type value, PyObject** out) { - RETURN_NOT_OK(PyTime_from_int(value, unit, out)); + RETURN_NOT_OK(internal::PyTime_from_int(value, unit, out)); RETURN_IF_PYERROR(); return Status::OK(); }; diff --git a/cpp/src/arrow/python/util/datetime.h b/cpp/src/arrow/python/datetime.cc similarity index 76% rename from cpp/src/arrow/python/util/datetime.h rename to cpp/src/arrow/python/datetime.cc index 6194b7f7115..592fea24393 100644 --- a/cpp/src/arrow/python/util/datetime.h +++ b/cpp/src/arrow/python/datetime.cc @@ -15,18 +15,19 @@ // specific language governing permissions and limitations // under the License. 
-#ifndef PYARROW_UTIL_DATETIME_H -#define PYARROW_UTIL_DATETIME_H - #include +#include +#include -#include +#include "arrow/python/datetime.h" #include "arrow/python/platform.h" #include "arrow/status.h" +#include "arrow/type.h" #include "arrow/util/logging.h" namespace arrow { namespace py { +namespace internal { // The following code is adapted from // https://github.com/numpy/numpy/blob/master/numpy/core/src/multiarray/datetime.c @@ -155,25 +156,6 @@ static void get_date_from_days(int64_t days, int64_t* date_year, int64_t* date_m return; } -static inline int64_t PyTime_to_us(PyObject* pytime) { - return (static_cast(PyDateTime_TIME_GET_HOUR(pytime)) * 3600000000LL + - static_cast(PyDateTime_TIME_GET_MINUTE(pytime)) * 60000000LL + - static_cast(PyDateTime_TIME_GET_SECOND(pytime)) * 1000000LL + - PyDateTime_TIME_GET_MICROSECOND(pytime)); -} - -static inline int64_t PyTime_to_s(PyObject* pytime) { - return PyTime_to_us(pytime) / 1000000; -} - -static inline int64_t PyTime_to_ms(PyObject* pytime) { - return PyTime_to_us(pytime) / 1000; -} - -static inline int64_t PyTime_to_ns(PyObject* pytime) { - return PyTime_to_us(pytime) * 1000; -} - // Splitting time quantities, for example splitting total seconds into // minutes and remaining seconds. 
After we run // int64_t remaining = split_time(total, quotient, &next) @@ -233,8 +215,7 @@ static inline Status PyDate_convert_int(int64_t val, const DateUnit unit, int64_ return Status::OK(); } -static inline Status PyTime_from_int(int64_t val, const TimeUnit::type unit, - PyObject** out) { +Status PyTime_from_int(int64_t val, const TimeUnit::type unit, PyObject** out) { int64_t hour = 0, minute = 0, second = 0, microsecond = 0; RETURN_NOT_OK(PyTime_convert_int(val, unit, &hour, &minute, &second, µsecond)); *out = PyTime_FromTime(static_cast(hour), static_cast(minute), @@ -242,7 +223,7 @@ static inline Status PyTime_from_int(int64_t val, const TimeUnit::type unit, return Status::OK(); } -static inline Status PyDate_from_int(int64_t val, const DateUnit unit, PyObject** out) { +Status PyDate_from_int(int64_t val, const DateUnit unit, PyObject** out) { int64_t year = 0, month = 0, day = 0; RETURN_NOT_OK(PyDate_convert_int(val, unit, &year, &month, &day)); *out = PyDate_FromDate(static_cast(year), static_cast(month), @@ -250,8 +231,7 @@ static inline Status PyDate_from_int(int64_t val, const DateUnit unit, PyObject* return Status::OK(); } -static inline Status PyDateTime_from_int(int64_t val, const TimeUnit::type unit, - PyObject** out) { +Status PyDateTime_from_int(int64_t val, const TimeUnit::type unit, PyObject** out) { int64_t hour = 0, minute = 0, second = 0, microsecond = 0; RETURN_NOT_OK(PyTime_convert_int(val, unit, &hour, &minute, &second, µsecond)); int64_t total_days = 0; @@ -265,47 +245,19 @@ static inline Status PyDateTime_from_int(int64_t val, const TimeUnit::type unit, return Status::OK(); } -static inline int64_t PyDate_to_days(PyDateTime_Date* pydate) { - return get_days_from_date(PyDateTime_GET_YEAR(pydate), PyDateTime_GET_MONTH(pydate), - PyDateTime_GET_DAY(pydate)); -} - -static inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { - int64_t total_seconds = 0; - int64_t days = - get_days_from_date(PyDateTime_GET_YEAR(pydate), 
PyDateTime_GET_MONTH(pydate), - PyDateTime_GET_DAY(pydate)); - total_seconds += days * 24 * 3600; - return total_seconds * 1000; -} - -static inline int64_t PyDateTime_to_s(PyDateTime_DateTime* pydatetime) { - int64_t total_seconds = 0; - total_seconds += PyDateTime_DATE_GET_SECOND(pydatetime); - total_seconds += PyDateTime_DATE_GET_MINUTE(pydatetime) * 60; - total_seconds += PyDateTime_DATE_GET_HOUR(pydatetime) * 3600; - - return total_seconds + - (PyDate_to_ms(reinterpret_cast(pydatetime)) / 1000LL); -} - -static inline int64_t PyDateTime_to_ms(PyDateTime_DateTime* pydatetime) { - int64_t date_ms = PyDateTime_to_s(pydatetime) * 1000; - int ms = PyDateTime_DATE_GET_MICROSECOND(pydatetime) / 1000; - return date_ms + ms; -} - -static inline int64_t PyDateTime_to_us(PyDateTime_DateTime* pydatetime) { - int64_t ms = PyDateTime_to_s(pydatetime) * 1000; - int us = PyDateTime_DATE_GET_MICROSECOND(pydatetime); - return ms * 1000 + us; +Status PyDateTime_from_TimePoint(TimePoint val, PyObject** out) { + PyDateTime_IMPORT; + auto nanos = val.time_since_epoch(); + auto micros = std::chrono::duration_cast(nanos); + RETURN_NOT_OK(PyDateTime_from_int(micros.count(), TimeUnit::MICRO, out)); + return Status::OK(); } -static inline int64_t PyDateTime_to_ns(PyDateTime_DateTime* pydatetime) { - return PyDateTime_to_us(pydatetime) * 1000; +int64_t PyDate_to_days(PyDateTime_Date* pydate) { + return get_days_from_date(PyDateTime_GET_YEAR(pydate), PyDateTime_GET_MONTH(pydate), + PyDateTime_GET_DAY(pydate)); } +} // namespace internal } // namespace py } // namespace arrow - -#endif // PYARROW_UTIL_DATETIME_H diff --git a/cpp/src/arrow/python/datetime.h b/cpp/src/arrow/python/datetime.h new file mode 100644 index 00000000000..7af4f7a5ad7 --- /dev/null +++ b/cpp/src/arrow/python/datetime.h @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PYARROW_UTIL_DATETIME_H +#define PYARROW_UTIL_DATETIME_H + +#include +#include + +#include "arrow/python/platform.h" +#include "arrow/python/visibility.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace py { +namespace internal { + +ARROW_PYTHON_EXPORT +inline int64_t PyTime_to_us(PyObject* pytime) { + return (static_cast(PyDateTime_TIME_GET_HOUR(pytime)) * 3600000000LL + + static_cast(PyDateTime_TIME_GET_MINUTE(pytime)) * 60000000LL + + static_cast(PyDateTime_TIME_GET_SECOND(pytime)) * 1000000LL + + PyDateTime_TIME_GET_MICROSECOND(pytime)); +} + +ARROW_PYTHON_EXPORT +inline int64_t PyTime_to_s(PyObject* pytime) { return PyTime_to_us(pytime) / 1000000; } + +ARROW_PYTHON_EXPORT +inline int64_t PyTime_to_ms(PyObject* pytime) { return PyTime_to_us(pytime) / 1000; } + +ARROW_PYTHON_EXPORT +inline int64_t PyTime_to_ns(PyObject* pytime) { return PyTime_to_us(pytime) * 1000; } + +ARROW_PYTHON_EXPORT +Status PyTime_from_int(int64_t val, const TimeUnit::type unit, PyObject** out); + +ARROW_PYTHON_EXPORT +Status PyDate_from_int(int64_t val, const DateUnit unit, PyObject** out); + +ARROW_PYTHON_EXPORT +Status PyDateTime_from_int(int64_t val, const TimeUnit::type unit, PyObject** out); + +using TimePoint = + 
std::chrono::time_point; + +ARROW_PYTHON_EXPORT +Status PyDateTime_from_TimePoint(TimePoint val, PyObject** out); + +ARROW_PYTHON_EXPORT +int64_t PyDate_to_days(PyDateTime_Date* pydate); + +ARROW_PYTHON_EXPORT +inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { + return PyDate_to_days(pydate) * 24 * 3600 * 1000; +} + +ARROW_PYTHON_EXPORT +inline int64_t PyDateTime_to_s(PyDateTime_DateTime* pydatetime) { + int64_t total_seconds = 0; + total_seconds += PyDateTime_DATE_GET_SECOND(pydatetime); + total_seconds += PyDateTime_DATE_GET_MINUTE(pydatetime) * 60; + total_seconds += PyDateTime_DATE_GET_HOUR(pydatetime) * 3600; + + return total_seconds + + (PyDate_to_ms(reinterpret_cast(pydatetime)) / 1000LL); +} + +ARROW_PYTHON_EXPORT +inline int64_t PyDateTime_to_ms(PyDateTime_DateTime* pydatetime) { + int64_t date_ms = PyDateTime_to_s(pydatetime) * 1000; + int ms = PyDateTime_DATE_GET_MICROSECOND(pydatetime) / 1000; + return date_ms + ms; +} + +ARROW_PYTHON_EXPORT +inline int64_t PyDateTime_to_us(PyDateTime_DateTime* pydatetime) { + int64_t ms = PyDateTime_to_s(pydatetime) * 1000; + int us = PyDateTime_DATE_GET_MICROSECOND(pydatetime); + return ms * 1000 + us; +} + +ARROW_PYTHON_EXPORT +inline int64_t PyDateTime_to_ns(PyDateTime_DateTime* pydatetime) { + return PyDateTime_to_us(pydatetime) * 1000; +} + +} // namespace internal +} // namespace py +} // namespace arrow + +#endif // PYARROW_UTIL_DATETIME_H diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index 45f7d61890e..fb07442a8d9 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -39,11 +39,11 @@ #include "arrow/util/parsing.h" #include "arrow/python/common.h" +#include "arrow/python/datetime.h" #include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/pyarrow.h" #include "arrow/python/serialize.h" -#include "arrow/python/util/datetime.h" namespace arrow { @@ -154,7 +154,7 @@ Status GetValue(PyObject* 
context, const Array& arr, int64_t index, int8_t type, *result = PyFloat_FromDouble(checked_cast(arr).Value(index)); return Status::OK(); case PythonType::DATE64: { - RETURN_NOT_OK(PyDateTime_from_int( + RETURN_NOT_OK(internal::PyDateTime_from_int( checked_cast(arr).Value(index), TimeUnit::MICRO, result)); RETURN_IF_PYERROR(); return Status::OK(); diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc index eddba0ec4e0..e8af80ee28d 100644 --- a/cpp/src/arrow/python/inference.cc +++ b/cpp/src/arrow/python/inference.cc @@ -31,11 +31,11 @@ #include "arrow/util/decimal.h" #include "arrow/util/logging.h" +#include "arrow/python/datetime.h" #include "arrow/python/decimal.h" #include "arrow/python/helpers.h" #include "arrow/python/iterators.h" #include "arrow/python/numpy_convert.h" -#include "arrow/python/util/datetime.h" namespace arrow { namespace py { diff --git a/cpp/src/arrow/python/numpy_to_arrow.cc b/cpp/src/arrow/python/numpy_to_arrow.cc index caee934c913..1cb68f4732d 100644 --- a/cpp/src/arrow/python/numpy_to_arrow.cc +++ b/cpp/src/arrow/python/numpy_to_arrow.cc @@ -48,13 +48,13 @@ #include "arrow/python/common.h" #include "arrow/python/config.h" +#include "arrow/python/datetime.h" #include "arrow/python/helpers.h" #include "arrow/python/iterators.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/numpy_internal.h" #include "arrow/python/python_to_arrow.h" #include "arrow/python/type_traits.h" -#include "arrow/python/util/datetime.h" namespace arrow { diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 4b4e759e523..2b3c9ffebf5 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -38,13 +38,13 @@ #include "arrow/util/decimal.h" #include "arrow/util/logging.h" +#include "arrow/python/datetime.h" #include "arrow/python/decimal.h" #include "arrow/python/helpers.h" #include "arrow/python/inference.h" #include 
"arrow/python/iterators.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/type_traits.h" -#include "arrow/python/util/datetime.h" namespace arrow { @@ -301,7 +301,7 @@ class Date32Converter int32_t t; if (PyDate_Check(obj)) { auto pydate = reinterpret_cast(obj); - t = static_cast(PyDate_to_days(pydate)); + t = static_cast(internal::PyDate_to_days(pydate)); } else { RETURN_NOT_OK(internal::CIntFromPython(obj, &t, "Integer too large for date32")); } @@ -317,12 +317,12 @@ class Date64Converter int64_t t; if (PyDateTime_Check(obj)) { auto pydate = reinterpret_cast(obj); - t = PyDateTime_to_ms(pydate); + t = internal::PyDateTime_to_ms(pydate); // Truncate any intraday milliseconds t -= t % 86400000LL; } else if (PyDate_Check(obj)) { auto pydate = reinterpret_cast(obj); - t = PyDate_to_ms(pydate); + t = internal::PyDate_to_ms(pydate); } else { RETURN_NOT_OK(internal::CIntFromPython(obj, &t, "Integer too large for date64")); } @@ -343,10 +343,10 @@ class Time32Converter // datetime.time stores microsecond resolution switch (unit_) { case TimeUnit::SECOND: - t = static_cast(PyTime_to_s(obj)); + t = static_cast(internal::PyTime_to_s(obj)); break; case TimeUnit::MILLI: - t = static_cast(PyTime_to_ms(obj)); + t = static_cast(internal::PyTime_to_ms(obj)); break; default: return Status::UnknownError("Invalid time unit"); @@ -373,10 +373,10 @@ class Time64Converter // datetime.time stores microsecond resolution switch (unit_) { case TimeUnit::MICRO: - t = PyTime_to_us(obj); + t = internal::PyTime_to_us(obj); break; case TimeUnit::NANO: - t = PyTime_to_ns(obj); + t = internal::PyTime_to_ns(obj); break; default: return Status::UnknownError("Invalid time unit"); @@ -404,16 +404,16 @@ class TimestampConverter switch (unit_) { case TimeUnit::SECOND: - t = PyDateTime_to_s(pydatetime); + t = internal::PyDateTime_to_s(pydatetime); break; case TimeUnit::MILLI: - t = PyDateTime_to_ms(pydatetime); + t = internal::PyDateTime_to_ms(pydatetime); break; case TimeUnit::MICRO: - 
t = PyDateTime_to_us(pydatetime); + t = internal::PyDateTime_to_us(pydatetime); break; case TimeUnit::NANO: - t = PyDateTime_to_ns(pydatetime); + t = internal::PyDateTime_to_ns(pydatetime); break; default: return Status::UnknownError("Invalid time unit"); diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index aaed0736f63..a344e90ed72 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -41,12 +41,12 @@ #include "arrow/util/logging.h" #include "arrow/python/common.h" +#include "arrow/python/datetime.h" #include "arrow/python/helpers.h" #include "arrow/python/iterators.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/platform.h" #include "arrow/python/pyarrow.h" -#include "arrow/python/util/datetime.h" constexpr int32_t kMaxRecursionDepth = 100; @@ -466,7 +466,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, RETURN_NOT_OK(builder->AppendNone()); } else if (PyDateTime_Check(elem)) { PyDateTime_DateTime* datetime = reinterpret_cast(elem); - RETURN_NOT_OK(builder->AppendDate64(PyDateTime_to_us(datetime))); + RETURN_NOT_OK(builder->AppendDate64(internal::PyDateTime_to_us(datetime))); } else if (is_buffer(elem)) { RETURN_NOT_OK(builder->AppendBuffer(static_cast(blobs_out->buffers.size()))); std::shared_ptr buffer; diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 87d26d319dd..1529d77585c 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -379,7 +379,7 @@ if(UNIX) set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) endif() -set(CYTHON_EXTENSIONS lib _csv _json) +set(CYTHON_EXTENSIONS lib _fs _csv _json) set(LINK_LIBS arrow_shared arrow_python_shared) diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx new file mode 100644 index 00000000000..769ca8d7391 --- /dev/null +++ b/python/pyarrow/_fs.pyx @@ -0,0 +1,526 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +import six + +from pyarrow.compat import frombytes, tobytes +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport PyDateTime_from_TimePoint +from pyarrow.includes.libarrow_fs cimport * +from pyarrow.util import _stringify_path +from pyarrow.lib import _detect_compression +from pyarrow.lib cimport ( + check_status, + NativeFile, + BufferedOutputStream, + BufferedInputStream, + CompressedInputStream, + CompressedOutputStream +) + + +cdef inline c_string _path_as_bytes(path) except *: + # handle only abstract paths, not bound to any filesystem like pathlib is, + # so we only accept plain strings + if not isinstance(path, six.string_types): + raise TypeError('Path must be a string') + # tobytes always uses utf-8, which is more or less ok, at least on Windows + # since the C++ side then decodes from utf-8. On Unix, os.fsencode may be + # better. 
+ return tobytes(path) + + +cpdef enum FileType: + NonExistent = CFileType_NonExistent + Unknown = CFileType_Unknown + File = CFileType_File + Directory = CFileType_Directory + + +cdef class FileStats: + """FileSystem entry stats""" + + cdef CFileStats stats + + def __init__(self): + raise TypeError('dont initialize me') + + @staticmethod + cdef FileStats wrap(CFileStats stats): + cdef FileStats self = FileStats.__new__(FileStats) + self.stats = stats + return self + + def __repr__(self): + def getvalue(attr): + try: + return getattr(self, attr) + except ValueError: + return '' + attrs = ['type', 'path', 'base_name', 'size', 'extension', 'mtime'] + attrs = '\n'.join('{}: {}'.format(a, getvalue(a)) for a in attrs) + return '{}\n{}'.format(object.__repr__(self), attrs) + + @property + def type(self): + """Type of the file + + The returned enum variants have the following meanings: + - FileType.NonExistent: target does not exist + - FileType.Unknown: target exists but its type is unknown (could be a + special file such as a Unix socket or character + device, or Windows NUL / CON / ...) + - FileType.File: target is a regular file + - FileType.Directory: target is a regular directory + + Returns + ------- + type : FileType + """ + return FileType( self.stats.type()) + + @property + def path(self): + """The full file path in the filesystem.""" + return frombytes(self.stats.path()) + + @property + def base_name(self): + """The file base name + + Component after the last directory separator. + """ + return frombytes(self.stats.base_name()) + + @property + def size(self): + """The size in bytes, if available + + Only regular files are guaranteed to have a size. 
+ """ + if self.stats.type() != CFileType_File: + raise ValueError( + 'Only regular files are guaranteed to have a size' + ) + return self.stats.size() + + @property + def extension(self): + """The file extension""" + return frombytes(self.stats.extension()) + + @property + def mtime(self): + """The time of last modification, if available. + + Returns + ------- + mtime : datetime.datetime + """ + cdef PyObject *out + check_status(PyDateTime_from_TimePoint(self.stats.mtime(), &out)) + return PyObject_to_object(out) + + +cdef class Selector: + """File and directory selector. + + It contains a set of options that describes how to search for files and + directories. + + Parameters + ---------- + base_dir : str or pathlib.Path + The directory in which to select files. Relative paths also work, use + '.' for the current directory and '..' for the parent. + allow_non_existent : bool, default False + The behavior if `base_dir` doesn't exist in the filesystem. + If false, an error is returned. + If true, an empty selection is returned. + recursive : bool, default False + Whether to recurse into subdirectories. 
+ """ + cdef CSelector selector + + def __init__(self, base_dir, bint allow_non_existent=False, + bint recursive=False): + self.base_dir = base_dir + self.recursive = recursive + self.allow_non_existent = allow_non_existent + + @property + def base_dir(self): + return frombytes(self.selector.base_dir) + + @base_dir.setter + def base_dir(self, base_dir): + self.selector.base_dir = _path_as_bytes(base_dir) + + @property + def allow_non_existent(self): + return self.selector.allow_non_existent + + @allow_non_existent.setter + def allow_non_existent(self, bint allow_non_existent): + self.selector.allow_non_existent = allow_non_existent + + @property + def recursive(self): + return self.selector.recursive + + @recursive.setter + def recursive(self, bint recursive): + self.selector.recursive = recursive + + +cdef class FileSystem: + """Abstract file system API""" + + cdef: + shared_ptr[CFileSystem] wrapped + CFileSystem* fs + + def __init__(self): + raise TypeError("FileSystem is an abstract class, instantiate one of " + "the subclasses instead: LocalFileSystem or " + "SubTreeFileSystem") + + cdef init(self, const shared_ptr[CFileSystem]& wrapped): + self.wrapped = wrapped + self.fs = wrapped.get() + + def get_target_stats(self, paths_or_selector): + """Get statistics for the given target. + + Any symlink is automatically dereferenced, recursively. A non-existing + or unreachable file returns a FileStats object and has a FileType of + value NonExistent. An exception indicates a truly exceptional condition + (low-level I/O error, etc.). + + Parameters + ---------- + paths_or_selector: Selector or list of path-likes + Either a Selector object or a list of path-like objects. + The selector's base directory will not be part of the results, even + if it exists. If it doesn't exist, use `allow_non_existent`. 
+ + Returns + ------- + file_stats : list of FileStats + """ + cdef: + vector[CFileStats] stats + vector[c_string] paths + CSelector selector + + if isinstance(paths_or_selector, Selector): + with nogil: + selector = (paths_or_selector).selector + check_status(self.fs.GetTargetStats(selector, &stats)) + elif isinstance(paths_or_selector, (list, tuple)): + paths = [_path_as_bytes(s) for s in paths_or_selector] + with nogil: + check_status(self.fs.GetTargetStats(paths, &stats)) + else: + raise TypeError('Must pass either paths or a Selector') + + return [FileStats.wrap(stat) for stat in stats] + + def create_dir(self, path, bint recursive=True): + """Create a directory and subdirectories. + + This function succeeds if the directory already exists. + + Parameters + ---------- + path : str or pathlib.Path + The path of the new directory. + recursive: bool, default True + Create nested directories as well. + """ + cdef c_string directory = _path_as_bytes(path) + with nogil: + check_status(self.fs.CreateDir(directory, recursive=recursive)) + + def delete_dir(self, path): + """Delete a directory and its contents, recursively. + + Parameters + ---------- + path : str or pathlib.Path + The path of the directory to be deleted. + """ + cdef c_string directory = _path_as_bytes(path) + with nogil: + check_status(self.fs.DeleteDir(directory)) + + def move(self, src, dest): + """Move / rename a file or directory. + + If the destination exists: + - if it is a non-empty directory, an error is returned + - otherwise, if it has the same type as the source, it is replaced + - otherwise, behavior is unspecified (implementation-dependent). + + Parameters + ---------- + src : str or pathlib.Path + The path of the file or the directory to be moved. + dest : str or pathlib.Path + The destination path where the file or directory is moved to. 
+ """ + cdef: + c_string source = _path_as_bytes(src) + c_string destination = _path_as_bytes(dest) + with nogil: + check_status(self.fs.Move(source, destination)) + + def copy_file(self, src, dest): + """Copy a file. + + If the destination exists and is a directory, an error is returned. + Otherwise, it is replaced. + + Parameters + ---------- + src : str or pathlib.Path + The path of the file to be copied from. + dest : str or pathlib.Path + The destination path where the file is copied to. + """ + cdef: + c_string source = _path_as_bytes(src) + c_string destination = _path_as_bytes(dest) + with nogil: + check_status(self.fs.CopyFile(source, destination)) + + def delete_file(self, path): + """Delete a file. + + Parameters + ---------- + path : str or pathlib.Path + The path of the file to be deleted. + """ + cdef c_string file = _path_as_bytes(path) + with nogil: + check_status(self.fs.DeleteFile(file)) + + def _wrap_input_stream(self, stream, path, compression, buffer_size): + if buffer_size is not None and buffer_size != 0: + stream = BufferedInputStream(stream, buffer_size) + if compression == 'detect': + compression = _detect_compression(path) + if compression is not None: + stream = CompressedInputStream(stream, compression) + return stream + + def _wrap_output_stream(self, stream, path, compression, buffer_size): + if buffer_size is not None and buffer_size != 0: + stream = BufferedOutputStream(stream, buffer_size) + if compression == 'detect': + compression = _detect_compression(path) + if compression is not None: + stream = CompressedOutputStream(stream, compression) + return stream + + def open_input_file(self, path): + """Open an input file for random access reading. + + Parameters + ---------- + path : Union[str, pathlib.Path] + The source to open for reading. 
+ + Returns + ------- + stram : NativeFile + """ + cdef: + c_string pathstr = _path_as_bytes(path) + NativeFile stream = NativeFile() + shared_ptr[CRandomAccessFile] in_handle + + with nogil: + check_status(self.fs.OpenInputFile(pathstr, &in_handle)) + + stream.set_random_access_file(in_handle) + stream.is_readable = True + return stream + + def open_input_stream(self, path, compression='detect', buffer_size=None): + """Open an input stream for sequential reading. + + Parameters + ---------- + source: str or pathlib.Path + The source to open for reading. + compression: str optional, default 'detect' + The compression algorithm to use for on-the-fly decompression. + If "detect" and source is a file path, then compression will be + chosen based on the file extension. + If None, no compression will be applied. Otherwise, a well-known + algorithm name must be supplied (e.g. "gzip"). + buffer_size: int optional, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary read buffer. + + Returns + ------- + stream : NativeFile + """ + cdef: + c_string pathstr = _path_as_bytes(path) + NativeFile stream = NativeFile() + shared_ptr[CInputStream] in_handle + + with nogil: + check_status(self.fs.OpenInputStream(pathstr, &in_handle)) + + stream.set_input_stream(in_handle) + stream.is_readable = True + + return self._wrap_input_stream( + stream, path=path, compression=compression, buffer_size=buffer_size + ) + + def open_output_stream(self, path, compression='detect', buffer_size=None): + """Open an output stream for sequential writing. + + If the target already exists, existing data is truncated. + + Parameters + ---------- + path : str or pathlib.Path + The source to open for writing. + compression: str optional, default 'detect' + The compression algorithm to use for on-the-fly compression. + If "detect" and source is a file path, then compression will be + chosen based on the file extension. + If None, no compression will be applied. 
Otherwise, a well-known + algorithm name must be supplied (e.g. "gzip"). + buffer_size: int optional, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary write buffer. + + Returns + ------- + stream : NativeFile + """ + cdef: + c_string pathstr = _path_as_bytes(path) + NativeFile stream = NativeFile() + shared_ptr[COutputStream] out_handle + + with nogil: + check_status(self.fs.OpenOutputStream(pathstr, &out_handle)) + + stream.set_output_stream(out_handle) + stream.is_writable = True + + return self._wrap_output_stream( + stream, path=path, compression=compression, buffer_size=buffer_size + ) + + def open_append_stream(self, path, compression='detect', buffer_size=None): + """Open an output stream for appending. + + If the target doesn't exist, a new empty file is created. + + Parameters + ---------- + path : str or pathlib.Path + The source to open for writing. + compression: str optional, default 'detect' + The compression algorithm to use for on-the-fly compression. + If "detect" and source is a file path, then compression will be + chosen based on the file extension. + If None, no compression will be applied. Otherwise, a well-known + algorithm name must be supplied (e.g. "gzip"). + buffer_size: int optional, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary write buffer. + + Returns + ------- + stream : NativeFile + """ + cdef: + c_string pathstr = _path_as_bytes(path) + NativeFile stream = NativeFile() + shared_ptr[COutputStream] out_handle + + with nogil: + check_status(self.fs.OpenAppendStream(pathstr, &out_handle)) + + stream.set_output_stream(out_handle) + stream.is_writable = True + + return self._wrap_output_stream( + stream, path=path, compression=compression, buffer_size=buffer_size + ) + + +cdef class LocalFileSystem(FileSystem): + """A FileSystem implementation accessing files on the local machine. 
self.init(<shared_ptr[CFileSystem]> wrapped)
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import + +from pyarrow._fs import * # noqa diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 721facdfbb2..72247b776e7 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -29,6 +29,7 @@ from libcpp.unordered_set cimport unordered_set from cpython cimport PyObject cimport cpython + cdef extern from * namespace "std" nogil: cdef shared_ptr[T] static_pointer_cast[T, U](shared_ptr[U]) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 83df0e9533c..14d672edf89 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1408,6 +1408,16 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: CSerializedPyObject* out) +cdef extern from "arrow/python/api.h" namespace "arrow::py::internal" nogil: + + cdef cppclass CTimePoint "arrow::py::internal::TimePoint": + pass + + cdef CStatus PyDateTime_from_int(int64_t val, const TimeUnit unit, + PyObject** out) + cdef CStatus PyDateTime_from_TimePoint(CTimePoint val, PyObject** out) + + cdef extern from 'arrow/python/init.h': int arrow_init_numpy() except -1 diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd new file mode 100644 index 00000000000..f54a2e50357 --- /dev/null +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from libcpp.functional cimport function + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport ( + InputStream as CInputStream, + OutputStream as COutputStream, + RandomAccessFile as CRandomAccessFile +) + + +cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: + + enum CFileType "arrow::fs::FileType": + CFileType_NonExistent "arrow::fs::FileType::NonExistent" + CFileType_Unknown "arrow::fs::FileType::Unknown" + CFileType_File "arrow::fs::FileType::File" + CFileType_Directory "arrow::fs::FileType::Directory" + + cdef cppclass CTimePoint "arrow::fs::TimePoint": + pass + + cdef cppclass CFileStats "arrow::fs::FileStats": + CFileStats() + CFileStats(CFileStats&&) + CFileStats& operator=(CFileStats&&) + CFileStats(const CFileStats&) + CFileStats& operator=(const CFileStats&) + + CFileType type() + void set_type(CFileType type) + c_string path() + void set_path(const c_string& path) + c_string base_name() + int64_t size() + void set_size(int64_t size) + c_string extension() + CTimePoint mtime() + void set_mtime(CTimePoint mtime) + + cdef cppclass CSelector "arrow::fs::Selector": + CSelector() + c_string base_dir + c_bool allow_non_existent + c_bool recursive + + cdef cppclass CFileSystem "arrow::fs::FileSystem": + 
CLocalFileSystem()
@@ -1642,6 +1642,7 @@ def input_stream(source, compression='detect', buffer_size=None): .format(source.__class__)) if compression == 'detect': + # detect for OSFile too compression = _detect_compression(source_path) if buffer_size is not None and buffer_size != 0: @@ -1661,7 +1662,7 @@ def output_stream(source, compression='detect', buffer_size=None): ---------- source: str, Path, buffer, file-like object, ... The source to open for writing - compression: str or None + compression: str optional, default 'detect' The compression algorithm to use for on-the-fly compression. If "detect" and source is a file path, then compression will be chosen based on the file extension. diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 04a6157126c..2f4aeceaf57 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -452,6 +452,22 @@ cdef class NativeFile: cdef shared_ptr[OutputStream] get_output_stream(self) except * +cdef class BufferedInputStream(NativeFile): + pass + + +cdef class BufferedOutputStream(NativeFile): + pass + + +cdef class CompressedInputStream(NativeFile): + pass + + +cdef class CompressedOutputStream(NativeFile): + pass + + cdef class _CRecordBatchWriter: cdef: shared_ptr[CRecordBatchWriter] writer diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py new file mode 100644 index 00000000000..f897e0d36f9 --- /dev/null +++ b/python/pyarrow/tests/test_fs.py @@ -0,0 +1,309 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime +try: + import pathlib +except ImportError: + import pathlib2 as pathlib # py2 compat + +import pytest + +from pyarrow import ArrowIOError +from pyarrow.fs import (FileType, Selector, FileSystem, LocalFileSystem, + SubTreeFileSystem) +from pyarrow.tests.test_io import gzip_compress, gzip_decompress + + +@pytest.fixture(params=[ + pytest.param( + lambda tmp: LocalFileSystem(), + id='LocalFileSystem' + ), + pytest.param( + lambda tmp: SubTreeFileSystem(tmp, LocalFileSystem()), + id='SubTreeFileSystem(LocalFileSystem)' + ) +]) +def fs(request, tempdir): + return request.param(tempdir.as_posix()) + + +@pytest.fixture +def testpath(request, fs, tempdir): + # we always use the tempdir for reading and writing test artifacts, but + # if the filesystem is wrapped in a SubTreeFileSystem then we don't need + # to prepend the path with the tempdir, we also test the API with both + # pathlib.Path objects and plain python strings + def convert(path): + if isinstance(fs, SubTreeFileSystem): + path = pathlib.Path(path) + else: + path = tempdir / path + return path.as_posix() + return convert + + +def test_cannot_instantiate_base_filesystem(): + with pytest.raises(TypeError): + FileSystem() + + +def test_non_path_like_input_raises(fs): + class Path: + pass + + invalid_paths = [1, 1.1, Path(), tuple(), {}, [], lambda: 1, + pathlib.Path()] + for path in invalid_paths: + with pytest.raises(TypeError): + fs.create_dir(path) + + +def test_get_target_stats(fs, tempdir, testpath): + aaa, aaa_ = testpath('a/aa/aaa'), tempdir / 'a' / 'aa' / 'aaa' + 
# arrow's filesystem implementation truncates mtime to microseconds
assert not directory_.exists() + fs.create_dir(directory) + assert directory_.exists() + + # recursive + directory = testpath('deeply/nested/directory') + directory_ = tempdir / 'deeply' / 'nested' / 'directory' + assert not directory_.exists() + with pytest.raises(ArrowIOError): + fs.create_dir(directory, recursive=False) + fs.create_dir(directory) + assert directory_.exists() + + +def test_delete_dir(fs, tempdir, testpath): + folder = testpath('directory') + nested = testpath('nested/directory') + folder_ = tempdir / 'directory' + nested_ = tempdir / 'nested' / 'directory' + + folder_.mkdir() + nested_.mkdir(parents=True) + + assert folder_.exists() + fs.delete_dir(folder) + assert not folder_.exists() + + assert nested_.exists() + fs.delete_dir(nested) + assert not nested_.exists() + + +def test_copy_file(fs, tempdir, testpath): + # copy file + source = testpath('source-file') + source_ = tempdir / 'source-file' + source_.touch() + target = testpath('target-file') + target_ = tempdir / 'target-file' + assert not target_.exists() + fs.copy_file(source, target) + assert source_.exists() + assert target_.exists() + + +def test_move(fs, tempdir, testpath): + # move directory + source = testpath('source-dir') + source_ = tempdir / 'source-dir' + source_.mkdir() + target = testpath('target-dir') + target_ = tempdir / 'target-dir' + assert not target_.exists() + fs.move(source, target) + assert not source_.exists() + assert target_.exists() + + # move file + source = testpath('source-file') + source_ = tempdir / 'source-file' + source_.touch() + target = testpath('target-file') + target_ = tempdir / 'target-file' + assert not target_.exists() + fs.move(source, target) + assert not source_.exists() + assert target_.exists() + + +def test_delete_file(fs, tempdir, testpath): + target = testpath('target-file') + target_ = tempdir / 'target-file' + target_.touch() + assert target_.exists() + fs.delete_file(target) + assert not target_.exists() + + nested = 
testpath('nested/target-file') + nested_ = tempdir / 'nested/target-file' + nested_.parent.mkdir() + nested_.touch() + assert nested_.exists() + fs.delete_file(nested) + assert not nested_.exists() + + +def identity(v): + return v + + +@pytest.mark.parametrize( + ('compression', 'buffer_size', 'compressor'), + [ + (None, None, identity), + (None, 64, identity), + ('gzip', None, gzip_compress), + ('gzip', 256, gzip_compress), + ] +) +def test_open_input_stream(fs, tempdir, testpath, compression, buffer_size, + compressor): + file = testpath('abc') + file_ = tempdir / 'abc' + data = b'some data' * 1024 + file_.write_bytes(compressor(data)) + + with fs.open_input_stream(file, compression, buffer_size) as f: + result = f.read() + + assert result == data + + +def test_open_input_file(fs, tempdir, testpath): + file = testpath('abc') + file_ = tempdir / 'abc' + data = b'some data' * 1024 + file_.write_bytes(data) + + read_from = len(b'some data') * 512 + with fs.open_input_file(file) as f: + f.seek(read_from) + result = f.read() + + assert result == data[read_from:] + + +@pytest.mark.parametrize( + ('compression', 'buffer_size', 'decompressor'), + [ + (None, None, identity), + (None, 64, identity), + ('gzip', None, gzip_decompress), + ('gzip', 256, gzip_decompress), + ] +) +def test_open_output_stream(fs, tempdir, testpath, compression, buffer_size, + decompressor): + file = testpath('abc') + file_ = tempdir / 'abc' + + data = b'some data' * 1024 + with fs.open_output_stream(file, compression, buffer_size) as f: + f.write(data) + + assert decompressor(file_.read_bytes()) == data + + +@pytest.mark.parametrize( + ('compression', 'buffer_size', 'compressor', 'decompressor'), + [ + (None, None, identity, identity), + (None, 64, identity, identity), + ('gzip', None, gzip_compress, gzip_decompress), + ('gzip', 256, gzip_compress, gzip_decompress), + ] +) +def test_open_append_stream(fs, tempdir, testpath, compression, buffer_size, + compressor, decompressor): + file = 
testpath('abc') + file_ = tempdir / 'abc' + file_.write_bytes(compressor(b'already existing')) + + with fs.open_append_stream(file, compression, buffer_size) as f: + f.write(b'\nnewly added') + + assert decompressor(file_.read_bytes()) == b'already existing\nnewly added' diff --git a/python/setup.py b/python/setup.py index 0187403a839..5db1b7d1fa2 100755 --- a/python/setup.py +++ b/python/setup.py @@ -168,6 +168,7 @@ def initialize_options(self): CYTHON_MODULE_NAMES = [ 'lib', + '_fs', '_csv', '_json', '_cuda',