From dd04b878f73571d1f647acee91c295a2845266bb Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Thu, 17 Aug 2017 21:32:14 -0700 Subject: [PATCH 01/16] [arrow][putperf] enable HUGETLBFS support on linux --- cpp/src/plasma/CMakeLists.txt | 8 +++++ cpp/src/plasma/client.cc | 5 +-- cpp/src/plasma/common.cc | 2 ++ cpp/src/plasma/common.h | 4 +++ cpp/src/plasma/malloc.cc | 60 ++++++++++++++++++++++++++++------- cpp/src/plasma/plasma.h | 4 +++ cpp/src/plasma/store.cc | 58 +++++++++++++++++++++++++++++---- cpp/src/plasma/store.h | 7 +++- 8 files changed, 126 insertions(+), 22 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 7e91202623e..79f3050b3ac 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -34,6 +34,14 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=20 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion") +option(ARROW_PLASMA_HUGETLB + "Enable hugetlb support in plasma" + OFF) + +if(ARROW_PLASMA_HUGETLB) + add_definitions(-DARROW_PLASMA_HUGETLB) +endif() + # Compile flatbuffers set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs") diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 8ea62c6e553..90cd2851315 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -91,7 +91,8 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } - close(fd); + close(fd); // PERF: closing this fd has an effect on performance. + ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; entry.length = map_size; @@ -465,7 +466,7 @@ Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { return Status::PlasmaObjectNonexistent("Object not found"); } // Compute the hash. - uint64_t hash = compute_object_hash(object_buffer); + uint64_t hash = 0; //compute_object_hash(object_buffer); memcpy(digest, &hash, sizeof(hash)); // Release the plasma object. return Release(object_id); diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index d7a79650785..2de06d5f8cf 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -83,4 +83,6 @@ Status plasma_error_status(int plasma_error) { ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local; ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote; +const PlasmaStoreInfo* plasma_config; + } // namespace plasma diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 2b71da67015..a643f92afa4 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -95,6 +95,10 @@ enum ObjectRequestType { extern int ObjectStatusLocal; extern int ObjectStatusRemote; +/// Globally accessible reference to plasma store configuration. +/// TODO: may be avoided with some refactoring of existing code. +struct PlasmaStoreInfo; +extern const PlasmaStoreInfo* plasma_config; } // namespace plasma #endif // PLASMA_COMMON_H diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 77a8afea754..d44c92531b1 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -26,8 +26,12 @@ #include #include +#include +#include +#include #include "plasma/common.h" +#include "plasma/plasma.h" extern "C" { void* fake_mmap(size_t); @@ -40,7 +44,7 @@ int fake_munmap(void*, int64_t); #define USE_DL_PREFIX #define HAVE_MORECORE 0 #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T -#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) +#define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) //1GB #include "thirdparty/dlmalloc.c" // NOLINT @@ -81,6 +85,7 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { * immediately unlinking it so we do not leave traces in the system. */ int create_buffer(int64_t size) { int fd; + std::string file_template = plasma::plasma_config->directory; #ifdef _WIN32 if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), @@ -89,26 +94,48 @@ int create_buffer(int64_t size) { } #else #ifdef __linux__ - constexpr char file_template[] = "/dev/shm/plasmaXXXXXX"; + if (plasma::plasma_config->hugetlb_enabled) { + file_template += "/hugepagefile"; + } else { + file_template += "/plasmaXXXXXX"; // template + } + // constexpr char file_template[] = "/dev/shm/plasmaXXXXXX"; #else - constexpr char file_template[] = "/tmp/plasmaXXXXXX"; + // constexpr char file_template[] = "/tmp/plasmaXXXXXX"; + file_template += "/plasmaXXXXXX"; #endif - char file_name[32]; - strncpy(file_name, file_template, 32); - fd = mkstemp(file_name); - if (fd < 0) return -1; + std::vector file_name(file_template.begin(), file_template.end()); + file_name.push_back('\0'); + if (plasma::plasma_config->hugetlb_enabled) { + // create a file descriptor to a hugepage-based file. + fd = open(&file_name[0], O_CREAT | O_RDWR, 0755); + } else { + fd = mkstemp(&file_name[0]); + } + if (fd < 0) { + perror("create_buffer: open failed"); + ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; + return -1; + } + FILE* file = fdopen(fd, "a+"); if (!file) { + perror("create_buffer: fdopen failed"); close(fd); + ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0]; return -1; } - if (unlink(file_name) != 0) { + if (unlink(&file_name[0]) != 0) { + perror("create_buffer: failed to unlink file"); ARROW_LOG(FATAL) << "unlink error"; return -1; } - if (ftruncate(fd, (off_t)size) != 0) { - ARROW_LOG(FATAL) << "ftruncate error"; - return -1; + if (!plasma::plasma_config->hugetlb_enabled) { + if (ftruncate(fd, (off_t)size) != 0) { + perror("create_buffer: failed to truncate file"); + ARROW_LOG(FATAL) << "ftruncate error"; + return -1; + } } #endif return fd; @@ -122,10 +149,19 @@ void* fake_mmap(size_t size) { int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; - void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (pointer == MAP_FAILED) { + ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); return pointer; } + // Attempt to mlock the mmaped region of memory (best effort). + int rv = mlock(pointer, size); + if (rv != 0) { + ARROW_LOG(WARNING) << "mlock failed with error : " << std::strerror(errno); + } + ARROW_LOG(INFO) << "mlocking pointer " << pointer << " size " << size + << " success " << rv; + memset(pointer, 0xff, size); /* Increase dlmalloc's allocation granularity directly. */ mparams.granularity *= GRANULARITY_MULTIPLIER; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index d60e5a83630..d661a3be35a 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -29,6 +29,7 @@ #include #include +#include #include "arrow/status.h" #include "arrow/util/logging.h" @@ -55,6 +56,7 @@ namespace plasma { /// Allocation granularity used in plasma for object allocation. #define BLOCK_SIZE 64 +#define DEFAULT_HUGETLBFS_MOUNTDIR "/mnt/hugepages" struct Client; @@ -129,6 +131,8 @@ struct PlasmaStoreInfo { /// The amount of memory (in bytes) that we allow to be allocated in the /// store. int64_t memory_capacity; + bool hugetlb_enabled; + std::string directory; }; /// Get an entry from the object table and return NULL if the object_id diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9f4b98c0ee7..de99f90d0af 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -96,9 +96,12 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) Client::Client(int fd) : fd(fd) {} -PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory) +PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, + std::string directory, bool hugetlb_enabled) : loop_(loop), eviction_policy_(&store_info_) { store_info_.memory_capacity = system_memory; + store_info_.directory = directory; + store_info_.hugetlb_enabled = hugetlb_enabled; } // TODO(pcm): Get rid of this destructor by using RAII to clean up data. @@ -114,6 +117,16 @@ PlasmaStore::~PlasmaStore() { } } +// Get a const reference to the internal PlasmaStoreInfo object. +const PlasmaStoreInfo& PlasmaStore::getPlasmaStoreInfoRef() { + return store_info_; +} + +// Get a const pointer to the internal PlasmaStoreInfo object. +const PlasmaStoreInfo* PlasmaStore::getPlasmaStoreInfoPtr() { + return &store_info_; +} + // If this client is not already using the object, add the client to the // object's list of clients, otherwise do nothing. void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) { @@ -633,10 +646,13 @@ class PlasmaStoreRunner { public: PlasmaStoreRunner() {} - void Start(char* socket_name, int64_t system_memory) { + void Start(char* socket_name, int64_t system_memory, std::string directory, + bool hugetlb_enabled) { // Create the event loop. loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), system_memory)); + store_.reset(new PlasmaStore(loop_.get(), system_memory, directory, + hugetlb_enabled)); + plasma_config = store_->getPlasmaStoreInfoPtr(); int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); @@ -670,7 +686,8 @@ void HandleSignal(int signal) { } } -void start_server(char* socket_name, int64_t system_memory) { +void start_server(char* socket_name, int64_t system_memory, + std::string memfile_directory, bool hugetlb_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); @@ -678,17 +695,27 @@ void start_server(char* socket_name, int64_t system_memory) { PlasmaStoreRunner runner; g_runner = &runner; signal(SIGTERM, HandleSignal); - runner.Start(socket_name, system_memory); + // Pre-warm the shared memory store. + ARROW_CHECK(dlmalloc(1024) != NULL); + runner.Start(socket_name, system_memory, memfile_directory, hugetlb_enabled); } } // namespace plasma int main(int argc, char* argv[]) { char* socket_name = NULL; + std::string memfile_directory; // e.g., /dev/shm, /tmp, /mnt/hugepages + bool hugetlb_enabled = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { switch (c) { + case 'd': + memfile_directory = std::string(optarg); + break; + case 'h': + hugetlb_enabled = true; + break; case 's': socket_name = optarg; break; @@ -705,12 +732,28 @@ int main(int argc, char* argv[]) { exit(-1); } } + // Sanity check command line options. if (!socket_name) { ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; } if (system_memory == -1) { ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } + if (memfile_directory.empty()) { + ARROW_LOG(WARNING) << "Memory-backed file directory not specified."; + // Backward compatibility: deduce directory name automatically. +#ifdef __linux__ + if (hugetlb_enabled) { + memfile_directory = DEFAULT_HUGETLBFS_MOUNTDIR; + } else { + memfile_directory = "/dev/shm"; + } +#else + memfile_directory = "/tmp"; +#endif + } + ARROW_LOG(INFO) << "Starting object store in directory " << memfile_directory + << " and HUGETLBFS " << (hugetlb_enabled?"enabled":"disabled"); #ifdef __linux__ // On Linux, check that the amount of memory available in /dev/shm is large // enough to accommodate the request. If it isn't, then fail. @@ -736,5 +779,6 @@ int main(int argc, char* argv[]) { // available. plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::start_server(socket_name, system_memory); + plasma::start_server(socket_name, system_memory, memfile_directory, + hugetlb_enabled); } diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index fb732a1375d..786788a4247 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -47,10 +47,15 @@ struct Client { class PlasmaStore { public: - PlasmaStore(EventLoop* loop, int64_t system_memory); + PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, + bool hugetlbfs_enabled); ~PlasmaStore(); + /// Get a const reference to the internal PlasmaStoreInfo object. + const PlasmaStoreInfo& getPlasmaStoreInfoRef(); + const PlasmaStoreInfo* getPlasmaStoreInfoPtr(); + /// Create a new object. The client must do a call to release_object to tell /// the store when it is done with the object. /// From a20ca568dad0deab15faf00635f937f6d69beceb Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Thu, 17 Aug 2017 22:07:21 -0700 Subject: [PATCH 02/16] fix bug --- cpp/src/plasma/store.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index de99f90d0af..309f1a5872c 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -656,6 +656,8 @@ class PlasmaStoreRunner { int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); + // Pre-warm the shared memory store. + ARROW_CHECK(dlmalloc(1024) != NULL); loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { this->store_->connect_client(socket); @@ -695,8 +697,6 @@ void start_server(char* socket_name, int64_t system_memory, PlasmaStoreRunner runner; g_runner = &runner; signal(SIGTERM, HandleSignal); - // Pre-warm the shared memory store. - ARROW_CHECK(dlmalloc(1024) != NULL); runner.Start(socket_name, system_memory, memfile_directory, hugetlb_enabled); } From 3073a99030a7c6cbb76bb210268e2ce7c61eb81e Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Thu, 17 Aug 2017 22:17:00 -0700 Subject: [PATCH 03/16] reenable hashing --- cpp/src/plasma/client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 90cd2851315..d751612c4a2 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -466,7 +466,7 @@ Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { return Status::PlasmaObjectNonexistent("Object not found"); } // Compute the hash. - uint64_t hash = 0; //compute_object_hash(object_buffer); + uint64_t hash = compute_object_hash(object_buffer); memcpy(digest, &hash, sizeof(hash)); // Release the plasma object. return Release(object_id); From 4702703843750013fb56e16e55426617a94a7905 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 18 Aug 2017 13:09:01 -0700 Subject: [PATCH 04/16] preliminary documentation --- python/doc/source/plasma.rst | 41 ++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst index 832d9960cb5..9d60c23c194 100644 --- a/python/doc/source/plasma.rst +++ b/python/doc/source/plasma.rst @@ -335,3 +335,44 @@ the original Pandas ``DataFrame`` structure. # Convert back into Pandas result = record_batch.to_pandas() + +Using Plasma with Huge Pages +---------------------------- + +On Linux it is possible to use the Plasma store with huge pages for increased +throughput. You first need to create a file system and activate huge pages with + +``` +sudo mkdir -p /mnt/hugepages +sudo mount -t hugetlbfs -o uid=ubuntu -o gid=adm none /mnt/hugepages +sudo bash -c "echo 4 > /proc/sys/vm/hugetlb_shm_group " +sudo bash -c "echo 2048 > /proc/sys/vm/nr_hugepages" +``` + +and then start the Plasma store with + +``` +plasma_store -s /tmp/plasma -m 1000000000 -d /mnt/hugepages -h +``` + +You can test this with the following script: + +.. code-block:: python + + import numpy as np + import pyarrow as pa + import pyarrow.plasma as plasma + import time + + client = plasma.connect("/tmp/plasma", "", 0) + + data = np.random.randn(1000000000) + tensor = pa.Tensor.from_numpy(data) + + object_id = plasma.ObjectID(np.random.bytes(20)) + buf = client.create(object_id, pa.get_tensor_size(tensor)) + + stream = pa.FixedSizeBufferOutputStream(buf) + a = time.time() + pa.write_tensor(tensor, stream) + print("Writing took ", time.time() - a) From c52f21126c8ed8093ada0e3128b99f474e77c648 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 18 Aug 2017 13:45:16 -0700 Subject: [PATCH 05/16] cleanups (TODO: See if memory locking helps) --- cpp/src/plasma/client.cc | 2 +- cpp/src/plasma/common.h | 3 ++- cpp/src/plasma/malloc.cc | 30 +++--------------------------- cpp/src/plasma/plasma.h | 1 - cpp/src/plasma/store.cc | 24 +++++------------------- cpp/src/plasma/store.h | 5 ++--- 6 files changed, 13 insertions(+), 52 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index d751612c4a2..8b86eaf8e50 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -91,7 +91,7 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } - close(fd); // PERF: closing this fd has an effect on performance. + close(fd); // Closing this fd has an effect on performance. ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index a643f92afa4..66d5f3069d0 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -96,7 +96,8 @@ extern int ObjectStatusLocal; extern int ObjectStatusRemote; /// Globally accessible reference to plasma store configuration. -/// TODO: may be avoided with some refactoring of existing code. +/// TODO(pcm): This can be avoided with some refactoring of existing code +/// by making it possible to pass a context object through dlmalloc. struct PlasmaStoreInfo; extern const PlasmaStoreInfo* plasma_config; } // namespace plasma diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index d44c92531b1..d1a9061b506 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -26,9 +26,7 @@ #include #include -#include #include -#include #include "plasma/common.h" #include "plasma/plasma.h" @@ -44,7 +42,7 @@ int fake_munmap(void*, int64_t); #define USE_DL_PREFIX #define HAVE_MORECORE 0 #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T -#define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) //1GB +#define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) // 1GB #include "thirdparty/dlmalloc.c" // NOLINT @@ -93,17 +91,7 @@ int create_buffer(int64_t size) { fd = -1; } #else -#ifdef __linux__ - if (plasma::plasma_config->hugetlb_enabled) { - file_template += "/hugepagefile"; - } else { - file_template += "/plasmaXXXXXX"; // template - } - // constexpr char file_template[] = "/dev/shm/plasmaXXXXXX"; -#else - // constexpr char file_template[] = "/tmp/plasmaXXXXXX"; file_template += "/plasmaXXXXXX"; -#endif std::vector file_name(file_template.begin(), file_template.end()); file_name.push_back('\0'); if (plasma::plasma_config->hugetlb_enabled) { @@ -113,27 +101,23 @@ int create_buffer(int64_t size) { fd = mkstemp(&file_name[0]); } if (fd < 0) { - perror("create_buffer: open failed"); ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; return -1; } FILE* file = fdopen(fd, "a+"); if (!file) { - perror("create_buffer: fdopen failed"); close(fd); ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0]; return -1; } if (unlink(&file_name[0]) != 0) { - perror("create_buffer: failed to unlink file"); - ARROW_LOG(FATAL) << "unlink error"; + ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; return -1; } if (!plasma::plasma_config->hugetlb_enabled) { if (ftruncate(fd, (off_t)size) != 0) { - perror("create_buffer: failed to truncate file"); - ARROW_LOG(FATAL) << "ftruncate error"; + ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; return -1; } } @@ -154,14 +138,6 @@ void* fake_mmap(size_t size) { ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); return pointer; } - // Attempt to mlock the mmaped region of memory (best effort). - int rv = mlock(pointer, size); - if (rv != 0) { - ARROW_LOG(WARNING) << "mlock failed with error : " << std::strerror(errno); - } - ARROW_LOG(INFO) << "mlocking pointer " << pointer << " size " << size - << " success " << rv; - memset(pointer, 0xff, size); /* Increase dlmalloc's allocation granularity directly. */ mparams.granularity *= GRANULARITY_MULTIPLIER; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index d661a3be35a..774d76f8df5 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -56,7 +56,6 @@ namespace plasma { /// Allocation granularity used in plasma for object allocation. #define BLOCK_SIZE 64 -#define DEFAULT_HUGETLBFS_MOUNTDIR "/mnt/hugepages" struct Client; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 309f1a5872c..c1586b6511c 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -117,13 +117,7 @@ PlasmaStore::~PlasmaStore() { } } -// Get a const reference to the internal PlasmaStoreInfo object. -const PlasmaStoreInfo& PlasmaStore::getPlasmaStoreInfoRef() { - return store_info_; -} - -// Get a const pointer to the internal PlasmaStoreInfo object. -const PlasmaStoreInfo* PlasmaStore::getPlasmaStoreInfoPtr() { +const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; } @@ -652,12 +646,10 @@ class PlasmaStoreRunner { loop_.reset(new EventLoop); store_.reset(new PlasmaStore(loop_.get(), system_memory, directory, hugetlb_enabled)); - plasma_config = store_->getPlasmaStoreInfoPtr(); + plasma_config = store_->get_plasma_store_info(); int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); - // Pre-warm the shared memory store. - ARROW_CHECK(dlmalloc(1024) != NULL); loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { this->store_->connect_client(socket); @@ -740,20 +732,14 @@ int main(int argc, char* argv[]) { ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } if (memfile_directory.empty()) { - ARROW_LOG(WARNING) << "Memory-backed file directory not specified."; - // Backward compatibility: deduce directory name automatically. #ifdef __linux__ - if (hugetlb_enabled) { - memfile_directory = DEFAULT_HUGETLBFS_MOUNTDIR; - } else { - memfile_directory = "/dev/shm"; - } + memfile_directory = "/dev/shm"; #else memfile_directory = "/tmp"; #endif } - ARROW_LOG(INFO) << "Starting object store in directory " << memfile_directory - << " and HUGETLBFS " << (hugetlb_enabled?"enabled":"disabled"); + ARROW_LOG(INFO) << "Starting object store with directory " << memfile_directory + << " and huge page support " << (hugetlb_enabled ? "enabled" : "disabled"); #ifdef __linux__ // On Linux, check that the amount of memory available in /dev/shm is large // enough to accommodate the request. If it isn't, then fail. diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 786788a4247..1b8a923008e 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -52,9 +52,8 @@ class PlasmaStore { ~PlasmaStore(); - /// Get a const reference to the internal PlasmaStoreInfo object. - const PlasmaStoreInfo& getPlasmaStoreInfoRef(); - const PlasmaStoreInfo* getPlasmaStoreInfoPtr(); + /// Get a const pointer to the internal PlasmaStoreInfo object. + const PlasmaStoreInfo* get_plasma_store_info(); /// Create a new object. The client must do a call to release_object to tell /// the store when it is done with the object. From ce90ef484e48b526488c6384f1fc8812cd8106c3 Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Fri, 18 Aug 2017 16:26:49 -0700 Subject: [PATCH 06/16] documenting new plasma store info fields --- cpp/src/plasma/plasma.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 774d76f8df5..627c67b18a6 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -130,7 +130,10 @@ struct PlasmaStoreInfo { /// The amount of memory (in bytes) that we allow to be allocated in the /// store. int64_t memory_capacity; + /// Boolean flag indicating whether to start the object store with hugepages + /// support enabled. bool hugetlb_enabled; + /// A (platform-dependent) directory where to create the memory-backed file. std::string directory; }; From 98b603e1caa7439aa20e1e256ce76f296d382ee6 Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Fri, 18 Aug 2017 18:03:13 -0700 Subject: [PATCH 07/16] map_populate on linux; fall back to mlock/memset otherwise --- cpp/src/plasma/malloc.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index d1a9061b506..c6cf1af58b4 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -133,11 +133,28 @@ void* fake_mmap(size_t size) { int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; +#ifdef __linux__ + void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, fd, 0); + if (pointer == MAP_FAILED) { + ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); + return pointer; + } +#else void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (pointer == MAP_FAILED) { ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); return pointer; } + // Attempt to mlock the mmaped region of memory (best effort). + int rv = mlock(pointer, size); + if (rv != 0) { + ARROW_LOG(WARNING) << "(best effort) mlock failed"; + // Attempt to memset the mmaped region of memory (best effort). + memset(pointer, 0xff, size); + } +#endif + ARROW_LOG(INFO) << "mmaping pointer " << pointer << " size " << size; /* Increase dlmalloc's allocation granularity directly. */ mparams.granularity *= GRANULARITY_MULTIPLIER; From 7260d59ca7379b9ec61c1260eaa8753f8662a420 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 18 Aug 2017 23:15:44 -0700 Subject: [PATCH 08/16] expose number of threads to python and try out cleanups --- cpp/src/plasma/CMakeLists.txt | 8 +-- cpp/src/plasma/malloc.cc | 26 ++++------ cpp/src/plasma/plasma.h | 2 +- cpp/src/plasma/store.cc | 76 +++++++++++++++------------- python/pyarrow/includes/libarrow.pxd | 4 ++ python/pyarrow/io.pxi | 12 +++++ 6 files changed, 71 insertions(+), 57 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 79f3050b3ac..94fc903529b 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -34,12 +34,12 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=20 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion") -option(ARROW_PLASMA_HUGETLB - "Enable hugetlb support in plasma" +option(ARROW_PLASMA_HUGEPAGES + "Enable huge page tables support in plasma" OFF) -if(ARROW_PLASMA_HUGETLB) - add_definitions(-DARROW_PLASMA_HUGETLB) +if(ARROW_PLASMA_HUGEPAGES) + add_definitions(-DARROW_PLASMA_HUGEPAGES) endif() # Compile flatbuffers diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index c6cf1af58b4..baa1475ec03 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -42,7 +42,12 @@ int fake_munmap(void*, int64_t); #define USE_DL_PREFIX #define HAVE_MORECORE 0 #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T + +#ifndef ARROW_PLASMA_HUGEPAGES +#define DEFAULT_GRANULARITY ((size_t) 128U * 1024U) // 128KB +#else #define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) // 1GB +#endif #include "thirdparty/dlmalloc.c" // NOLINT @@ -94,8 +99,7 @@ int create_buffer(int64_t size) { file_template += "/plasmaXXXXXX"; std::vector file_name(file_template.begin(), file_template.end()); file_name.push_back('\0'); - if (plasma::plasma_config->hugetlb_enabled) { - // create a file descriptor to a hugepage-based file. + if (plasma::plasma_config->hugepages_enabled) { fd = open(&file_name[0], O_CREAT | O_RDWR, 0755); } else { fd = mkstemp(&file_name[0]); @@ -115,7 +119,7 @@ int create_buffer(int64_t size) { ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; return -1; } - if (!plasma::plasma_config->hugetlb_enabled) { + if (!plasma::plasma_config->hugepages_enabled) { if (ftruncate(fd, (off_t)size) != 0) { ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; return -1; @@ -136,25 +140,13 @@ void* fake_mmap(size_t size) { #ifdef __linux__ void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0); - if (pointer == MAP_FAILED) { - ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); - return pointer; - } #else void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); +#endif if (pointer == MAP_FAILED) { - ARROW_LOG(ERROR) << "mmap failed with error : " << std::strerror(errno); + ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); return pointer; } - // Attempt to mlock the mmaped region of memory (best effort). - int rv = mlock(pointer, size); - if (rv != 0) { - ARROW_LOG(WARNING) << "(best effort) mlock failed"; - // Attempt to memset the mmaped region of memory (best effort). - memset(pointer, 0xff, size); - } -#endif - ARROW_LOG(INFO) << "mmaping pointer " << pointer << " size " << size; /* Increase dlmalloc's allocation granularity directly. */ mparams.granularity *= GRANULARITY_MULTIPLIER; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 627c67b18a6..2fa05598662 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -132,7 +132,7 @@ struct PlasmaStoreInfo { int64_t memory_capacity; /// Boolean flag indicating whether to start the object store with hugepages /// support enabled. - bool hugetlb_enabled; + bool hugepages_enabled; /// A (platform-dependent) directory where to create the memory-backed file. std::string directory; }; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c1586b6511c..39a7eac1392 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -97,11 +97,11 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) Client::Client(int fd) : fd(fd) {} PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, - std::string directory, bool hugetlb_enabled) + std::string directory, bool hugepages_enabled) : loop_(loop), eviction_policy_(&store_info_) { store_info_.memory_capacity = system_memory; store_info_.directory = directory; - store_info_.hugetlb_enabled = hugetlb_enabled; + store_info_.hugepages_enabled = hugepages_enabled; } // TODO(pcm): Get rid of this destructor by using RAII to clean up data. @@ -641,11 +641,11 @@ class PlasmaStoreRunner { PlasmaStoreRunner() {} void Start(char* socket_name, int64_t system_memory, std::string directory, - bool hugetlb_enabled) { + bool hugepages_enabled) { // Create the event loop. loop_.reset(new EventLoop); store_.reset(new PlasmaStore(loop_.get(), system_memory, directory, - hugetlb_enabled)); + hugepages_enabled)); plasma_config = store_->get_plasma_store_info(); int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. @@ -681,7 +681,7 @@ void HandleSignal(int signal) { } void start_server(char* socket_name, int64_t system_memory, - std::string memfile_directory, bool hugetlb_enabled) { + std::string plasma_directory, bool hugepages_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); @@ -689,24 +689,25 @@ void start_server(char* socket_name, int64_t system_memory, PlasmaStoreRunner runner; g_runner = &runner; signal(SIGTERM, HandleSignal); - runner.Start(socket_name, system_memory, memfile_directory, hugetlb_enabled); + runner.Start(socket_name, system_memory, plasma_directory, hugepages_enabled); } } // namespace plasma int main(int argc, char* argv[]) { char* socket_name = NULL; - std::string memfile_directory; // e.g., /dev/shm, /tmp, /mnt/hugepages - bool hugetlb_enabled = false; + // Directory where plasma memory mapped files are stored. + std::string plasma_directory; + bool hugepages_enabled = false; int64_t system_memory = -1; int c; while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { switch (c) { case 'd': - memfile_directory = std::string(optarg); + plasma_directory = std::string(optarg); break; case 'h': - hugetlb_enabled = true; + hugepages_enabled = true; break; case 's': socket_name = optarg; @@ -731,40 +732,45 @@ int main(int argc, char* argv[]) { if (system_memory == -1) { ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } - if (memfile_directory.empty()) { + if (hugepages_enabled && plasma_directory.empty()) { + ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages filesystem with -d"; + } + if (plasma_directory.empty()) { #ifdef __linux__ - memfile_directory = "/dev/shm"; + plasma_directory = "/dev/shm"; #else - memfile_directory = "/tmp"; + plasma_directory = "/tmp"; #endif } - ARROW_LOG(INFO) << "Starting object store with directory " << memfile_directory - << " and huge page support " << (hugetlb_enabled ? "enabled" : "disabled"); + ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory + << " and huge page support " << (hugepages_enabled ? "enabled" : "disabled"); #ifdef __linux__ - // On Linux, check that the amount of memory available in /dev/shm is large - // enough to accommodate the request. If it isn't, then fail. - int shm_fd = open("/dev/shm", O_RDONLY); - struct statvfs shm_vfs_stats; - fstatvfs(shm_fd, &shm_vfs_stats); - // The value shm_vfs_stats.f_bsize is the block size, and the value - // shm_vfs_stats.f_bavail is the number of available blocks. - int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; - close(shm_fd); - if (system_memory > shm_mem_avail) { - ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The " - "request is for " - << system_memory << " bytes, and the amount available is " - << shm_mem_avail - << " bytes. You may be able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, you may need to " - "pass " - "an argument with the flag '--shm-size' to 'docker run'."; + if (!hugepages_enabled) { + // On Linux, check that the amount of memory available in /dev/shm is large + // enough to accommodate the request. If it isn't, then fail. + int shm_fd = open(plasma_directory.c_str(), O_RDONLY); + struct statvfs shm_vfs_stats; + fstatvfs(shm_fd, &shm_vfs_stats); + // The value shm_vfs_stats.f_bsize is the block size, and the value + // shm_vfs_stats.f_bavail is the number of available blocks. + int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; + close(shm_fd); + if (system_memory > shm_mem_avail) { + ARROW_LOG(FATAL) << "System memory request exceeds memory available in " + << plasma_directory << ". The request is for " + << system_memory << " bytes, and the amount available is " + << shm_mem_avail + << " bytes. You may be able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, you may need to " + "pass " + "an argument with the flag '--shm-size' to 'docker run'."; + } } #endif // Make it so dlmalloc fails if we try to request more memory than is // available. plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::start_server(socket_name, system_memory, memfile_directory, - hugetlb_enabled); + plasma::start_server(socket_name, system_memory, plasma_directory, + hugepages_enabled); } diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index eed9640861f..c6a9d9d0b7b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -581,6 +581,10 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: " arrow::io::FixedSizeBufferWriter"(WriteableFile): CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer) + void set_memcopy_threads(int num_threads) + void set_memcopy_blocksize(int64_t blocksize) + void set_memcopy_threshold(int64_t threshold) + cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: enum MessageType" arrow::ipc::Message::Type": diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index eda8de73028..f8033d8c319 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -542,6 +542,18 @@ cdef class FixedSizeBufferOutputStream(NativeFile): self.is_writeable = 1 self.is_open = True + def set_memcopy_threads(self, int num_threads): + cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + writer.set_memcopy_threads(num_threads) + + def set_memcopy_blocksize(self, int64_t blocksize): + cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + writer.set_memcopy_blocksize(blocksize) + + def set_memcopy_threshold(self, int64_t threshold): + cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + writer.set_memcopy_threshold(threshold) + # ---------------------------------------------------------------------- # Arrow buffers From fb8e1b41189378f1dfdee9896c04ad7f2ed42551 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 00:04:52 -0700 Subject: [PATCH 09/16] add helpful error message --- cpp/src/plasma/malloc.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index baa1475ec03..b530146cc79 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -99,11 +99,7 @@ int create_buffer(int64_t size) { file_template += "/plasmaXXXXXX"; std::vector file_name(file_template.begin(), file_template.end()); file_name.push_back('\0'); - if (plasma::plasma_config->hugepages_enabled) { - fd = open(&file_name[0], O_CREAT | O_RDWR, 0755); - } else { - fd = mkstemp(&file_name[0]); - } + fd = mkstemp(&file_name[0]); if (fd < 0) { ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; return -1; @@ -145,6 +141,9 @@ void* fake_mmap(size_t size) { #endif if (pointer == MAP_FAILED) { ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); + if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) { + ARROW_LOG(ERROR) << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; + } return pointer; } From 4c976bba985b70c3a7893ff4e3fbb4493e8301af Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 01:17:41 -0700 Subject: [PATCH 10/16] make format --- cpp/src/plasma/client.cc | 2 +- cpp/src/plasma/malloc.cc | 15 ++++++++------- cpp/src/plasma/plasma.h | 2 +- cpp/src/plasma/store.cc | 40 +++++++++++++++++++--------------------- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 8b86eaf8e50..5e28d4f2af7 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -91,7 +91,7 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } - close(fd); // Closing this fd has an effect on performance. + close(fd); // Closing this fd has an effect on performance. ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index b530146cc79..545e7e8176d 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -25,8 +25,8 @@ #include #include -#include #include +#include #include "plasma/common.h" #include "plasma/plasma.h" @@ -44,9 +44,9 @@ int fake_munmap(void*, int64_t); #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T #ifndef ARROW_PLASMA_HUGEPAGES -#define DEFAULT_GRANULARITY ((size_t) 128U * 1024U) // 128KB +#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) // 128KB #else -#define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) // 1GB +#define DEFAULT_GRANULARITY ((size_t)1024U * 1024U * 1024U) // 1GB #endif #include "thirdparty/dlmalloc.c" // NOLINT @@ -134,15 +134,16 @@ void* fake_mmap(size_t size) { int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; #ifdef __linux__ - void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, - MAP_SHARED | MAP_POPULATE, fd, 0); + void* pointer = + mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0); #else - void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); #endif if (pointer == MAP_FAILED) { ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) { - ARROW_LOG(ERROR) << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; + ARROW_LOG(ERROR) + << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; } return pointer; } diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 2fa05598662..dfe446a248f 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -27,9 +27,9 @@ #include #include // pid_t +#include #include #include -#include #include "arrow/status.h" #include "arrow/util/logging.h" diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 39a7eac1392..22d7b960575 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -96,8 +96,8 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) Client::Client(int fd) : fd(fd) {} -PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, - std::string directory, bool hugepages_enabled) +PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, + bool hugepages_enabled) : loop_(loop), eviction_policy_(&store_info_) { store_info_.memory_capacity = system_memory; store_info_.directory = directory; @@ -117,9 +117,7 @@ PlasmaStore::~PlasmaStore() { } } -const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { - return &store_info_; -} +const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; } // If this client is not already using the object, add the client to the // object's list of clients, otherwise do nothing. @@ -644,8 +642,8 @@ class PlasmaStoreRunner { bool hugepages_enabled) { // Create the event loop. loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), system_memory, directory, - hugepages_enabled)); + store_.reset( + new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled)); plasma_config = store_->get_plasma_store_info(); int socket = bind_ipc_sock(socket_name, true); // TODO(pcm): Check return value. @@ -680,8 +678,8 @@ void HandleSignal(int signal) { } } -void start_server(char* socket_name, int64_t system_memory, - std::string plasma_directory, bool hugepages_enabled) { +void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory, + bool hugepages_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); @@ -733,7 +731,8 @@ int main(int argc, char* argv[]) { ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } if (hugepages_enabled && plasma_directory.empty()) { - ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages filesystem with -d"; + ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " + "filesystem with -d"; } if (plasma_directory.empty()) { #ifdef __linux__ @@ -743,7 +742,8 @@ int main(int argc, char* argv[]) { #endif } ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory - << " and huge page support " << (hugepages_enabled ? "enabled" : "disabled"); + << " and huge page support " + << (hugepages_enabled ? "enabled" : "disabled"); #ifdef __linux__ if (!hugepages_enabled) { // On Linux, check that the amount of memory available in /dev/shm is large @@ -756,14 +756,13 @@ int main(int argc, char* argv[]) { int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; close(shm_fd); if (system_memory > shm_mem_avail) { - ARROW_LOG(FATAL) << "System memory request exceeds memory available in " - << plasma_directory << ". The request is for " - << system_memory << " bytes, and the amount available is " - << shm_mem_avail - << " bytes. You may be able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, you may need to " - "pass " - "an argument with the flag '--shm-size' to 'docker run'."; + ARROW_LOG(FATAL) + << "System memory request exceeds memory available in " << plasma_directory + << ". The request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail + << " bytes. You may be able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, you may need to " + "pass an argument with the flag '--shm-size' to 'docker run'."; } } #endif @@ -771,6 +770,5 @@ int main(int argc, char* argv[]) { // available. plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::start_server(socket_name, system_memory, plasma_directory, - hugepages_enabled); + plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled); } From 713a0c486d85ec478c422b2313a3dcf015e2be6d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 01:38:39 -0700 Subject: [PATCH 11/16] add missing includes --- cpp/src/plasma/malloc.cc | 2 ++ cpp/src/plasma/store.h | 1 + 2 files changed, 3 insertions(+) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 545e7e8176d..610217fb074 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -26,7 +26,9 @@ #include #include +#include #include +#include #include "plasma/common.h" #include "plasma/plasma.h" diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 1b8a923008e..50912e19f38 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -20,6 +20,7 @@ #include #include +#include #include "plasma/common.h" #include "plasma/events.h" From 225429b4415b3eaa4478a01db932b6c24f8ae023 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 02:09:36 -0700 Subject: [PATCH 12/16] update documentation with Alexey's fix --- python/doc/source/plasma.rst | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst index 9d60c23c194..e4665d187e1 100644 --- a/python/doc/source/plasma.rst +++ b/python/doc/source/plasma.rst @@ -342,18 +342,23 @@ Using Plasma with Huge Pages On Linux it is possible to use the Plasma store with huge pages for increased throughput. You first need to create a file system and activate huge pages with -``` -sudo mkdir -p /mnt/hugepages -sudo mount -t hugetlbfs -o uid=ubuntu -o gid=adm none /mnt/hugepages -sudo bash -c "echo 4 > /proc/sys/vm/hugetlb_shm_group " -sudo bash -c "echo 2048 > /proc/sys/vm/nr_hugepages" -``` +.. code-block:: shell + + sudo mkdir -p /mnt/hugepages + gid=`id -g` + uid=`id -u` + sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages + sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group" + sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages" -and then start the Plasma store with +Note that you only need root access to create the file system, not for +running the object store. You can then start the Plasma store with the ``-d`` +flag for the mount point of the huge page file system and the ``-h`` flag +which indicates that huge pages are activated: + +.. code-block:: shell -``` -plasma_store -s /tmp/plasma -m 1000000000 -d /mnt/hugepages -h -``` + plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h You can test this with the following script: @@ -366,13 +371,14 @@ You can test this with the following script: client = plasma.connect("/tmp/plasma", "", 0) - data = np.random.randn(1000000000) + data = np.random.randn(100000000) tensor = pa.Tensor.from_numpy(data) object_id = plasma.ObjectID(np.random.bytes(20)) buf = client.create(object_id, pa.get_tensor_size(tensor)) stream = pa.FixedSizeBufferOutputStream(buf) + stream.set_memcopy_threads(4) a = time.time() pa.write_tensor(tensor, stream) print("Writing took ", time.time() - a) From ffb991634a309fcf56be7516d4682673f48f7e3b Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 16:11:31 -0700 Subject: [PATCH 13/16] address comments --- cpp/src/plasma/CMakeLists.txt | 8 -------- cpp/src/plasma/malloc.cc | 10 +++++----- cpp/src/plasma/malloc.h | 2 ++ cpp/src/plasma/store.cc | 2 ++ 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 94fc903529b..7e91202623e 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -34,14 +34,6 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=20 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion") -option(ARROW_PLASMA_HUGEPAGES - "Enable huge page tables support in plasma" - OFF) - -if(ARROW_PLASMA_HUGEPAGES) - add_definitions(-DARROW_PLASMA_HUGEPAGES) -endif() - # Compile flatbuffers set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs") diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 610217fb074..21f8e9b71af 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -45,11 +45,7 @@ int fake_munmap(void*, int64_t); #define HAVE_MORECORE 0 #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T -#ifndef ARROW_PLASMA_HUGEPAGES -#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) // 128KB -#else -#define DEFAULT_GRANULARITY ((size_t)1024U * 1024U * 1024U) // 1GB -#endif +#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) #include "thirdparty/dlmalloc.c" // NOLINT @@ -199,3 +195,7 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse *map_size = 0; *offset = 0; } + +void set_malloc_granularity(int value) { + change_mparam(M_GRANULARITY, value); +} diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h index b4af2c826b5..0df720db598 100644 --- a/cpp/src/plasma/malloc.h +++ b/cpp/src/plasma/malloc.h @@ -23,4 +23,6 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset); +void set_malloc_granularity(int value); + #endif // MALLOC_H diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 22d7b960575..c22deb3efeb 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -764,6 +764,8 @@ int main(int argc, char* argv[]) { "/dev/shm. If you are inside a Docker container, you may need to " "pass an argument with the flag '--shm-size' to 'docker run'."; } + } else { + set_malloc_granularity(1024 * 1024 * 1024); // 1 GB } #endif // Make it so dlmalloc fails if we try to request more memory than is From 22188a6c4417270feeffa5a331782d011332a891 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 16:29:24 -0700 Subject: [PATCH 14/16] formatting --- cpp/src/plasma/malloc.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 21f8e9b71af..6ea69de6ecb 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -44,7 +44,6 @@ int fake_munmap(void*, int64_t); #define USE_DL_PREFIX #define HAVE_MORECORE 0 #define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T - #define DEFAULT_GRANULARITY ((size_t)128U * 1024U) #include "thirdparty/dlmalloc.c" // NOLINT From 5aa4b0d0cdb26e8a0ac0e05ef3a40634a168f3a8 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 20 Aug 2017 00:18:09 +0000 Subject: [PATCH 15/16] preflight script formatting changes --- cpp/src/plasma/malloc.cc | 4 +--- cpp/src/plasma/store.cc | 2 +- cpp/src/plasma/store.h | 2 +- python/pyarrow/io.pxi | 9 ++++++--- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 6ea69de6ecb..64f7bfdc2ac 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -195,6 +195,4 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse *offset = 0; } -void set_malloc_granularity(int value) { - change_mparam(M_GRANULARITY, value); -} +void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); } diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c22deb3efeb..aaa2bad67c3 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -765,7 +765,7 @@ int main(int argc, char* argv[]) { "pass an argument with the flag '--shm-size' to 'docker run'."; } } else { - set_malloc_granularity(1024 * 1024 * 1024); // 1 GB + set_malloc_granularity(1024 * 1024 * 1024); // 1 GB } #endif // Make it so dlmalloc fails if we try to request more memory than is diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 50912e19f38..61a3a245610 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -19,8 +19,8 @@ #define PLASMA_STORE_H #include -#include #include +#include #include "plasma/common.h" #include "plasma/events.h" diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index f8033d8c319..061a7a9a4d6 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -543,15 +543,18 @@ cdef class FixedSizeBufferOutputStream(NativeFile): self.is_open = True def set_memcopy_threads(self, int num_threads): - cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() writer.set_memcopy_threads(num_threads) def set_memcopy_blocksize(self, int64_t blocksize): - cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() writer.set_memcopy_blocksize(blocksize) def set_memcopy_threshold(self, int64_t threshold): - cdef CFixedSizeBufferWriter* writer = self.wr_file.get() + cdef CFixedSizeBufferWriter* writer = \ + self.wr_file.get() writer.set_memcopy_threshold(threshold) From 077b78f8c70151e0caf3114df6f0591b82933e9d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 19 Aug 2017 17:37:16 -0700 Subject: [PATCH 16/16] add more comments --- cpp/src/plasma/malloc.cc | 34 ++++++++++++++++++++-------------- cpp/src/plasma/plasma.h | 4 +++- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 64f7bfdc2ac..6b9bc62ab5a 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -64,12 +64,12 @@ struct mmap_record { namespace { -/** Hashtable that contains one entry per segment that we got from the OS - * via mmap. Associates the address of that segment with its file descriptor - * and size. */ +/// Hashtable that contains one entry per segment that we got from the OS +/// via mmap. Associates the address of that segment with its file descriptor +/// and size. std::unordered_map mmap_records; -} /* namespace */ +} // namespace constexpr int GRANULARITY_MULTIPLIER = 2; @@ -81,8 +81,8 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { return (unsigned char const*)pto - (unsigned char const*)pfrom; } -/* Create a buffer. This is creating a temporary file and then - * immediately unlinking it so we do not leave traces in the system. */ +// Create a buffer. This is creating a temporary file and then +// immediately unlinking it so we do not leave traces in the system. int create_buffer(int64_t size) { int fd; std::string file_template = plasma::plasma_config->directory; @@ -108,11 +108,15 @@ int create_buffer(int64_t size) { ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0]; return -1; } + // Immediately unlink the file so we do not leave traces in the system. if (unlink(&file_name[0]) != 0) { ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; return -1; } if (!plasma::plasma_config->hugepages_enabled) { + // Increase the size of the file to the desired size. This seems not to be + // needed for files that are backed by the huge page fs, see also + // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html if (ftruncate(fd, (off_t)size) != 0) { ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; return -1; @@ -123,14 +127,16 @@ int create_buffer(int64_t size) { } void* fake_mmap(size_t size) { - /* Add sizeof(size_t) so that the returned pointer is deliberately not - * page-aligned. This ensures that the segments of memory returned by - * fake_mmap are never contiguous. */ + // Add sizeof(size_t) so that the returned pointer is deliberately not + // page-aligned. This ensures that the segments of memory returned by + // fake_mmap are never contiguous. size += sizeof(size_t); int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; #ifdef __linux__ + // MAP_POPULATE will pre-populate the page tables for this memory region + // which avoids work when accessing the pages later. Only supported on Linux. void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0); #else @@ -145,14 +151,14 @@ void* fake_mmap(size_t size) { return pointer; } - /* Increase dlmalloc's allocation granularity directly. */ + // Increase dlmalloc's allocation granularity directly. mparams.granularity *= GRANULARITY_MULTIPLIER; mmap_record& record = mmap_records[pointer]; record.fd = fd; record.size = size; - /* We lie to dlmalloc about where mapped memory actually lives. */ + // We lie to dlmalloc about where mapped memory actually lives. pointer = pointer_advance(pointer, sizeof(size_t)); ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; return pointer; @@ -166,8 +172,8 @@ int fake_munmap(void* addr, int64_t size) { auto entry = mmap_records.find(addr); if (entry == mmap_records.end() || entry->second.size != size) { - /* Reject requests to munmap that don't directly match previous - * calls to mmap, to prevent dlmalloc from trimming. */ + // Reject requests to munmap that don't directly match previous + // calls to mmap, to prevent dlmalloc from trimming. return -1; } @@ -181,7 +187,7 @@ int fake_munmap(void* addr, int64_t size) { } void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) { - /* TODO(rshin): Implement a more efficient search through mmap_records. */ + // TODO(rshin): Implement a more efficient search through mmap_records. for (const auto& entry : mmap_records) { if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) { *fd = entry.second.fd; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index dfe446a248f..476002f68c0 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -131,7 +131,9 @@ struct PlasmaStoreInfo { /// store. int64_t memory_capacity; /// Boolean flag indicating whether to start the object store with hugepages - /// support enabled. + /// support enabled. Huge pages are substantially larger than normal memory + /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce + /// bookkeeping overhead from the OS. bool hugepages_enabled; /// A (platform-dependent) directory where to create the memory-backed file. std::string directory;