Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,27 +719,38 @@ Future<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAsync(const IOContext&,
return Future<std::shared_ptr<Buffer>>::MakeFinished(ReadAt(position, nbytes));
}

Status MemoryMappedFile::WillNeed(const std::vector<ReadRange>& ranges) {
using ::arrow::internal::MemoryRegion;

RETURN_NOT_OK(memory_map_->CheckClosed());
auto guard_resize = memory_map_->writable()
? std::unique_lock<std::mutex>(memory_map_->resize_lock())
Status MemoryMappedFile::ReadRangesToMemoryRegions(
const std::vector<ReadRange>& ranges,
std::shared_ptr<MemoryMappedFile::MemoryMap>& memory_map,
std::vector<MemoryRegion>& regions) {
RETURN_NOT_OK(memory_map->CheckClosed());
auto guard_resize = memory_map->writable()
? std::unique_lock<std::mutex>(memory_map->resize_lock())
: std::unique_lock<std::mutex>();

std::vector<MemoryRegion> regions(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
const auto& range = ranges[i];
ARROW_ASSIGN_OR_RAISE(
auto size,
internal::ValidateReadRange(range.offset, range.length, memory_map_->size()));
DCHECK_NE(memory_map_->data(), nullptr);
regions[i] = {const_cast<uint8_t*>(memory_map_->data() + range.offset),
ARROW_ASSIGN_OR_RAISE(auto size, internal::ValidateReadRange(
range.offset, range.length, memory_map->size()));
DCHECK_NE(memory_map->data(), nullptr);
regions[i] = {const_cast<uint8_t*>(memory_map->data() + range.offset),
static_cast<size_t>(size)};
}
return Status::OK();
}

Status MemoryMappedFile::WillNeed(const std::vector<ReadRange>& ranges) {
std::vector<MemoryRegion> regions(ranges.size());
RETURN_NOT_OK(ReadRangesToMemoryRegions(ranges, memory_map_, regions));
return ::arrow::internal::MemoryAdviseWillNeed(regions);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryMappedFile already has a WillNeed API that advises the OS about the ranges that will be needed, so I added a similar AdviseRandom API to indicate a random access pattern. Initially I wanted to keep the name consistent with WillNeed and simply call it Random, but I think that could be slightly confusing, so I have named it AdviseRandom for now. Let me know if you have other naming suggestions for this API.

}

/// Advise the OS that the given byte ranges of the mapping will be accessed
/// in random order.
///
/// \param ranges byte ranges, each validated against the current map size
/// \return an error if the map is closed or if a range is rejected by
///   internal::ValidateReadRange; otherwise the status returned by
///   ::arrow::internal::MemoryAdviseRandom
Status MemoryMappedFile::AdviseRandom(const std::vector<ReadRange>& ranges) {
  RETURN_NOT_OK(memory_map_->CheckClosed());
  // Keep the resize lock until the advice has actually been issued.
  // ReadRangesToMemoryRegions() holds the lock only as a local, so routing
  // through it would release the lock before MemoryAdviseRandom() runs and
  // leave the raw addresses computed below unprotected against a resize.
  auto guard_resize = memory_map_->writable()
                          ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
                          : std::unique_lock<std::mutex>();

  std::vector<MemoryRegion> regions(ranges.size());
  for (size_t i = 0; i < ranges.size(); ++i) {
    const auto& range = ranges[i];
    ARROW_ASSIGN_OR_RAISE(auto size, internal::ValidateReadRange(
                                         range.offset, range.length, memory_map_->size()));
    DCHECK_NE(memory_map_->data(), nullptr);
    regions[i] = {const_cast<uint8_t*>(memory_map_->data() + range.offset),
                  static_cast<size_t>(size)};
  }
  return ::arrow::internal::MemoryAdviseRandom(regions);
}

// Memory-mapped files unconditionally report zero-copy support.
bool MemoryMappedFile::supports_zero_copy() const {
  return true;
}

Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "arrow/io/concurrency.h"
#include "arrow/io/interfaces.h"
#include "arrow/util/io_util.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand Down Expand Up @@ -136,6 +137,8 @@ class ARROW_EXPORT ReadableFile
std::unique_ptr<ReadableFileImpl> impl_;
};

using ::arrow::internal::MemoryRegion;

/// \brief A file interface that uses memory-mapped files for memory interactions
///
/// This implementation supports zero-copy reads. The same class is used
Expand Down Expand Up @@ -190,6 +193,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {

Status WillNeed(const std::vector<ReadRange>& ranges) override;

/// \brief Advise the OS that the given ranges will be accessed in random order
///
/// Unlike WillNeed(), this method is not an override of a base-class
/// interface; it is specific to MemoryMappedFile.
///
/// \param[in] ranges the byte ranges of the mapped file the advice applies to
/// \return Status
Status AdviseRandom(const std::vector<ReadRange>& ranges);

bool supports_zero_copy() const override;

/// Write data at the current position in the file. Thread-safe
Expand All @@ -215,6 +220,11 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {

class ARROW_NO_EXPORT MemoryMap;
std::shared_ptr<MemoryMap> memory_map_;

/// \brief Translate read ranges into address/size regions inside the mapping
///
/// Fails if `memory_map` is closed or if a range does not validate against
/// the map size.
///
/// \param[in] ranges the byte ranges to translate
/// \param[in] memory_map the map whose data pointer and size are used
/// \param[out] regions entry i is overwritten with the region for ranges[i];
///   the vector is indexed, not appended to, so callers must size it to
///   ranges.size() beforehand
///
/// NOTE(review): the resize lock is taken as a local inside this function, so
/// it is no longer held once the function returns and callers then use the
/// returned addresses without it -- confirm this is intended.
Status ReadRangesToMemoryRegions(
const std::vector<ReadRange>& ranges,
std::shared_ptr<MemoryMappedFile::MemoryMap>& memory_map,
std::vector<MemoryRegion>& regions);
};

} // namespace io
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,21 @@ TEST_F(TestMemoryMappedFile, WillNeed) {
ASSERT_RAISES(IOError, mmap->WillNeed({{1025, 1}})); // Out of bounds
}

// AdviseRandom() on a fully written map: an empty request, in-bounds ranges
// and a zero-length range at end-of-file must succeed, while a range that
// starts past the end must raise IOError.
TEST_F(TestMemoryMappedFile, AdviseRandom) {
  constexpr int64_t kMapSize = 1024;
  std::vector<uint8_t> payload(kMapSize);
  random_bytes(kMapSize, 0, payload.data());

  std::string map_path = TempFile("io-memory-map-advise-random-test");
  ASSERT_OK_AND_ASSIGN(auto mapped, InitMemoryMap(kMapSize, map_path));
  ASSERT_OK(mapped->Write(payload.data(), kMapSize));

  // No ranges at all
  ASSERT_OK(mapped->AdviseRandom({}));
  // Two in-bounds ranges, the second one ending exactly at end-of-file
  ASSERT_OK(mapped->AdviseRandom({{0, 4}, {100, kMapSize - 100}}));
  // Zero-length range positioned at end-of-file
  ASSERT_OK(mapped->AdviseRandom({{kMapSize, 0}}));
  ASSERT_RAISES(IOError, mapped->AdviseRandom({{kMapSize + 1, 1}}));  // Out of bounds
}

TEST_F(TestMemoryMappedFile, InvalidReads) {
std::string path = TempFile("io-memory-map-invalid-reads-test");
ASSERT_OK_AND_ASSIGN(auto result, InitMemoryMap(4096, path));
Expand Down
62 changes: 41 additions & 21 deletions cpp/src/arrow/util/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,21 +1047,45 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes,
#endif
}

Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions) {
// Extend `region` downwards so that it starts at the address obtained by
// masking its start with `page_mask` (callers pass ~(page_size - 1)).
//
// The size grows by the distance the start address moved, so the end address
// of the returned region equals that of the input. `page_size` is only used
// to check that the start moved by less than one page.
MemoryRegion align_region(const MemoryRegion& region, size_t page_mask,
                          size_t page_size) {
  const auto start = reinterpret_cast<uintptr_t>(region.addr);
  const auto page_start = start & page_mask;
  const auto lead = static_cast<size_t>(start - page_start);
  DCHECK_LT(lead, page_size);
  return MemoryRegion{reinterpret_cast<void*>(page_start), region.size + lead};
}

#ifdef POSIX_MADV_WILLNEED
// Pass `advice` to posix_madvise() for every non-empty region.
//
// Each region is first extended downwards with align_region() so that the
// address handed to posix_madvise() is masked to a page boundary. Stops at
// the first failure and returns it as an IOError; EBADF is deliberately
// ignored (see below). Empty regions are skipped.
//
// The previous local one-argument `align_region` lambda has been removed: it
// shadowed the three-argument free function of the same name that the loop
// calls.
Status MemoryAdvise(const std::vector<MemoryRegion>& regions, int advice) {
  const auto page_size = static_cast<size_t>(GetPageSize());
  DCHECK_GT(page_size, 0);
  const size_t page_mask = ~(page_size - 1);
  DCHECK_EQ(page_mask & page_size, page_size);

  for (const auto& region : regions) {
    if (region.size == 0) {
      continue;
    }
    const auto aligned = align_region(region, page_mask, page_size);
    // posix_madvise() returns the error number directly instead of setting
    // errno.
    const int err = posix_madvise(aligned.addr, aligned.size, advice);
    // EBADF can be returned on Linux in the following cases:
    // - the kernel version is older than 3.9
    // - the kernel was compiled with CONFIG_SWAP disabled (ARROW-9577)
    if (err != 0 && err != EBADF) {
      return IOErrorFromErrno(err, "posix_madvise failed");
    }
  }
  return Status::OK();
}
#endif

Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions) {
#ifdef _WIN32
const auto page_size = static_cast<size_t>(GetPageSize());
DCHECK_GT(page_size, 0);
const size_t page_mask = ~(page_size - 1);
DCHECK_EQ(page_mask & page_size, page_size);

// PrefetchVirtualMemory() is available on Windows 8 or later
struct PrefetchEntry { // Like WIN32_MEMORY_RANGE_ENTRY
void* VirtualAddress;
Expand All @@ -1078,7 +1102,7 @@ Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions) {
entries.reserve(regions.size());
for (const auto& region : regions) {
if (region.size != 0) {
entries.emplace_back(align_region(region));
entries.emplace_back(align_region(region, page_mask, page_size));
}
}
if (!entries.empty() &&
Expand All @@ -1090,19 +1114,15 @@ Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions) {
}
return Status::OK();
#elif defined(POSIX_MADV_WILLNEED)
for (const auto& region : regions) {
if (region.size != 0) {
const auto aligned = align_region(region);
int err = posix_madvise(aligned.addr, aligned.size, POSIX_MADV_WILLNEED);
// EBADF can be returned on Linux in the following cases:
// - the kernel version is older than 3.9
// - the kernel was compiled with CONFIG_SWAP disabled (ARROW-9577)
if (err != 0 && err != EBADF) {
return IOErrorFromErrno(err, "posix_madvise failed");
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extracted this piece of code from the MemoryAdviseWillNeed API so that it can be reused to give other kinds of advice to the OS.

return MemoryAdvise(regions, POSIX_MADV_WILLNEED);
#else
return Status::OK();
#endif
}

Status MemoryAdviseRandom(const std::vector<MemoryRegion>& regions) {
#ifdef POSIX_MADV_RANDOM
return MemoryAdvise(regions, POSIX_MADV_RANDOM);
#else
return Status::OK();
#endif
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/io_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes,
ARROW_EXPORT
Status MemoryAdviseWillNeed(const std::vector<MemoryRegion>& regions);

ARROW_EXPORT
Status MemoryAdviseRandom(const std::vector<MemoryRegion>& regions);

ARROW_EXPORT
Result<std::string> GetEnvVar(const char* name);
ARROW_EXPORT
Expand Down