diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 8ea62c6e553..5e28d4f2af7 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -91,7 +91,8 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } - close(fd); + close(fd); // Closing this fd has an effect on performance. + ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; entry.length = map_size; diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index d7a79650785..2de06d5f8cf 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -83,4 +83,6 @@ Status plasma_error_status(int plasma_error) { ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local; ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote; +const PlasmaStoreInfo* plasma_config; + } // namespace plasma diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 2b71da67015..66d5f3069d0 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -95,6 +95,11 @@ enum ObjectRequestType { extern int ObjectStatusLocal; extern int ObjectStatusRemote; +/// Globally accessible reference to plasma store configuration. +/// TODO(pcm): This can be avoided with some refactoring of existing code +/// by making it possible to pass a context object through dlmalloc. +struct PlasmaStoreInfo; +extern const PlasmaStoreInfo* plasma_config; } // namespace plasma #endif // PLASMA_COMMON_H diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 77a8afea754..6b9bc62ab5a 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -25,9 +25,13 @@ #include #include +#include +#include #include +#include #include "plasma/common.h" +#include "plasma/plasma.h" extern "C" { void* fake_mmap(size_t); @@ -60,12 +64,12 @@ struct mmap_record { namespace { -/** Hashtable that contains one entry per segment that we got from the OS - * via mmap. Associates the address of that segment with its file descriptor - * and size. */ +/// Hashtable that contains one entry per segment that we got from the OS +/// via mmap. Associates the address of that segment with its file descriptor +/// and size. std::unordered_map mmap_records; -} /* namespace */ +} // namespace constexpr int GRANULARITY_MULTIPLIER = 2; @@ -77,10 +81,11 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { return (unsigned char const*)pto - (unsigned char const*)pfrom; } -/* Create a buffer. This is creating a temporary file and then - * immediately unlinking it so we do not leave traces in the system. */ +// Create a buffer. This is creating a temporary file and then +// immediately unlinking it so we do not leave traces in the system. int create_buffer(int64_t size) { int fd; + std::string file_template = plasma::plasma_config->directory; #ifdef _WIN32 if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), @@ -88,53 +93,72 @@ int create_buffer(int64_t size) { fd = -1; } #else -#ifdef __linux__ - constexpr char file_template[] = "/dev/shm/plasmaXXXXXX"; -#else - constexpr char file_template[] = "/tmp/plasmaXXXXXX"; -#endif - char file_name[32]; - strncpy(file_name, file_template, 32); - fd = mkstemp(file_name); - if (fd < 0) return -1; + file_template += "/plasmaXXXXXX"; + std::vector file_name(file_template.begin(), file_template.end()); + file_name.push_back('\0'); + fd = mkstemp(&file_name[0]); + if (fd < 0) { + ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; + return -1; + } + FILE* file = fdopen(fd, "a+"); if (!file) { close(fd); + ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0]; return -1; } - if (unlink(file_name) != 0) { - ARROW_LOG(FATAL) << "unlink error"; + // Immediately unlink the file so we do not leave traces in the system. + if (unlink(&file_name[0]) != 0) { + ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; return -1; } - if (ftruncate(fd, (off_t)size) != 0) { - ARROW_LOG(FATAL) << "ftruncate error"; - return -1; + if (!plasma::plasma_config->hugepages_enabled) { + // Increase the size of the file to the desired size. This seems not to be + // needed for files that are backed by the huge page fs, see also + // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html + if (ftruncate(fd, (off_t)size) != 0) { + ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; + return -1; + } } #endif return fd; } void* fake_mmap(size_t size) { - /* Add sizeof(size_t) so that the returned pointer is deliberately not - * page-aligned. This ensures that the segments of memory returned by - * fake_mmap are never contiguous. */ + // Add sizeof(size_t) so that the returned pointer is deliberately not + // page-aligned. This ensures that the segments of memory returned by + // fake_mmap are never contiguous. size += sizeof(size_t); int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; +#ifdef __linux__ + // MAP_POPULATE will pre-populate the page tables for this memory region + // which avoids work when accessing the pages later. Only supported on Linux. + void* pointer = + mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0); +#else void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); +#endif if (pointer == MAP_FAILED) { + ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); + if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) { + ARROW_LOG(ERROR) + << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; + } return pointer; } - /* Increase dlmalloc's allocation granularity directly. */ + // Increase dlmalloc's allocation granularity directly. mparams.granularity *= GRANULARITY_MULTIPLIER; mmap_record& record = mmap_records[pointer]; record.fd = fd; record.size = size; - /* We lie to dlmalloc about where mapped memory actually lives. */ + // We lie to dlmalloc about where mapped memory actually lives. pointer = pointer_advance(pointer, sizeof(size_t)); ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; return pointer; @@ -148,8 +172,8 @@ int fake_munmap(void* addr, int64_t size) { auto entry = mmap_records.find(addr); if (entry == mmap_records.end() || entry->second.size != size) { - /* Reject requests to munmap that don't directly match previous - * calls to mmap, to prevent dlmalloc from trimming. */ + // Reject requests to munmap that don't directly match previous + // calls to mmap, to prevent dlmalloc from trimming. return -1; } @@ -163,7 +187,7 @@ int fake_munmap(void* addr, int64_t size) { } void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) { - /* TODO(rshin): Implement a more efficient search through mmap_records. */ + // TODO(rshin): Implement a more efficient search through mmap_records. for (const auto& entry : mmap_records) { if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) { *fd = entry.second.fd; @@ -176,3 +200,5 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse *map_size = 0; *offset = 0; } + +void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); } diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h index b4af2c826b5..0df720db598 100644 --- a/cpp/src/plasma/malloc.h +++ b/cpp/src/plasma/malloc.h @@ -23,4 +23,6 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset); +void set_malloc_granularity(int value); + #endif // MALLOC_H diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index d60e5a83630..476002f68c0 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -27,6 +27,7 @@ #include #include // pid_t +#include #include #include @@ -129,6 +130,13 @@ struct PlasmaStoreInfo { /// The amount of memory (in bytes) that we allow to be allocated in the /// store. int64_t memory_capacity; + /// Boolean flag indicating whether to start the object store with hugepages + /// support enabled. Huge pages are substantially larger than normal memory + /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce + /// bookkeeping overhead from the OS. + bool hugepages_enabled; + /// A (platform-dependent) directory where to create the memory-backed file. + std::string directory; }; /// Get an entry from the object table and return NULL if the object_id diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9f4b98c0ee7..aaa2bad67c3 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -96,9 +96,12 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) Client::Client(int fd) : fd(fd) {} -PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory) +PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, + bool hugepages_enabled) : loop_(loop), eviction_policy_(&store_info_) { store_info_.memory_capacity = system_memory; + store_info_.directory = directory; + store_info_.hugepages_enabled = hugepages_enabled; } // TODO(pcm): Get rid of this destructor by using RAII to clean up data. @@ -114,6 +117,8 @@ PlasmaStore::~PlasmaStore() { } } +const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; } + // If this client is not already using the object, add the client to the // object's list of clients, otherwise do nothing. void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) { @@ -633,10 +638,13 @@ class PlasmaStoreRunner { public: PlasmaStoreRunner() {} - void Start(char* socket_name, int64_t system_memory) { + void Start(char* socket_name, int64_t system_memory, std::string directory, + bool hugepages_enabled) { // Create the event loop. loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), system_memory)); + store_.reset( + new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled)); + plasma_config = store_->get_plasma_store_info(); int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); @@ -670,7 +678,8 @@ void HandleSignal(int signal) { } } -void start_server(char* socket_name, int64_t system_memory) { +void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory, + bool hugepages_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); @@ -678,17 +687,26 @@ void start_server(char* socket_name, int64_t system_memory) { PlasmaStoreRunner runner; g_runner = &runner; signal(SIGTERM, HandleSignal); - runner.Start(socket_name, system_memory); + runner.Start(socket_name, system_memory, plasma_directory, hugepages_enabled); } } // namespace plasma int main(int argc, char* argv[]) { char* socket_name = NULL; + // Directory where plasma memory mapped files are stored. + std::string plasma_directory; + bool hugepages_enabled = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { switch (c) { + case 'd': + plasma_directory = std::string(optarg); + break; + case 'h': + hugepages_enabled = true; + break; case 's': socket_name = optarg; break; @@ -705,36 +723,54 @@ int main(int argc, char* argv[]) { exit(-1); } } + // Sanity check command line options. if (!socket_name) { ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; } if (system_memory == -1) { ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } + if (hugepages_enabled && plasma_directory.empty()) { + ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " + "filesystem with -d"; + } + if (plasma_directory.empty()) { #ifdef __linux__ - // On Linux, check that the amount of memory available in /dev/shm is large - // enough to accommodate the request. If it isn't, then fail. - int shm_fd = open("/dev/shm", O_RDONLY); - struct statvfs shm_vfs_stats; - fstatvfs(shm_fd, &shm_vfs_stats); - // The value shm_vfs_stats.f_bsize is the block size, and the value - // shm_vfs_stats.f_bavail is the number of available blocks. - int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; - close(shm_fd); - if (system_memory > shm_mem_avail) { - ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The " - "request is for " - << system_memory << " bytes, and the amount available is " - << shm_mem_avail - << " bytes. You may be able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, you may need to " - "pass " - "an argument with the flag '--shm-size' to 'docker run'."; + plasma_directory = "/dev/shm"; +#else + plasma_directory = "/tmp"; +#endif + } + ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory + << " and huge page support " + << (hugepages_enabled ? "enabled" : "disabled"); +#ifdef __linux__ + if (!hugepages_enabled) { + // On Linux, check that the amount of memory available in /dev/shm is large + // enough to accommodate the request. If it isn't, then fail. + int shm_fd = open(plasma_directory.c_str(), O_RDONLY); + struct statvfs shm_vfs_stats; + fstatvfs(shm_fd, &shm_vfs_stats); + // The value shm_vfs_stats.f_bsize is the block size, and the value + // shm_vfs_stats.f_bavail is the number of available blocks. + int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; + close(shm_fd); + if (system_memory > shm_mem_avail) { + ARROW_LOG(FATAL) + << "System memory request exceeds memory available in " << plasma_directory + << ". The request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail + << " bytes. You may be able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, you may need to " + "pass an argument with the flag '--shm-size' to 'docker run'."; + } + } else { + set_malloc_granularity(1024 * 1024 * 1024); // 1 GB } #endif // Make it so dlmalloc fails if we try to request more memory than is // available. plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::start_server(socket_name, system_memory); + plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled); } diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index fb732a1375d..61a3a245610 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -19,6 +19,7 @@ #define PLASMA_STORE_H #include +#include #include #include "plasma/common.h" @@ -47,10 +48,14 @@ struct Client { class PlasmaStore { public: - PlasmaStore(EventLoop* loop, int64_t system_memory); + PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, + bool hugetlbfs_enabled); ~PlasmaStore(); + /// Get a const pointer to the internal PlasmaStoreInfo object. + const PlasmaStoreInfo* get_plasma_store_info(); + /// Create a new object. The client must do a call to release_object to tell /// the store when it is done with the object. /// diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst index 832d9960cb5..e4665d187e1 100644 --- a/python/doc/source/plasma.rst +++ b/python/doc/source/plasma.rst @@ -335,3 +335,50 @@ the original Pandas ``DataFrame`` structure. # Convert back into Pandas result = record_batch.to_pandas() + +Using Plasma with Huge Pages +---------------------------- + +On Linux it is possible to use the Plasma store with huge pages for increased +throughput. You first need to create a file system and activate huge pages with + +.. code-block:: shell + + sudo mkdir -p /mnt/hugepages + gid=`id -g` + uid=`id -u` + sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages + sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group" + sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages" + +Note that you only need root access to create the file system, not for +running the object store. You can then start the Plasma store with the ``-d`` +flag for the mount point of the huge page file system and the ``-h`` flag +which indicates that huge pages are activated: + +.. code-block:: shell + + plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h + +You can test this with the following script: + +.. code-block:: python + + import numpy as np + import pyarrow as pa + import pyarrow.plasma as plasma + import time + + client = plasma.connect("/tmp/plasma", "", 0) + + data = np.random.randn(100000000) + tensor = pa.Tensor.from_numpy(data) + + object_id = plasma.ObjectID(np.random.bytes(20)) + buf = client.create(object_id, pa.get_tensor_size(tensor)) + + stream = pa.FixedSizeBufferOutputStream(buf) + stream.set_memcopy_threads(4) + a = time.time() + pa.write_tensor(tensor, stream) + print("Writing took ", time.time() - a) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index eed9640861f..c6a9d9d0b7b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -581,6 +581,10 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: " arrow::io::FixedSizeBufferWriter"(WriteableFile): CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer) + void set_memcopy_threads(int num_threads) + void set_memcopy_blocksize(int64_t blocksize) + void set_memcopy_threshold(int64_t threshold) + cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: enum MessageType" arrow::ipc::Message::Type": diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index eda8de73028..061a7a9a4d6 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -542,6 +542,21 @@ cdef class FixedSizeBufferOutputStream(NativeFile): self.is_writeable = 1 self.is_open = True + def set_memcopy_threads(self, int num_threads): + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() + writer.set_memcopy_threads(num_threads) + + def set_memcopy_blocksize(self, int64_t blocksize): + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() + writer.set_memcopy_blocksize(blocksize) + + def set_memcopy_threshold(self, int64_t threshold): + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() + writer.set_memcopy_threshold(threshold) + # ---------------------------------------------------------------------- # Arrow buffers