Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class ARROW_EXPORT PlasmaClient {
/// object is present, has been sealed and not used by another client. Otherwise,
/// it is a no operation.
///
/// @todo We may want to allow the deletion of objects that are not present or
/// \todo We may want to allow the deletion of objects that are not present or
/// haven't been sealed.
///
/// \param object_id The ID of the object to delete.
Expand Down
46 changes: 23 additions & 23 deletions cpp/src/plasma/eviction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ class EvictionPolicy {
public:
/// Construct an eviction policy.
///
/// @param store_info Information about the Plasma store that is exposed
/// \param store_info Information about the Plasma store that is exposed
/// to the eviction policy.
/// @param max_size Max size in bytes total of objects to store.
/// \param max_size Max size in bytes total of objects to store.
explicit EvictionPolicy(PlasmaStoreInfo* store_info, int64_t max_size);

/// Destroy an eviction policy.
Expand All @@ -108,66 +108,66 @@ class EvictionPolicy {
/// store calls begin_object_access, we can remove the object from the LRU
/// cache.
///
/// @param object_id The object ID of the object that was created.
/// @param client The pointer to the client.
/// @param is_create Whether we are creating a new object (vs reading an object).
/// \param object_id The object ID of the object that was created.
/// \param client The pointer to the client.
/// \param is_create Whether we are creating a new object (vs reading an object).
virtual void ObjectCreated(const ObjectID& object_id, Client* client, bool is_create);

/// Set quota for a client.
///
/// @param client The pointer to the client.
/// @param output_memory_quota Set the quota for this client. This can only be
/// \param client The pointer to the client.
/// \param output_memory_quota Set the quota for this client. This can only be
/// called once per client. This is effectively the equivalent of giving
/// the client its own LRU cache instance. The memory for this is taken
/// out of the capacity of the global LRU cache for the client lifetime.
///
/// @return True if enough space can be reserved for the given client quota.
/// \return True if enough space can be reserved for the given client quota.
virtual bool SetClientQuota(Client* client, int64_t output_memory_quota);

/// Determine what objects need to be evicted to enforce the given client's quota.
///
/// @param client The pointer to the client creating the object.
/// @param size The size of the object to create.
/// @param is_create Whether we are creating a new object (vs reading an object).
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// \param client The pointer to the client creating the object.
/// \param size The size of the object to create.
/// \param is_create Whether we are creating a new object (vs reading an object).
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
///
/// @return True if enough space could be freed and false otherwise.
/// \return True if enough space could be freed and false otherwise.
virtual bool EnforcePerClientQuota(Client* client, int64_t size, bool is_create,
std::vector<ObjectID>* objects_to_evict);

/// Called to clean up any resources allocated by this client. This merges any
/// per-client LRU queue created by SetClientQuota into the global LRU queue.
///
/// @param client The pointer to the client.
/// \param client The pointer to the client.
virtual void ClientDisconnected(Client* client);

/// This method will be called when the Plasma store needs more space, perhaps
/// to create a new object. When this method is called, the eviction
/// policy will assume that the objects chosen to be evicted will in fact be
/// evicted from the Plasma store by the caller.
///
/// @param size The size in bytes of the new object, including both data and
/// \param size The size in bytes of the new object, including both data and
/// metadata.
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// @return True if enough space can be freed and false otherwise.
/// \return True if enough space can be freed and false otherwise.
virtual bool RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict);

/// This method will be called whenever an unused object in the Plasma store
/// starts to be used. When this method is called, the eviction policy will
/// assume that the objects chosen to be evicted will in fact be evicted from
/// the Plasma store by the caller.
///
/// @param object_id The ID of the object that is now being used.
/// \param object_id The ID of the object that is now being used.
virtual void BeginObjectAccess(const ObjectID& object_id);

/// This method will be called whenever an object in the Plasma store that was
/// being used is no longer being used. When this method is called, the
/// eviction policy will assume that the objects chosen to be evicted will in
/// fact be evicted from the Plasma store by the caller.
///
/// @param object_id The ID of the object that is no longer being used.
/// \param object_id The ID of the object that is no longer being used.
virtual void EndObjectAccess(const ObjectID& object_id);

/// Choose some objects to evict from the Plasma store. When this method is
Expand All @@ -177,16 +177,16 @@ class EvictionPolicy {
/// @note This method is not part of the API. It is exposed in the header file
/// only for testing.
///
/// @param num_bytes_required The number of bytes of space to try to free up.
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// \param num_bytes_required The number of bytes of space to try to free up.
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// @return The total number of bytes of space chosen to be evicted.
/// \return The total number of bytes of space chosen to be evicted.
virtual int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict);

/// This method will be called when an object is going to be removed
///
/// @param object_id The ID of the object that is now being used.
/// \param object_id The ID of the object that is now being used.
virtual void RemoveObject(const ObjectID& object_id);

virtual void RefreshObjects(const std::vector<ObjectID>& object_ids);
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/plasma/fling.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len);

// Send a file descriptor over a unix domain socket.
//
// @param conn Unix domain socket to send the file descriptor over.
// @param fd File descriptor to send over.
// @return Status code which is < 0 on failure.
// \param conn Unix domain socket to send the file descriptor over.
// \param fd File descriptor to send over.
// \return Status code which is < 0 on failure.
int send_fd(int conn, int fd);

// Receive a file descriptor over a unix domain socket.
//
// @param conn Unix domain socket to receive the file descriptor from.
// @return File descriptor or a value < 0 on failure.
// \param conn Unix domain socket to receive the file descriptor from.
// \return File descriptor or a value < 0 on failure.
int recv_fd(int conn);
4 changes: 2 additions & 2 deletions cpp/src/plasma/malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offse

/// Get the mmap size corresponding to a specific file descriptor.
///
/// @param fd The file descriptor to look up.
/// @return The size of the corresponding memory-mapped file.
/// \param fd The file descriptor to look up.
/// \return The size of the corresponding memory-mapped file.
int64_t GetMmapSize(int fd);

struct MmapRecord {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/plasma/plasma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ int WarnIfSigpipe(int status, int client_sock) {
* of this buffer are the length of the remaining message and the
* remaining message is a serialized version of the object info.
*
* @param object_info The object info to be serialized
* @return The object info buffer. It is the caller's responsibility to free
* \param object_info The object info to be serialized
* \return The object info buffer. It is the caller's responsibility to free
* this buffer with "delete" after it has been used.
*/
std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(fb::ObjectInfoT* object_info) {
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ struct PlasmaStoreInfo {
/// Get an entry from the object table and return NULL if the object_id
/// is not present.
///
/// @param store_info The PlasmaStoreInfo that contains the object table.
/// @param object_id The object_id of the entry we are looking for.
/// @return The entry associated with the object_id or NULL if the object_id
/// \param store_info The PlasmaStoreInfo that contains the object table.
/// \param object_id The object_id of the entry we are looking for.
/// \return The entry associated with the object_id or NULL if the object_id
/// is not present.
ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
const ObjectID& object_id);
Expand All @@ -161,11 +161,11 @@ ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
/// isn't connected yet, then we should get an ECONNRESET.
///
/// @param status The status to check. If it is less less than zero, we will
/// \param status The status to check. If it is less less than zero, we will
/// print a warning.
/// @param client_sock The client socket. This is just used to print some extra
/// \param client_sock The client socket. This is just used to print some extra
/// information.
/// @return The errno set.
/// \return The errno set.
int WarnIfSigpipe(int status, int client_sock);

std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(flatbuf::ObjectInfoT* object_info);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/plasma/quota_aware_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class QuotaAwarePolicy : public EvictionPolicy {
public:
/// Construct a quota-aware eviction policy.
///
/// @param store_info Information about the Plasma store that is exposed
/// \param store_info Information about the Plasma store that is exposed
/// to the eviction policy.
/// @param max_size Max size in bytes total of objects to store.
/// \param max_size Max size in bytes total of objects to store.
explicit QuotaAwarePolicy(PlasmaStoreInfo* store_info, int64_t max_size);
void ObjectCreated(const ObjectID& object_id, Client* client, bool is_create) override;
bool SetClientQuota(Client* client, int64_t output_memory_quota) override;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,8 @@ void PlasmaStore::DisconnectClient(int client_fd) {
/// invalidated, which is why we return a valid iterator to the next client to
/// be used in PushNotification.
///
/// @param it Iterator that points to the client to send the notification to.
/// @return Iterator pointing to the next client.
/// \param it Iterator that points to the client to send the notification to.
/// \return Iterator pointing to the next client.
PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
PlasmaStore::NotificationMap::iterator it) {
int client_fd = it->first;
Expand Down
54 changes: 27 additions & 27 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,17 @@ class PlasmaStore {
/// Create a new object. The client must do a call to release_object to tell
/// the store when it is done with the object.
///
/// @param object_id Object ID of the object to be created.
/// @param data_size Size in bytes of the object to be created.
/// @param metadata_size Size in bytes of the object metadata.
/// @param device_num The number of the device where the object is being
/// \param object_id Object ID of the object to be created.
/// \param data_size Size in bytes of the object to be created.
/// \param metadata_size Size in bytes of the object metadata.
/// \param device_num The number of the device where the object is being
/// created.
/// device_num = 0 corresponds to the host,
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
/// @param client The client that created the object.
/// @param result The object that has been created.
/// @return One of the following error codes:
/// \param client The client that created the object.
/// \param result The object that has been created.
/// \return One of the following error codes:
/// - PlasmaError::OK, if the object was created successfully.
/// - PlasmaError::ObjectExists, if an object with this ID is already
/// present in the store. In this case, the client should not call
Expand All @@ -96,24 +96,24 @@ class PlasmaStore {
/// Abort a created but unsealed object. If the client is not the
/// creator, then the abort will fail.
///
/// @param object_id Object ID of the object to be aborted.
/// @param client The client who created the object. If this does not
/// \param object_id Object ID of the object to be aborted.
/// \param client The client who created the object. If this does not
/// match the creator of the object, then the abort will fail.
/// @return 1 if the abort succeeds, else 0.
/// \return 1 if the abort succeeds, else 0.
int AbortObject(const ObjectID& object_id, Client* client);

/// Delete a specific object by object_id that have been created in the hash table.
///
/// @param object_id Object ID of the object to be deleted.
/// @return One of the following error codes:
/// \param object_id Object ID of the object to be deleted.
/// \return One of the following error codes:
/// - PlasmaError::OK, if the object was delete successfully.
/// - PlasmaError::ObjectNonexistent, if ths object isn't existed.
/// - PlasmaError::ObjectInUse, if the object is in use.
PlasmaError DeleteObject(ObjectID& object_id);

/// Evict objects returned by the eviction policy.
///
/// @param object_ids Object IDs of the objects to be evicted.
/// \param object_ids Object IDs of the objects to be evicted.
void EvictObjects(const std::vector<ObjectID>& object_ids);

/// Process a get request from a client. This method assumes that we will
Expand All @@ -124,47 +124,47 @@ class PlasmaStore {
/// For each object, the client must do a call to release_object to tell the
/// store when it is done with the object.
///
/// @param client The client making this request.
/// @param object_ids Object IDs of the objects to be gotten.
/// @param timeout_ms The timeout for the get request in milliseconds.
/// \param client The client making this request.
/// \param object_ids Object IDs of the objects to be gotten.
/// \param timeout_ms The timeout for the get request in milliseconds.
void ProcessGetRequest(Client* client, const std::vector<ObjectID>& object_ids,
int64_t timeout_ms);

/// Seal a vector of objects. The objects are now immutable and can be accessed with
/// get.
///
/// @param object_ids The vector of Object IDs of the objects to be sealed.
/// @param digests The vector of digests of the objects. This is used to tell if two
/// \param object_ids The vector of Object IDs of the objects to be sealed.
/// \param digests The vector of digests of the objects. This is used to tell if two
/// objects with the same object ID are the same.
void SealObjects(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& digests);

/// Check if the plasma store contains an object:
///
/// @param object_id Object ID that will be checked.
/// @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
/// \param object_id Object ID that will be checked.
/// \return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
/// not
ObjectStatus ContainsObject(const ObjectID& object_id);

/// Record the fact that a particular client is no longer using an object.
///
/// @param object_id The object ID of the object that is being released.
/// @param client The client making this request.
/// \param object_id The object ID of the object that is being released.
/// \param client The client making this request.
void ReleaseObject(const ObjectID& object_id, Client* client);

/// Subscribe a file descriptor to updates about new sealed objects.
///
/// @param client The client making this request.
/// \param client The client making this request.
void SubscribeToUpdates(Client* client);

/// Connect a new client to the PlasmaStore.
///
/// @param listener_sock The socket that is listening to incoming connections.
/// \param listener_sock The socket that is listening to incoming connections.
void ConnectClient(int listener_sock);

/// Disconnect a client from the PlasmaStore.
///
/// @param client_fd The client file descriptor that is disconnected.
/// \param client_fd The client file descriptor that is disconnected.
void DisconnectClient(int client_fd);

NotificationMap::iterator SendNotifications(NotificationMap::iterator it);
Expand All @@ -183,12 +183,12 @@ class PlasmaStore {

/// Remove a GetRequest and clean up the relevant data structures.
///
/// @param get_request The GetRequest to remove.
/// \param get_request The GetRequest to remove.
void RemoveGetRequest(GetRequest* get_request);

/// Remove all of the GetRequests for a given client.
///
/// @param client The client whose GetRequests should be removed.
/// \param client The client whose GetRequests should be removed.
void RemoveGetRequestsForClient(Client* client);

void ReturnFromGet(GetRequest* get_req);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/plasma/test/serialization_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ using arrow::internal::TemporaryDir;
/**
* Seek to the beginning of a file and read a message from it.
*
* @param fd File descriptor of the file.
* @param message_type Message type that we expect in the file.
* \param fd File descriptor of the file.
* \param message_type Message type that we expect in the file.
*
* @return Pointer to the content of the message. Needs to be freed by the
* \return Pointer to the content of the message. Needs to be freed by the
* caller.
*/
std::vector<uint8_t> read_message_from_file(int fd, MessageType message_type) {
Expand Down