Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,14 @@ Status StatSelector(const NativePathString& path, const Selector& select,

} // namespace

LocalFileSystem::LocalFileSystem() {}
LocalFileSystemOptions LocalFileSystemOptions::Defaults() {
return LocalFileSystemOptions();
}

LocalFileSystem::LocalFileSystem() : options_(LocalFileSystemOptions::Defaults()) {}

LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options)
: options_(options) {}

LocalFileSystem::~LocalFileSystem() {}

Expand Down Expand Up @@ -332,20 +339,34 @@ Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest
#endif
}

namespace {

template <typename OutputStreamType>
Status OpenInputStreamGeneric(const std::string& path,
const LocalFileSystemOptions& options,
std::shared_ptr<OutputStreamType>* out) {
if (options.use_mmap) {
std::shared_ptr<io::MemoryMappedFile> file;
RETURN_NOT_OK(io::MemoryMappedFile::Open(path, io::FileMode::READ, &file));
*out = std::move(file);
} else {
std::shared_ptr<io::ReadableFile> file;
RETURN_NOT_OK(io::ReadableFile::Open(path, &file));
*out = std::move(file);
}
return Status::OK();
}

} // namespace

Status LocalFileSystem::OpenInputStream(const std::string& path,
std::shared_ptr<io::InputStream>* out) {
std::shared_ptr<io::ReadableFile> file;
RETURN_NOT_OK(io::ReadableFile::Open(path, &file));
*out = std::move(file);
return Status::OK();
return OpenInputStreamGeneric(path, options_, out);
}

Status LocalFileSystem::OpenInputFile(const std::string& path,
std::shared_ptr<io::RandomAccessFile>* out) {
std::shared_ptr<io::ReadableFile> file;
RETURN_NOT_OK(io::ReadableFile::Open(path, &file));
*out = std::move(file);
return Status::OK();
return OpenInputStreamGeneric(path, options_, out);
}

