Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 73 additions & 60 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <gflags/gflags.h>

#include "arrow/status.h"

#include "arrow/util/config.h"

#include "plasma/common.h"
Expand Down Expand Up @@ -115,8 +116,8 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)

Client::Client(int fd) : fd(fd), notification_fd(-1) {}

PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
const std::string& socket_name,
PlasmaStore::PlasmaStore(EventLoop* loop, const std::string& directory,
bool hugepages_enabled, const std::string& socket_name,
std::shared_ptr<ExternalStore> external_store)
: loop_(loop),
eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
Expand Down Expand Up @@ -1147,8 +1148,8 @@ class PlasmaStoreRunner {
public:
PlasmaStoreRunner() {}

void Start(char* socket_name, std::string directory, bool hugepages_enabled,
std::shared_ptr<ExternalStore> external_store) {
void Start(const std::string& socket_name, std::string directory,
bool hugepages_enabled, std::shared_ptr<ExternalStore> external_store) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name,
Expand All @@ -1159,15 +1160,14 @@ class PlasmaStoreRunner {
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = plasma::PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
auto prealloc_memory = PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t);
void* pointer = PlasmaAllocator::Memalign(kBlockSize, prealloc_memory);
ARROW_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
plasma::PlasmaAllocator::Free(
pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
PlasmaAllocator::Free(pointer, prealloc_memory);

int socket = BindIpcSock(socket_name, true);
int socket = BindIpcSock(socket_name.c_str(), true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);

Expand Down Expand Up @@ -1201,8 +1201,64 @@ void HandleSignal(int signal) {
}
}

void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled,
std::shared_ptr<ExternalStore> external_store) {
void StartServer(std::string socket_name, int64_t system_memory,
std::string plasma_directory, bool hugepages_enabled,
std::string external_store_endpoint) {
if (plasma_directory.empty()) {
#ifdef __linux__
plasma_directory = "/dev/shm";
#else
plasma_directory = "/tmp";
#endif
}
ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
if (!hugepages_enabled) {
// On Linux, check that the amount of memory available in plasma_directory
// (/dev/shm by default) is large enough to accommodate the request. If it
// isn't, log a warning and reduce the request to the available amount.
int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
// Keep some safety margin for allocator fragmentation.
shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
ARROW_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
system_memory = shm_mem_avail;
}
} else {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GiB
}
#endif
// Set system memory capacity
PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000 << "GB of memory.";

// Get external store
std::shared_ptr<ExternalStore> external_store{nullptr};
if (!external_store_endpoint.empty()) {
std::string name;
ARROW_CHECK_OK(ExternalStores::ExtractStoreName(external_store_endpoint, &name));
external_store = ExternalStores::GetStore(name);
if (external_store == nullptr) {
ARROW_LOG(FATAL) << "No such external store \"" << name << "\"";
}
ARROW_LOG(DEBUG) << "connecting to external store...";
ARROW_CHECK_OK(external_store->Connect(external_store_endpoint));
}

// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
Expand Down Expand Up @@ -1241,11 +1297,11 @@ DEFINE_string(m, "", "amount of memory in bytes to use for Plasma store, require
int main(int argc, char* argv[]) {
ArrowLog::StartArrowLog(argv[0], ArrowLogLevel::ARROW_INFO);
ArrowLog::InstallFailureSignalHandler();
std::string socket_name;

gflags::SetUsageMessage("Shared-memory server for Arrow data.\nUsage: ");
gflags::SetVersionString(ARROW_VERSION_STRING);

char* socket_name = nullptr;
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
std::string external_store_endpoint;
Expand All @@ -1258,7 +1314,7 @@ int main(int argc, char* argv[]) {
hugepages_enabled = FLAGS_h;
if (!FLAGS_s.empty()) {
// We only check below whether socket_name is empty, so leave it unset if the flag was empty.
socket_name = const_cast<char*>(FLAGS_s.c_str());
socket_name = FLAGS_s;
}

if (!FLAGS_m.empty()) {
Expand All @@ -1276,13 +1332,13 @@ int main(int argc, char* argv[]) {
}

// Sanity check command line options.
if (socket_name == nullptr && system_memory == -1) {
if (socket_name.empty() && system_memory == -1) {
// Nicer error message for the case where the user ran the program without
// any of the required command-line switches.
plasma::ExitWithUsageError(
"please specify socket for incoming connections with -s, "
"and the amount of memory (in bytes) to use with -m");
} else if (socket_name == nullptr) {
} else if (socket_name.empty()) {
plasma::ExitWithUsageError("please specify socket for incoming connections with -s");
} else if (system_memory == -1) {
plasma::ExitWithUsageError(
Expand All @@ -1298,52 +1354,9 @@ int main(int argc, char* argv[]) {
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");

#ifdef __linux__
if (!hugepages_enabled) {
// On Linux, check that the amount of memory available in the Plasma directory
// (typically /dev/shm) is large enough to accommodate the request. If it
// isn't, log a warning and reduce the request to the available amount.
int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
// Keep some safety margin for allocator fragmentation.
shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
ARROW_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
system_memory = shm_mem_avail;
}
} else {
plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GiB
}
#endif

// Get external store
std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
if (!external_store_endpoint.empty()) {
std::string name;
ARROW_CHECK_OK(
plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name));
external_store = plasma::ExternalStores::GetStore(name);
if (external_store == nullptr) {
std::ostringstream error_msg;
error_msg << "no such external store \"" << name << "\"";
plasma::ExitWithUsageError(error_msg.str().c_str());
}
ARROW_LOG(DEBUG) << "connecting to external store...";
ARROW_CHECK_OK(external_store->Connect(external_store_endpoint));
}

ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store);
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
external_store_endpoint);
plasma::g_runner->Shutdown();
plasma::g_runner = nullptr;

Expand Down
3 changes: 1 addition & 2 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class PlasmaStore {
public:
using NotificationMap = std::unordered_map<int, NotificationQueue>;

// TODO: PascalCase PlasmaStore methods.
PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
PlasmaStore(EventLoop* loop, const std::string& directory, bool hugepages_enabled,
const std::string& socket_name,
std::shared_ptr<ExternalStore> external_store);

Expand Down
Loading