From 7d28354fa96699a9498903b3a11a7200d3020926 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 23 Apr 2018 11:19:28 +0200 Subject: [PATCH] ARROW-2489: [Plasma] Fix PlasmaClient ABI variation When compiled with GPU support, the PlasmaClient ABI would differ, leading to a crash in the Python bindings to Plasma. --- cpp/src/plasma/client.cc | 364 ++++++++++++++++++++++++++++++++------- cpp/src/plasma/client.h | 100 +---------- 2 files changed, 309 insertions(+), 155 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 015c9731a8f..733217d0212 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -34,10 +35,14 @@ #include #include #include +#include #include #include +#include #include +#include +#include #include #include "arrow/buffer.h" @@ -66,17 +71,43 @@ namespace plasma { using arrow::MutableBuffer; +typedef struct XXH64_state_s XXH64_state_t; + // Number of threads used for memcopy and hash computations. constexpr int64_t kThreadPoolSize = 8; constexpr int64_t kBytesInMB = 1 << 20; +// Use 100MB as an overestimate of the L3 cache size. +constexpr int64_t kL3CacheSizeBytes = 100000000; + +// ---------------------------------------------------------------------- +// GPU support + +#ifdef PLASMA_GPU +struct GpuProcessHandle { + /// Pointer to CUDA buffer that is backing this GPU object. + std::shared_ptr ptr; + /// Number of client using this GPU object. + int client_count; +}; + +// This is necessary as IPC handles can only be mapped once per process. +// Thus if multiple clients in the same process get the same gpu object, +// they need to access the same mapped CudaBuffer. +static std::unordered_map gpu_object_map; +static std::mutex gpu_mutex; +#endif + +// ---------------------------------------------------------------------- +// PlasmaBuffer + /// A Buffer class that automatically releases the backing plasma object /// when it goes out of scope. -class PlasmaBuffer : public Buffer { +class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { public: ~PlasmaBuffer(); - PlasmaBuffer(PlasmaClient* client, const ObjectID& object_id, + PlasmaBuffer(PlasmaClient::Impl* client, const ObjectID& object_id, const std::shared_ptr& buffer) : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) { if (buffer->is_mutable()) { @@ -85,11 +116,12 @@ class PlasmaBuffer : public Buffer { } private: - PlasmaClient* client_; + PlasmaClient::Impl* client_; ObjectID object_id_; }; -PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); } +// ---------------------------------------------------------------------- +// PlasmaClient::Impl struct ObjectInUseEntry { /// A count of the number of times this client has called PlasmaClient::Create @@ -105,33 +137,158 @@ struct ObjectInUseEntry { bool is_sealed; }; -#ifdef PLASMA_GPU -struct GpuProcessHandle { - /// Pointer to CUDA buffer that is backing this GPU object. - std::shared_ptr ptr; - /// Number of client using this GPU object. - int client_count; +/// Configuration options for the plasma client. +struct PlasmaClientConfig { + /// Number of release calls we wait until the object is actually released. + /// This allows us to avoid invalidating the cpu cache on workers if objects + /// are reused accross tasks. + size_t release_delay; }; -// This is necessary as IPC handles can only be mapped once per process. -// Thus if multiple clients in the same process get the same gpu object, -// they need to access the same mapped CudaBuffer. -static std::unordered_map gpu_object_map; -static std::mutex gpu_mutex; +struct ClientMmapTableEntry { + /// The result of mmap for this file descriptor. + uint8_t* pointer; + /// The length of the memory-mapped file. + size_t length; + /// The number of objects in this memory-mapped file that are currently being + /// used by the client. When this count reaches zeros, we unmap the file. + int count; +}; + +class PlasmaClient::Impl { + public: + Impl(); + ~Impl(); + + // PlasmaClient method implementations + + Status Connect(const std::string& store_socket_name, + const std::string& manager_socket_name, int release_delay, + int num_retries = -1); + + Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, + int64_t metadata_size, std::shared_ptr* data, int device_num = 0); + + Status Get(const std::vector& object_ids, int64_t timeout_ms, + std::vector* object_buffers); + + Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, + ObjectBuffer* object_buffers); + + Status Release(const ObjectID& object_id); + + Status Contains(const ObjectID& object_id, bool* has_object); + + Status Abort(const ObjectID& object_id); + + Status Seal(const ObjectID& object_id); + + Status Delete(const ObjectID& object_id); + + Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted); + + Status Hash(const ObjectID& object_id, uint8_t* digest); + + Status Subscribe(int* fd); + + Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size); + + Status Disconnect(); + + Status Fetch(int num_object_ids, const ObjectID* object_ids); + + Status Wait(int64_t num_object_requests, ObjectRequest* object_requests, + int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); + + Status Transfer(const char* addr, int port, const ObjectID& object_id); + + Status Info(const ObjectID& object_id, int* object_status); + + int get_manager_fd() const; + + Status FlushReleaseHistory(); + + bool IsInUse(const ObjectID& object_id); + + private: + /// This is a helper method for unmapping objects for which all references have + /// gone out of scope, either by calling Release or Abort. + /// + /// @param object_id The object ID whose data we should unmap. + Status UnmapObject(const ObjectID& object_id); + + Status PerformRelease(const ObjectID& object_id); + + /// Common helper for Get() variants + Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, + const std::function( + const ObjectID&, const std::shared_ptr&)>& wrap_buffer, + ObjectBuffer* object_buffers); + + uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size); + + uint8_t* lookup_mmapped_file(int store_fd_val); + + void increment_object_count(const ObjectID& object_id, PlasmaObject* object, + bool is_sealed); + + bool compute_object_hash_parallel(XXH64_state_t* hash_state, const unsigned char* data, + int64_t nbytes); + + uint64_t compute_object_hash(const ObjectBuffer& obj_buffer); + + /// File descriptor of the Unix domain socket that connects to the store. + int store_conn_; + /// File descriptor of the Unix domain socket that connects to the manager. + int manager_conn_; + /// Table of dlmalloc buffer files that have been memory mapped so far. This + /// is a hash table mapping a file descriptor to a struct containing the + /// address of the corresponding memory-mapped file. + std::unordered_map mmap_table_; + /// A hash table of the object IDs that are currently being used by this + /// client. + std::unordered_map, UniqueIDHasher> + objects_in_use_; + /// Object IDs of the last few release calls. This is a deque and + /// is used to delay releasing objects to see if they can be reused by + /// subsequent tasks so we do not unneccessarily invalidate cpu caches. + /// TODO(pcm): replace this with a proper lru cache using the size of the L3 + /// cache. + std::deque release_history_; + /// The number of bytes in the combined objects that are held in the release + /// history doubly-linked list. If this is too large then the client starts + /// releasing objects. + int64_t in_use_object_bytes_; + /// Configuration options for the plasma client. + PlasmaClientConfig config_; + /// The amount of memory available to the Plasma store. The client needs this + /// information to make sure that it does not delay in releasing so much + /// memory that the store is unable to evict enough objects to free up space. + int64_t store_capacity_; + /// Threadpool for parallel memcopy and hash computation. + std::vector threadpool_; + +#ifdef PLASMA_GPU + /// Cuda Device Manager. + arrow::gpu::CudaDeviceManager* manager_; #endif +}; + +PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); } -PlasmaClient::PlasmaClient() : threadpool_(kThreadPoolSize) { +PlasmaClient::Impl::Impl() : threadpool_(kThreadPoolSize) { #ifdef PLASMA_GPU CudaDeviceManager::GetInstance(&manager_); #endif } -PlasmaClient::~PlasmaClient() {} +PlasmaClient::Impl::~Impl() {} // If the file descriptor fd has been mmapped in this client process before, // return the pointer that was returned by mmap, otherwise mmap it and store the // pointer in a hash table. -uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) { +uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) { auto entry = mmap_table_.find(store_fd_val); if (entry != mmap_table_.end()) { close(fd); @@ -157,19 +314,19 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size // Get a pointer to a file that we know has been memory mapped in this client // process before. -uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) { +uint8_t* PlasmaClient::Impl::lookup_mmapped_file(int store_fd_val) { auto entry = mmap_table_.find(store_fd_val); ARROW_CHECK(entry != mmap_table_.end()); return entry->second.pointer; } -bool PlasmaClient::IsInUse(const ObjectID& object_id) { +bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { const auto elem = objects_in_use_.find(object_id); return (elem != objects_in_use_.end()); } -void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObject* object, - bool is_sealed) { +void PlasmaClient::Impl::increment_object_count(const ObjectID& object_id, + PlasmaObject* object, bool is_sealed) { // Increment the count of the object to track the fact that it is being used. // The corresponding decrement should happen in PlasmaClient::Release. auto elem = objects_in_use_.find(object_id); @@ -205,9 +362,9 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec object_entry->count += 1; } -Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, - const uint8_t* metadata, int64_t metadata_size, - std::shared_ptr* data, int device_num) { +Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, + const uint8_t* metadata, int64_t metadata_size, + std::shared_ptr* data, int device_num) { ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK( @@ -271,7 +428,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, return Status::OK(); } -Status PlasmaClient::GetBuffers( +Status PlasmaClient::Impl::GetBuffers( const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, const std::function( const ObjectID&, const std::shared_ptr&)>& wrap_buffer, @@ -397,8 +554,8 @@ Status PlasmaClient::GetBuffers( return Status::OK(); } -Status PlasmaClient::Get(const std::vector& object_ids, int64_t timeout_ms, - std::vector* out) { +Status PlasmaClient::Impl::Get(const std::vector& object_ids, + int64_t timeout_ms, std::vector* out) { const auto wrap_buffer = [=](const ObjectID& object_id, const std::shared_ptr& buffer) { return std::make_shared(this, object_id, buffer); @@ -408,14 +565,14 @@ Status PlasmaClient::Get(const std::vector& object_ids, int64_t timeou return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0]); } -Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, - int64_t timeout_ms, ObjectBuffer* out) { +Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects, + int64_t timeout_ms, ObjectBuffer* out) { const auto wrap_buffer = [](const ObjectID& object_id, const std::shared_ptr& buffer) { return buffer; }; return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out); } -Status PlasmaClient::UnmapObject(const ObjectID& object_id) { +Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) { auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()); ARROW_CHECK(object_entry->second->count == 0); @@ -459,7 +616,7 @@ Status PlasmaClient::UnmapObject(const ObjectID& object_id) { /// releasing the object when the client is truly done with the object. /// /// @param object_id The object ID to attempt to release. -Status PlasmaClient::PerformRelease(const ObjectID& object_id) { +Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) { // Decrement the count of the number of instances of this object that are // being used by this client. The corresponding increment should have happened // in PlasmaClient::Get. @@ -476,7 +633,7 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) { return Status::OK(); } -Status PlasmaClient::Release(const ObjectID& object_id) { +Status PlasmaClient::Impl::Release(const ObjectID& object_id) { // If the client is already disconnected, ignore release requests. if (store_conn_ < 0) { return Status::OK(); @@ -500,7 +657,7 @@ Status PlasmaClient::Release(const ObjectID& object_id) { return Status::OK(); } -Status PlasmaClient::FlushReleaseHistory() { +Status PlasmaClient::Impl::FlushReleaseHistory() { // If the client is already disconnected, ignore the flush. if (store_conn_ < 0) { return Status::OK(); @@ -515,7 +672,7 @@ Status PlasmaClient::FlushReleaseHistory() { } // This method is used to query whether the plasma store contains an object. -Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { +Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) { // Check if we already have a reference to the object. if (objects_in_use_.count(object_id) > 0) { *has_object = 1; @@ -540,9 +697,9 @@ static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t *hash = XXH64_digest(&hash_state); } -bool PlasmaClient::compute_object_hash_parallel(XXH64_state_t* hash_state, - const unsigned char* data, - int64_t nbytes) { +bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state, + const unsigned char* data, + int64_t nbytes) { // Note that this function will likely be faster if the address of data is // aligned on a 64-byte boundary. const int num_threads = kThreadPoolSize; @@ -576,7 +733,7 @@ bool PlasmaClient::compute_object_hash_parallel(XXH64_state_t* hash_state, return true; } -uint64_t PlasmaClient::compute_object_hash(const ObjectBuffer& obj_buffer) { +uint64_t PlasmaClient::Impl::compute_object_hash(const ObjectBuffer& obj_buffer) { DCHECK(obj_buffer.metadata); DCHECK(obj_buffer.data); XXH64_state_t hash_state; @@ -600,7 +757,7 @@ uint64_t PlasmaClient::compute_object_hash(const ObjectBuffer& obj_buffer) { return XXH64_digest(&hash_state); } -Status PlasmaClient::Seal(const ObjectID& object_id) { +Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { // Make sure this client has a reference to the object before sending the // request to Plasma. auto object_entry = objects_in_use_.find(object_id); @@ -626,7 +783,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) { return Release(object_id); } -Status PlasmaClient::Abort(const ObjectID& object_id) { +Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()) << "Plasma client called abort on an object without a reference to it"; @@ -655,7 +812,7 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { return ReadAbortReply(buffer.data(), buffer.size(), &id); } -Status PlasmaClient::Delete(const ObjectID& object_id) { +Status PlasmaClient::Impl::Delete(const ObjectID& object_id) { RETURN_NOT_OK(FlushReleaseHistory()); // If the object is in used, client can't send the remove message. if (objects_in_use_.count(object_id) > 0) { @@ -672,7 +829,7 @@ Status PlasmaClient::Delete(const ObjectID& object_id) { } } -Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { +Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { // Send a request to the store to evict objects. RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes)); // Wait for a response with the number of bytes actually evicted. @@ -682,7 +839,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted); } -Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { +Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { // Get the plasma object data. We pass in a timeout of 0 to indicate that // the operation should timeout immediately. std::vector object_buffers; @@ -697,7 +854,7 @@ Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { return Status::OK(); } -Status PlasmaClient::Subscribe(int* fd) { +Status PlasmaClient::Impl::Subscribe(int* fd) { int sock[2]; // Create a non-blocking socket pair. This will only be used to send // notifications from the Plasma store to the client. @@ -717,8 +874,8 @@ Status PlasmaClient::Subscribe(int* fd) { return Status::OK(); } -Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, - int64_t* metadata_size) { +Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size) { uint8_t* notification = read_message_async(fd); if (notification == NULL) { return Status::IOError("Failed to read object notification from Plasma socket"); @@ -737,9 +894,9 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ return Status::OK(); } -Status PlasmaClient::Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, int release_delay, - int num_retries) { +Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, + const std::string& manager_socket_name, + int release_delay, int num_retries) { RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_)); if (manager_socket_name != "") { RETURN_NOT_OK( @@ -757,7 +914,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name, return Status::OK(); } -Status PlasmaClient::Disconnect() { +Status PlasmaClient::Impl::Disconnect() { // NOTE: We purposefully do not finish sending release calls for objects in // use, so that we don't duplicate PlasmaClient::Release calls (when handling // a SIGTERM, for example). @@ -773,18 +930,19 @@ Status PlasmaClient::Disconnect() { return Status::OK(); } -Status PlasmaClient::Transfer(const char* address, int port, const ObjectID& object_id) { +Status PlasmaClient::Impl::Transfer(const char* address, int port, + const ObjectID& object_id) { return SendDataRequest(manager_conn_, object_id, address, port); } -Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) { +Status PlasmaClient::Impl::Fetch(int num_object_ids, const ObjectID* object_ids) { ARROW_CHECK(manager_conn_ >= 0); return SendFetchRequest(manager_conn_, object_ids, num_object_ids); } -int PlasmaClient::get_manager_fd() const { return manager_conn_; } +int PlasmaClient::Impl::get_manager_fd() const { return manager_conn_; } -Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { +Status PlasmaClient::Impl::Info(const ObjectID& object_id, int* object_status) { ARROW_CHECK(manager_conn_ >= 0); RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1)); @@ -796,9 +954,9 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { return Status::OK(); } -Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests, - int num_ready_objects, int64_t timeout_ms, - int* num_objects_ready) { +Status PlasmaClient::Impl::Wait(int64_t num_object_requests, + ObjectRequest* object_requests, int num_ready_objects, + int64_t timeout_ms, int* num_objects_ready) { ARROW_CHECK(manager_conn_ >= 0); ARROW_CHECK(num_object_requests > 0); ARROW_CHECK(num_ready_objects > 0); @@ -840,4 +998,94 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req return Status::OK(); } +// ---------------------------------------------------------------------- +// PlasmaClient + +PlasmaClient::PlasmaClient() : impl_(std::make_shared()) {} + +PlasmaClient::~PlasmaClient() {} + +Status PlasmaClient::Connect(const std::string& store_socket_name, + const std::string& manager_socket_name, int release_delay, + int num_retries) { + return impl_->Connect(store_socket_name, manager_socket_name, release_delay, + num_retries); +} + +Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, + const uint8_t* metadata, int64_t metadata_size, + std::shared_ptr* data, int device_num) { + return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num); +} + +Status PlasmaClient::Get(const std::vector& object_ids, int64_t timeout_ms, + std::vector* object_buffers) { + return impl_->Get(object_ids, timeout_ms, object_buffers); +} + +Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, + int64_t timeout_ms, ObjectBuffer* object_buffers) { + return impl_->Get(object_ids, num_objects, timeout_ms, object_buffers); +} + +Status PlasmaClient::Release(const ObjectID& object_id) { + return impl_->Release(object_id); +} + +Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { + return impl_->Contains(object_id, has_object); +} + +Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); } + +Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); } + +Status PlasmaClient::Delete(const ObjectID& object_id) { + return impl_->Delete(object_id); +} + +Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { + return impl_->Evict(num_bytes, num_bytes_evicted); +} + +Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { + return impl_->Hash(object_id, digest); +} + +Status PlasmaClient::Subscribe(int* fd) { return impl_->Subscribe(fd); } + +Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size) { + return impl_->GetNotification(fd, object_id, data_size, metadata_size); +} + +Status PlasmaClient::Disconnect() { return impl_->Disconnect(); } + +Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) { + return impl_->Fetch(num_object_ids, object_ids); +} + +Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests, + int num_ready_objects, int64_t timeout_ms, + int* num_objects_ready) { + return impl_->Wait(num_object_requests, object_requests, num_ready_objects, timeout_ms, + num_objects_ready); +} + +Status PlasmaClient::Transfer(const char* addr, int port, const ObjectID& object_id) { + return impl_->Transfer(addr, port, object_id); +} + +Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { + return impl_->Info(object_id, object_status); +} + +int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); } + +Status PlasmaClient::FlushReleaseHistory() { return impl_->FlushReleaseHistory(); } + +bool PlasmaClient::IsInUse(const ObjectID& object_id) { + return impl_->IsInUse(object_id); +} + } // namespace plasma diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 7e353b27c6b..4221997406d 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -18,15 +18,9 @@ #ifndef PLASMA_CLIENT_H #define PLASMA_CLIENT_H -#include -#include - -#include #include #include #include -#include -#include #include #include "arrow/buffer.h" @@ -34,22 +28,14 @@ #include "arrow/util/macros.h" #include "arrow/util/visibility.h" #include "plasma/common.h" -#ifdef PLASMA_GPU -#include "arrow/gpu/cuda_api.h" -#endif using arrow::Buffer; using arrow::Status; -typedef struct XXH64_state_s XXH64_state_t; - namespace plasma { #define PLASMA_DEFAULT_RELEASE_DELAY 64 -// Use 100MB as an overestimate of the L3 cache size. -constexpr int64_t kL3CacheSizeBytes = 100000000; - /// Object buffer data structure. struct ObjectBuffer { /// The data buffer. @@ -60,32 +46,9 @@ struct ObjectBuffer { int device_num; }; -/// Configuration options for the plasma client. -struct PlasmaClientConfig { - /// Number of release calls we wait until the object is actually released. - /// This allows us to avoid invalidating the cpu cache on workers if objects - /// are reused accross tasks. - size_t release_delay; -}; - -struct ClientMmapTableEntry { - /// The result of mmap for this file descriptor. - uint8_t* pointer; - /// The length of the memory-mapped file. - size_t length; - /// The number of objects in this memory-mapped file that are currently being - /// used by the client. When this count reaches zeros, we unmap the file. - int count; -}; - -struct ObjectInUseEntry; -struct ObjectRequest; -struct PlasmaObject; - class ARROW_EXPORT PlasmaClient { public: PlasmaClient(); - ~PlasmaClient(); /// Connect to the local plasma store and plasma manager. Return @@ -346,76 +309,19 @@ class ARROW_EXPORT PlasmaClient { int get_manager_fd() const; private: + friend class PlasmaBuffer; FRIEND_TEST(TestPlasmaStore, GetTest); FRIEND_TEST(TestPlasmaStore, LegacyGetTest); FRIEND_TEST(TestPlasmaStore, AbortTest); - /// This is a helper method for unmapping objects for which all references have - /// gone out of scope, either by calling Release or Abort. - /// - /// @param object_id The object ID whose data we should unmap. - Status UnmapObject(const ObjectID& object_id); - /// This is a helper method that flushes all pending release calls to the /// store. Status FlushReleaseHistory(); - Status PerformRelease(const ObjectID& object_id); - - /// Common helper for Get() variants - Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, - const std::function( - const ObjectID&, const std::shared_ptr&)>& wrap_buffer, - ObjectBuffer* object_buffers); - bool IsInUse(const ObjectID& object_id); - uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size); - - uint8_t* lookup_mmapped_file(int store_fd_val); - - void increment_object_count(const ObjectID& object_id, PlasmaObject* object, - bool is_sealed); - - bool compute_object_hash_parallel(XXH64_state_t* hash_state, const unsigned char* data, - int64_t nbytes); - - uint64_t compute_object_hash(const ObjectBuffer& obj_buffer); - - /// File descriptor of the Unix domain socket that connects to the store. - int store_conn_; - /// File descriptor of the Unix domain socket that connects to the manager. - int manager_conn_; - /// Table of dlmalloc buffer files that have been memory mapped so far. This - /// is a hash table mapping a file descriptor to a struct containing the - /// address of the corresponding memory-mapped file. - std::unordered_map mmap_table_; - /// A hash table of the object IDs that are currently being used by this - /// client. - std::unordered_map, UniqueIDHasher> - objects_in_use_; - /// Object IDs of the last few release calls. This is a deque and - /// is used to delay releasing objects to see if they can be reused by - /// subsequent tasks so we do not unneccessarily invalidate cpu caches. - /// TODO(pcm): replace this with a proper lru cache using the size of the L3 - /// cache. - std::deque release_history_; - /// The number of bytes in the combined objects that are held in the release - /// history doubly-linked list. If this is too large then the client starts - /// releasing objects. - int64_t in_use_object_bytes_; - /// Configuration options for the plasma client. - PlasmaClientConfig config_; - /// The amount of memory available to the Plasma store. The client needs this - /// information to make sure that it does not delay in releasing so much - /// memory that the store is unable to evict enough objects to free up space. - int64_t store_capacity_; - /// Threadpool for parallel memcopy and hash computation. - std::vector threadpool_; -#ifdef PLASMA_GPU - /// Cuda Device Manager. - arrow::gpu::CudaDeviceManager* manager_; -#endif + class ARROW_NO_EXPORT Impl; + std::shared_ptr impl_; }; } // namespace plasma