namespace {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/filesystem/localfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
namespace arrow {
namespace fs {

/// Options for the LocalFileSystem implementation.
struct ARROW_EXPORT LocalFileSystemOptions {
/// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
/// or a regular one.
bool use_mmap = false;

/// \brief Initialize with defaults
static LocalFileSystemOptions Defaults();
};

/// \brief EXPERIMENTAL: a FileSystem implementation accessing files
/// on the local machine.
///
Expand All @@ -36,6 +46,7 @@ namespace fs {
class ARROW_EXPORT LocalFileSystem : public FileSystem {
public:
LocalFileSystem();
explicit LocalFileSystem(const LocalFileSystemOptions&);
~LocalFileSystem() override;

/// \cond FALSE
Expand Down Expand Up @@ -66,6 +77,9 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem {

Status OpenAppendStream(const std::string& path,
std::shared_ptr<io::OutputStream>* out) override;

protected:
LocalFileSystemOptions options_;
};

} // namespace fs
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ class TestLocalFSGeneric : public LocalFSTestMixin, public GenericFileSystemTest
public:
void SetUp() override {
LocalFSTestMixin::SetUp();
local_fs_ = std::make_shared<LocalFileSystem>();
local_fs_ = std::make_shared<LocalFileSystem>(options());
auto path = PathFormatter()(temp_dir_->path());
fs_ = std::make_shared<SubTreeFileSystem>(path, local_fs_);
}

protected:
virtual LocalFileSystemOptions options() { return LocalFileSystemOptions::Defaults(); }

std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }

std::shared_ptr<LocalFileSystem> local_fs_;
Expand All @@ -88,6 +90,17 @@ TYPED_TEST_CASE(TestLocalFSGeneric, PathFormatters);

GENERIC_FS_TYPED_TEST_FUNCTIONS(TestLocalFSGeneric);

class TestLocalFSGenericMMap : public TestLocalFSGeneric<CommonPathFormatter> {
protected:
LocalFileSystemOptions options() override {
auto options = LocalFileSystemOptions::Defaults();
options.use_mmap = true;
return options;
}
};

GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGenericMMap);

////////////////////////////////////////////////////////////////////////////
// Concrete LocalFileSystem tests

Expand Down
22 changes: 21 additions & 1 deletion cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,13 @@ class MemoryMappedFile::MemoryMap

bool closed() const { return !file_->is_open(); }

Status CheckClosed() const {
if (closed()) {
return Status::Invalid("Invalid operation on closed file");
}
return Status::OK();
}

Status Open(const std::string& path, FileMode::type mode, const int64_t offset = 0,
const int64_t length = -1) {
file_.reset(new OSFile());
Expand Down Expand Up @@ -636,27 +643,34 @@ Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
}

Status MemoryMappedFile::GetSize(int64_t* size) const {
RETURN_NOT_OK(memory_map_->CheckClosed());
*size = memory_map_->size();
return Status::OK();
}

Status MemoryMappedFile::GetSize(int64_t* size) {
RETURN_NOT_OK(memory_map_->CheckClosed());
return static_cast<const MemoryMappedFile*>(this)->GetSize(size);
}

Status MemoryMappedFile::Tell(int64_t* position) const {
RETURN_NOT_OK(memory_map_->CheckClosed());
*position = memory_map_->position();
return Status::OK();
}

Status MemoryMappedFile::Seek(int64_t position) { return memory_map_->Seek(position); }
Status MemoryMappedFile::Seek(int64_t position) {
RETURN_NOT_OK(memory_map_->CheckClosed());
return memory_map_->Seek(position);
}

Status MemoryMappedFile::Close() { return memory_map_->Close(); }

bool MemoryMappedFile::closed() const { return memory_map_->closed(); }

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
RETURN_NOT_OK(memory_map_->CheckClosed());
// if the file is writable, we acquire the lock before creating any slices
// in case a resize is triggered concurrently, otherwise we wouldn't detect
// a change in the use count
Expand All @@ -668,6 +682,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) {
RETURN_NOT_OK(memory_map_->CheckClosed());
auto guard_resize = memory_map_->writable()
? std::unique_lock<std::mutex>(memory_map_->resize_lock())
: std::unique_lock<std::mutex>();
Expand All @@ -680,12 +695,14 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes
}

Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
RETURN_NOT_OK(memory_map_->CheckClosed());
RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
memory_map_->advance(*bytes_read);
return Status::OK();
}

Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
RETURN_NOT_OK(memory_map_->CheckClosed());
RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out));
memory_map_->advance((*out)->size());
return Status::OK();
Expand All @@ -694,6 +711,7 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
bool MemoryMappedFile::supports_zero_copy() const { return true; }

Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
RETURN_NOT_OK(memory_map_->CheckClosed());
std::lock_guard<std::mutex> guard(memory_map_->write_lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
Expand All @@ -712,6 +730,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nby
}

Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
RETURN_NOT_OK(memory_map_->CheckClosed());
std::lock_guard<std::mutex> guard(memory_map_->write_lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
Expand All @@ -731,6 +750,7 @@ Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) {
}

Status MemoryMappedFile::Resize(int64_t new_size) {
RETURN_NOT_OK(memory_map_->CheckClosed());
std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock);
std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock);
std::lock(write_guard, resize_guard);
Expand Down
62 changes: 54 additions & 8 deletions python/pyarrow/_fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,67 @@ cdef class FileSystem:
)


cdef class LocalFileSystemOptions:
"""Options for LocalFileSystemOptions.

Parameters
----------
use_mmap: bool, default False
Whether open_input_stream and open_input_file should return
a mmap'ed file or a regular file.
"""
cdef:
CLocalFileSystemOptions options

# Avoid mistakingly creating attributes
__slots__ = ()

def __init__(self, use_mmap=None):
self.options = CLocalFileSystemOptions.Defaults()
if use_mmap is not None:
self.use_mmap = use_mmap

@property
def use_mmap(self):
"""
Whether open_input_stream and open_input_file should return
a mmap'ed file or a regular file.
"""
return self.options.use_mmap

@use_mmap.setter
def use_mmap(self, value):
self.options.use_mmap = value


