diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 1edc75c4f3d..ea5b1c9e87e 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -221,7 +221,14 @@ Status StatSelector(const NativePathString& path, const Selector& select, } // namespace -LocalFileSystem::LocalFileSystem() {} +LocalFileSystemOptions LocalFileSystemOptions::Defaults() { + return LocalFileSystemOptions(); +} + +LocalFileSystem::LocalFileSystem() : options_(LocalFileSystemOptions::Defaults()) {} + +LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options) + : options_(options) {} LocalFileSystem::~LocalFileSystem() {} @@ -332,20 +339,34 @@ Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest #endif } +namespace { + +template +Status OpenInputStreamGeneric(const std::string& path, + const LocalFileSystemOptions& options, + std::shared_ptr* out) { + if (options.use_mmap) { + std::shared_ptr file; + RETURN_NOT_OK(io::MemoryMappedFile::Open(path, io::FileMode::READ, &file)); + *out = std::move(file); + } else { + std::shared_ptr file; + RETURN_NOT_OK(io::ReadableFile::Open(path, &file)); + *out = std::move(file); + } + return Status::OK(); +} + +} // namespace + Status LocalFileSystem::OpenInputStream(const std::string& path, std::shared_ptr* out) { - std::shared_ptr file; - RETURN_NOT_OK(io::ReadableFile::Open(path, &file)); - *out = std::move(file); - return Status::OK(); + return OpenInputStreamGeneric(path, options_, out); } Status LocalFileSystem::OpenInputFile(const std::string& path, std::shared_ptr* out) { - std::shared_ptr file; - RETURN_NOT_OK(io::ReadableFile::Open(path, &file)); - *out = std::move(file); - return Status::OK(); + return OpenInputStreamGeneric(path, options_, out); } namespace { diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index 57da283b036..7a8f2145ae5 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -26,6 +26,16 @@ namespace arrow { namespace fs { +/// Options for the LocalFileSystem implementation. +struct ARROW_EXPORT LocalFileSystemOptions { + /// Whether OpenInputStream and OpenInputFile return a mmap'ed file, + /// or a regular one. + bool use_mmap = false; + + /// \brief Initialize with defaults + static LocalFileSystemOptions Defaults(); +}; + /// \brief EXPERIMENTAL: a FileSystem implementation accessing files /// on the local machine. /// @@ -36,6 +46,7 @@ namespace fs { class ARROW_EXPORT LocalFileSystem : public FileSystem { public: LocalFileSystem(); + explicit LocalFileSystem(const LocalFileSystemOptions&); ~LocalFileSystem() override; /// \cond FALSE @@ -66,6 +77,9 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { Status OpenAppendStream(const std::string& path, std::shared_ptr* out) override; + + protected: + LocalFileSystemOptions options_; }; } // namespace fs diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 30a474b112c..d92caf99c88 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -72,12 +72,14 @@ class TestLocalFSGeneric : public LocalFSTestMixin, public GenericFileSystemTest public: void SetUp() override { LocalFSTestMixin::SetUp(); - local_fs_ = std::make_shared(); + local_fs_ = std::make_shared(options()); auto path = PathFormatter()(temp_dir_->path()); fs_ = std::make_shared(path, local_fs_); } protected: + virtual LocalFileSystemOptions options() { return LocalFileSystemOptions::Defaults(); } + std::shared_ptr GetEmptyFileSystem() override { return fs_; } std::shared_ptr local_fs_; @@ -88,6 +90,17 @@ TYPED_TEST_CASE(TestLocalFSGeneric, PathFormatters); GENERIC_FS_TYPED_TEST_FUNCTIONS(TestLocalFSGeneric); +class TestLocalFSGenericMMap : public TestLocalFSGeneric { + protected: + LocalFileSystemOptions options() override { + auto options = LocalFileSystemOptions::Defaults(); + options.use_mmap = true; + return options; + } +}; + +GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGenericMMap); + //////////////////////////////////////////////////////////////////////////// // Concrete LocalFileSystem tests diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 3a9306300d7..767b4eccace 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -433,6 +433,13 @@ class MemoryMappedFile::MemoryMap bool closed() const { return !file_->is_open(); } + Status CheckClosed() const { + if (closed()) { + return Status::Invalid("Invalid operation on closed file"); + } + return Status::OK(); + } + Status Open(const std::string& path, FileMode::type mode, const int64_t offset = 0, const int64_t length = -1) { file_.reset(new OSFile()); @@ -636,20 +643,26 @@ Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, } Status MemoryMappedFile::GetSize(int64_t* size) const { + RETURN_NOT_OK(memory_map_->CheckClosed()); *size = memory_map_->size(); return Status::OK(); } Status MemoryMappedFile::GetSize(int64_t* size) { + RETURN_NOT_OK(memory_map_->CheckClosed()); return static_cast(this)->GetSize(size); } Status MemoryMappedFile::Tell(int64_t* position) const { + RETURN_NOT_OK(memory_map_->CheckClosed()); *position = memory_map_->position(); return Status::OK(); } -Status MemoryMappedFile::Seek(int64_t position) { return memory_map_->Seek(position); } +Status MemoryMappedFile::Seek(int64_t position) { + RETURN_NOT_OK(memory_map_->CheckClosed()); + return memory_map_->Seek(position); +} Status MemoryMappedFile::Close() { return memory_map_->Close(); } @@ -657,6 +670,7 @@ bool MemoryMappedFile::closed() const { return memory_map_->closed(); } Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { + RETURN_NOT_OK(memory_map_->CheckClosed()); // if the file is writable, we acquire the lock before creating any slices // in case a resize is triggered concurrently, otherwise we wouldn't detect // a change in the use count @@ -668,6 +682,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { + RETURN_NOT_OK(memory_map_->CheckClosed()); auto guard_resize = memory_map_->writable() ? std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); @@ -680,12 +695,14 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes } Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) { + RETURN_NOT_OK(memory_map_->CheckClosed()); RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out)); memory_map_->advance(*bytes_read); return Status::OK(); } Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr* out) { + RETURN_NOT_OK(memory_map_->CheckClosed()); RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out)); memory_map_->advance((*out)->size()); return Status::OK(); @@ -694,6 +711,7 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr* out) { bool MemoryMappedFile::supports_zero_copy() const { return true; } Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) { + RETURN_NOT_OK(memory_map_->CheckClosed()); std::lock_guard guard(memory_map_->write_lock()); if (!memory_map_->opened() || !memory_map_->writable()) { @@ -712,6 +730,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nby } Status MemoryMappedFile::Write(const void* data, int64_t nbytes) { + RETURN_NOT_OK(memory_map_->CheckClosed()); std::lock_guard guard(memory_map_->write_lock()); if (!memory_map_->opened() || !memory_map_->writable()) { @@ -731,6 +750,7 @@ Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) { } Status MemoryMappedFile::Resize(int64_t new_size) { + RETURN_NOT_OK(memory_map_->CheckClosed()); std::unique_lock write_guard(memory_map_->write_lock(), std::defer_lock); std::unique_lock resize_guard(memory_map_->resize_lock(), std::defer_lock); std::lock(write_guard, resize_guard); diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index 39079aee105..f4e2455e7b2 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -453,21 +453,67 @@ cdef class FileSystem: ) +cdef class LocalFileSystemOptions: + """Options for LocalFileSystemOptions. + + Parameters + ---------- + use_mmap: bool, default False + Whether open_input_stream and open_input_file should return + a mmap'ed file or a regular file. + """ + cdef: + CLocalFileSystemOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, use_mmap=None): + self.options = CLocalFileSystemOptions.Defaults() + if use_mmap is not None: + self.use_mmap = use_mmap + + @property + def use_mmap(self): + """ + Whether open_input_stream and open_input_file should return + a mmap'ed file or a regular file. + """ + return self.options.use_mmap + + @use_mmap.setter + def use_mmap(self, value): + self.options.use_mmap = value + + cdef class LocalFileSystem(FileSystem): """A FileSystem implementation accessing files on the local machine. Details such as symlinks are abstracted away (symlinks are always followed, except when deleting an entry). - """ - def __init__(self): - cdef shared_ptr[CLocalFileSystem] wrapped - wrapped = make_shared[CLocalFileSystem]() - self.init( wrapped) + Parameters + ---------- + options: LocalFileSystemOptions, default None + kwargs: individual named options, for convenience - cdef init(self, const shared_ptr[CFileSystem]& wrapped): - FileSystem.init(self, wrapped) - self.localfs = wrapped.get() + """ + + def __init__(self, LocalFileSystemOptions options=None, **kwargs): + cdef: + CLocalFileSystemOptions c_options + shared_ptr[CLocalFileSystem] c_fs + + options = options or LocalFileSystemOptions() + for k, v in kwargs.items(): + setattr(options, k, v) + c_options = options.options + c_fs = make_shared[CLocalFileSystem](c_options) + self.init( c_fs) + + cdef init(self, const shared_ptr[CFileSystem]& c_fs): + FileSystem.init(self, c_fs) + self.localfs = c_fs.get() cdef class SubTreeFileSystem(FileSystem): diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index 5f257d07f30..7b6d242a06c 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -23,5 +23,6 @@ FileStats, FileSystem, LocalFileSystem, + LocalFileSystemOptions, SubTreeFileSystem ) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index cb8d3b28d09..1c3f962c4c0 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1100,8 +1100,15 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: CStatus OpenAppendStream(const c_string& path, shared_ptr[COutputStream]* out) + cdef cppclass CLocalFileSystemOptions "arrow::fs::LocalFileSystemOptions": + c_bool use_mmap + + @staticmethod + CLocalFileSystemOptions Defaults() + cdef cppclass CLocalFileSystem "arrow::fs::LocalFileSystem"(CFileSystem): - LocalFileSystem() + CLocalFileSystem() + CLocalFileSystem(CLocalFileSystemOptions) cdef cppclass CSubTreeFileSystem \ "arrow::fs::SubTreeFileSystem"(CFileSystem): diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index f6b6bf1d18c..c728df5813f 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,7 @@ import pyarrow as pa from pyarrow.tests.test_io import gzip_compress, gzip_decompress from pyarrow.fs import (FileType, Selector, FileSystem, LocalFileSystem, - SubTreeFileSystem) + LocalFileSystemOptions, SubTreeFileSystem) @pytest.fixture @@ -39,6 +39,16 @@ def localfs(request, tempdir): ) +@pytest.fixture +def localfs_with_mmap(request, tempdir): + return dict( + fs=LocalFileSystem(use_mmap=True), + pathfn=lambda p: (tempdir / p).as_posix(), + allow_move_dir=True, + allow_append_to_file=True, + ) + + @pytest.fixture def subtree_localfs(request, tempdir, localfs): prefix = 'subtree/prefix/' @@ -91,6 +101,10 @@ def subtree_s3fs(request, s3fs): pytest.lazy_fixture('localfs'), id='LocalFileSystem()' ), + pytest.param( + pytest.lazy_fixture('localfs_with_mmap'), + id='LocalFileSystem(use_mmap=True)' + ), pytest.param( pytest.lazy_fixture('subtree_localfs'), id='SubTreeFileSystem(LocalFileSystem())' @@ -391,6 +405,26 @@ def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor, fs.open_append_stream(p, compression, buffer_size) +def test_localfs_options(): + options = LocalFileSystemOptions() + assert options.use_mmap is False + options.use_mmap = True + assert options.use_mmap is True + + with pytest.raises(AttributeError): + options.xxx = True + + options = LocalFileSystemOptions(use_mmap=True) + assert options.use_mmap is True + + # LocalFileSystem instantiation + LocalFileSystem(LocalFileSystemOptions(use_mmap=True)) + LocalFileSystem(use_mmap=False) + + with pytest.raises(AttributeError): + LocalFileSystem(xxx=False) + + @pytest.mark.s3 def test_s3_options(minio_server): from pyarrow.s3fs import S3Options