diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 9447e5d688d..ce9795d20fc 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -301,6 +301,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; + /// A mutex which protects this class. + std::recursive_mutex client_mutex_; #ifdef PLASMA_CUDA /// Cuda Device Manager. @@ -341,6 +343,8 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { } bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { + std::lock_guard guard(client_mutex_); + const auto elem = objects_in_use_.find(object_id); return (elem != objects_in_use_.end()); } @@ -384,6 +388,8 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, std::shared_ptr* data, int device_num) { + std::lock_guard guard(client_mutex_); + ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK( @@ -451,8 +457,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id, const std::string& data, const std::string& metadata) { - ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; + std::lock_guard guard(client_mutex_); + ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; // Compute the object hash. static unsigned char digest[kDigestSize]; // CreateAndSeal currently only supports device_num = 0, which corresponds to @@ -608,6 +615,8 @@ Status PlasmaClient::Impl::GetBuffers( Status PlasmaClient::Impl::Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* out) { + std::lock_guard guard(client_mutex_); + const auto wrap_buffer = [=](const ObjectID& object_id, const std::shared_ptr& buffer) { return std::make_shared(shared_from_this(), object_id, buffer); @@ -619,6 +628,8 @@ Status PlasmaClient::Impl::Get(const std::vector& object_ids, Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, ObjectBuffer* out) { + std::lock_guard guard(client_mutex_); + const auto wrap_buffer = [](const ObjectID& object_id, const std::shared_ptr& buffer) { return buffer; }; return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out); @@ -635,6 +646,8 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) { } Status PlasmaClient::Impl::Release(const ObjectID& object_id) { + std::lock_guard guard(client_mutex_); + // If the client is already disconnected, ignore release requests. if (store_conn_ < 0) { return Status::OK(); @@ -672,6 +685,8 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) { // This method is used to query whether the plasma store contains an object. Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) { + std::lock_guard guard(client_mutex_); + // Check if we already have a reference to the object. if (objects_in_use_.count(object_id) > 0) { *has_object = 1; @@ -690,6 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) } Status PlasmaClient::Impl::List(ObjectTable* objects) { + std::lock_guard guard(client_mutex_); RETURN_NOT_OK(SendListRequest(store_conn_)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer)); @@ -768,6 +784,8 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data } Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { + std::lock_guard guard(client_mutex_); + // Make sure this client has a reference to the object before sending the // request to Plasma. auto object_entry = objects_in_use_.find(object_id); @@ -794,6 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { } Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { + std::lock_guard guard(client_mutex_); auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()) << "Plasma client called abort on an object without a reference to it"; @@ -832,6 +851,8 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { } Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { + std::lock_guard guard(client_mutex_); + std::vector not_in_use_ids; for (auto& object_id : object_ids) { // If the object is in used, skip it. @@ -855,6 +876,8 @@ Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { } Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { + std::lock_guard guard(client_mutex_); + // Send a request to the store to evict objects. RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes)); // Wait for a response with the number of bytes actually evicted. @@ -865,6 +888,8 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) } Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { + std::lock_guard guard(client_mutex_); + // Get the plasma object data. We pass in a timeout of 0 to indicate that // the operation should timeout immediately. std::vector object_buffers; @@ -880,6 +905,8 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { } Status PlasmaClient::Impl::Subscribe(int* fd) { + std::lock_guard guard(client_mutex_); + int sock[2]; // Create a non-blocking socket pair. This will only be used to send // notifications from the Plasma store to the client. @@ -902,6 +929,8 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { + std::lock_guard guard(client_mutex_); + auto object_info = flatbuffers::GetRoot(buffer); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID)); @@ -917,6 +946,8 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { + std::lock_guard guard(client_mutex_); + auto notification = ReadMessageAsync(fd); if (notification == NULL) { return Status::IOError("Failed to read object notification from Plasma socket"); @@ -927,6 +958,8 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, const std::string& manager_socket_name, int release_delay, int num_retries) { + std::lock_guard guard(client_mutex_); + RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_)); if (manager_socket_name != "") { return Status::NotImplemented("plasma manager is no longer supported"); @@ -944,6 +977,8 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, } Status PlasmaClient::Impl::Disconnect() { + std::lock_guard guard(client_mutex_); + // NOTE: We purposefully do not finish sending release calls for objects in // use, so that we don't duplicate PlasmaClient::Release calls (when handling // a SIGTERM, for example).