cdef class LocalFileSystem(FileSystem):
"""A FileSystem implementation accessing files on the local machine.

Details such as symlinks are abstracted away (symlinks are always followed,
except when deleting an entry).
"""

def __init__(self):
cdef shared_ptr[CLocalFileSystem] wrapped
wrapped = make_shared[CLocalFileSystem]()
self.init(<shared_ptr[CFileSystem]> wrapped)
Parameters
----------
options: LocalFileSystemOptions, default None
kwargs: individual named options, for convenience

cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.localfs = <CLocalFileSystem*> wrapped.get()
"""

def __init__(self, LocalFileSystemOptions options=None, **kwargs):
cdef:
CLocalFileSystemOptions c_options
shared_ptr[CLocalFileSystem] c_fs

options = options or LocalFileSystemOptions()
for k, v in kwargs.items():
setattr(options, k, v)
c_options = options.options
c_fs = make_shared[CLocalFileSystem](c_options)
self.init(<shared_ptr[CFileSystem]> c_fs)

cdef init(self, const shared_ptr[CFileSystem]& c_fs):
FileSystem.init(self, c_fs)
self.localfs = <CLocalFileSystem*> c_fs.get()


cdef class SubTreeFileSystem(FileSystem):
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
FileStats,
FileSystem,
LocalFileSystem,
LocalFileSystemOptions,
SubTreeFileSystem
)
9 changes: 8 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1100,8 +1100,15 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
CStatus OpenAppendStream(const c_string& path,
shared_ptr[COutputStream]* out)

cdef cppclass CLocalFileSystemOptions "arrow::fs::LocalFileSystemOptions":
c_bool use_mmap

@staticmethod
CLocalFileSystemOptions Defaults()

cdef cppclass CLocalFileSystem "arrow::fs::LocalFileSystem"(CFileSystem):
LocalFileSystem()
CLocalFileSystem()
CLocalFileSystem(CLocalFileSystemOptions)

cdef cppclass CSubTreeFileSystem \
"arrow::fs::SubTreeFileSystem"(CFileSystem):
Expand Down
36 changes: 35 additions & 1 deletion python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import pyarrow as pa
from pyarrow.tests.test_io import gzip_compress, gzip_decompress
from pyarrow.fs import (FileType, Selector, FileSystem, LocalFileSystem,
SubTreeFileSystem)
LocalFileSystemOptions, SubTreeFileSystem)


@pytest.fixture
Expand All @@ -39,6 +39,16 @@ def localfs(request, tempdir):
)


@pytest.fixture
def localfs_with_mmap(request, tempdir):
return dict(
fs=LocalFileSystem(use_mmap=True),
pathfn=lambda p: (tempdir / p).as_posix(),
allow_move_dir=True,
allow_append_to_file=True,
)


@pytest.fixture
def subtree_localfs(request, tempdir, localfs):
prefix = 'subtree/prefix/'
Expand Down Expand Up @@ -91,6 +101,10 @@ def subtree_s3fs(request, s3fs):
pytest.lazy_fixture('localfs'),
id='LocalFileSystem()'
),
pytest.param(
pytest.lazy_fixture('localfs_with_mmap'),
id='LocalFileSystem(use_mmap=True)'
),
pytest.param(
pytest.lazy_fixture('subtree_localfs'),
id='SubTreeFileSystem(LocalFileSystem())'
Expand Down Expand Up @@ -391,6 +405,26 @@ def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor,
fs.open_append_stream(p, compression, buffer_size)


def test_localfs_options():
options = LocalFileSystemOptions()
assert options.use_mmap is False
options.use_mmap = True
assert options.use_mmap is True

with pytest.raises(AttributeError):
options.xxx = True

options = LocalFileSystemOptions(use_mmap=True)
assert options.use_mmap is True

# LocalFileSystem instantiation
LocalFileSystem(LocalFileSystemOptions(use_mmap=True))
LocalFileSystem(use_mmap=False)

with pytest.raises(AttributeError):
LocalFileSystem(xxx=False)


@pytest.mark.s3
def test_s3_options(minio_server):
from pyarrow.s3fs import S3Options
Expand Down