diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index effbfd30b4b..498ac820b6a 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -719,27 +719,38 @@ Future> MemoryMappedFile::ReadAsync(const IOContext&, return Future>::MakeFinished(ReadAt(position, nbytes)); } -Status MemoryMappedFile::WillNeed(const std::vector& ranges) { - using ::arrow::internal::MemoryRegion; - - RETURN_NOT_OK(memory_map_->CheckClosed()); - auto guard_resize = memory_map_->writable() - ? std::unique_lock(memory_map_->resize_lock()) +Status MemoryMappedFile::ReadRangesToMemoryRegions( + const std::vector& ranges, + std::shared_ptr& memory_map, + std::vector& regions) { + RETURN_NOT_OK(memory_map->CheckClosed()); + auto guard_resize = memory_map->writable() + ? std::unique_lock(memory_map->resize_lock()) : std::unique_lock(); - std::vector regions(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { const auto& range = ranges[i]; - ARROW_ASSIGN_OR_RAISE( - auto size, - internal::ValidateReadRange(range.offset, range.length, memory_map_->size())); - DCHECK_NE(memory_map_->data(), nullptr); - regions[i] = {const_cast(memory_map_->data() + range.offset), + ARROW_ASSIGN_OR_RAISE(auto size, internal::ValidateReadRange( + range.offset, range.length, memory_map->size())); + DCHECK_NE(memory_map->data(), nullptr); + regions[i] = {const_cast(memory_map->data() + range.offset), static_cast(size)}; } + return Status::OK(); +} + +Status MemoryMappedFile::WillNeed(const std::vector& ranges) { + std::vector regions(ranges.size()); + RETURN_NOT_OK(ReadRangesToMemoryRegions(ranges, memory_map_, regions)); return ::arrow::internal::MemoryAdviseWillNeed(regions); } +Status MemoryMappedFile::AdviseRandom(const std::vector& ranges) { + std::vector regions(ranges.size()); + RETURN_NOT_OK(ReadRangesToMemoryRegions(ranges, memory_map_, regions)); + return ::arrow::internal::MemoryAdviseRandom(regions); +} + bool MemoryMappedFile::supports_zero_copy() const { return true; } Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) { diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 50d4f2c4dfc..ab54d75c2ea 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -26,6 +26,7 @@ #include "arrow/io/concurrency.h" #include "arrow/io/interfaces.h" +#include "arrow/util/io_util.h" #include "arrow/util/visibility.h" namespace arrow { @@ -136,6 +137,8 @@ class ARROW_EXPORT ReadableFile std::unique_ptr impl_; }; +using ::arrow::internal::MemoryRegion; + /// \brief A file interface that uses memory-mapped files for memory interactions /// /// This implementation supports zero-copy reads. The same class is used @@ -190,6 +193,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { Status WillNeed(const std::vector& ranges) override; + Status AdviseRandom(const std::vector& ranges); + bool supports_zero_copy() const override; /// Write data at the current position in the file. Thread-safe @@ -215,6 +220,11 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { class ARROW_NO_EXPORT MemoryMap; std::shared_ptr memory_map_; + + Status ReadRangesToMemoryRegions( + const std::vector& ranges, + std::shared_ptr& memory_map, + std::vector& regions); }; } // namespace io diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index 7d3d1c621ce..63a41cb02bc 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -689,6 +689,21 @@ TEST_F(TestMemoryMappedFile, WillNeed) { ASSERT_RAISES(IOError, mmap->WillNeed({{1025, 1}})); // Out of bounds } +TEST_F(TestMemoryMappedFile, AdviseRandom) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + random_bytes(1024, 0, buffer.data()); + + std::string path = TempFile("io-memory-map-advise-random-test"); + ASSERT_OK_AND_ASSIGN(auto mmap, InitMemoryMap(buffer_size, path)); + ASSERT_OK(mmap->Write(buffer.data(), buffer_size)); + + ASSERT_OK(mmap->AdviseRandom({})); + ASSERT_OK(mmap->AdviseRandom({{0, 4}, {100, 924}})); + ASSERT_OK(mmap->AdviseRandom({{1024, 0}})); + ASSERT_RAISES(IOError, mmap->AdviseRandom({{1025, 1}})); // Out of bounds +} + TEST_F(TestMemoryMappedFile, InvalidReads) { std::string path = TempFile("io-memory-map-invalid-reads-test"); ASSERT_OK_AND_ASSIGN(auto result, InitMemoryMap(4096, path)); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index f6566ea7e36..d09d6604547 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1047,21 +1047,45 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes, #endif } -Status MemoryAdviseWillNeed(const std::vector& regions) { +MemoryRegion align_region(const MemoryRegion& region, size_t page_mask, + size_t page_size) { + const auto addr = reinterpret_cast(region.addr); + const auto aligned_addr = addr & page_mask; + DCHECK_LT(addr - aligned_addr, page_size); + return {reinterpret_cast(aligned_addr), + region.size + static_cast(addr - aligned_addr)}; +} + +#ifdef POSIX_MADV_WILLNEED +Status MemoryAdvise(const std::vector& regions, int advice) { const auto page_size = static_cast(GetPageSize()); DCHECK_GT(page_size, 0); const size_t page_mask = ~(page_size - 1); DCHECK_EQ(page_mask & page_size, page_size); - auto align_region = [=](const MemoryRegion& region) -> MemoryRegion { - const auto addr = reinterpret_cast(region.addr); - const auto aligned_addr = addr & page_mask; - DCHECK_LT(addr - aligned_addr, page_size); - return {reinterpret_cast(aligned_addr), - region.size + static_cast(addr - aligned_addr)}; - }; + for (const auto& region : regions) { + if (region.size != 0) { + const auto aligned = align_region(region, page_mask, page_size); + int err = posix_madvise(aligned.addr, aligned.size, advice); + // EBADF can be returned on Linux in the following cases: + // - the kernel version is older than 3.9 + // - the kernel was compiled with CONFIG_SWAP disabled (ARROW-9577) + if (err != 0 && err != EBADF) { + return IOErrorFromErrno(err, "posix_madvise failed"); + } + } + } + return Status::OK(); +} +#endif +Status MemoryAdviseWillNeed(const std::vector& regions) { #ifdef _WIN32 + const auto page_size = static_cast(GetPageSize()); + DCHECK_GT(page_size, 0); + const size_t page_mask = ~(page_size - 1); + DCHECK_EQ(page_mask & page_size, page_size); + // PrefetchVirtualMemory() is available on Windows 8 or later struct PrefetchEntry { // Like WIN32_MEMORY_RANGE_ENTRY void* VirtualAddress; @@ -1078,7 +1102,7 @@ Status MemoryAdviseWillNeed(const std::vector& regions) { entries.reserve(regions.size()); for (const auto& region : regions) { if (region.size != 0) { - entries.emplace_back(align_region(region)); + entries.emplace_back(align_region(region, page_mask, page_size)); } } if (!entries.empty() && @@ -1090,19 +1114,15 @@ Status MemoryAdviseWillNeed(const std::vector& regions) { } return Status::OK(); #elif defined(POSIX_MADV_WILLNEED) - for (const auto& region : regions) { - if (region.size != 0) { - const auto aligned = align_region(region); - int err = posix_madvise(aligned.addr, aligned.size, POSIX_MADV_WILLNEED); - // EBADF can be returned on Linux in the following cases: - // - the kernel version is older than 3.9 - // - the kernel was compiled with CONFIG_SWAP disabled (ARROW-9577) - if (err != 0 && err != EBADF) { - return IOErrorFromErrno(err, "posix_madvise failed"); - } - } - } + return MemoryAdvise(regions, POSIX_MADV_WILLNEED); +#else return Status::OK(); +#endif +} + +Status MemoryAdviseRandom(const std::vector& regions) { +#ifdef POSIX_MADV_RANDOM + return MemoryAdvise(regions, POSIX_MADV_RANDOM); #else return Status::OK(); #endif diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 4255dd37105..d294b5732ae 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -178,6 +178,9 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes, ARROW_EXPORT Status MemoryAdviseWillNeed(const std::vector& regions); +ARROW_EXPORT +Status MemoryAdviseRandom(const std::vector& regions); + ARROW_EXPORT Result GetEnvVar(const char* name); ARROW_EXPORT