Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
#include <Win32_Interop/win32_types.h>
#endif

#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
Expand Down Expand Up @@ -107,7 +105,7 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
public:
~PlasmaBuffer();

PlasmaBuffer(PlasmaClient::Impl* client, const ObjectID& object_id,
PlasmaBuffer(std::shared_ptr<PlasmaClient::Impl> client, const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer)
: Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) {
if (buffer->is_mutable()) {
Expand All @@ -116,7 +114,7 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
}

private:
PlasmaClient::Impl* client_;
std::shared_ptr<PlasmaClient::Impl> client_;
ObjectID object_id_;
};

Expand Down Expand Up @@ -155,7 +153,7 @@ struct ClientMmapTableEntry {
int count;
};

class PlasmaClient::Impl {
class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
public:
Impl();
~Impl();
Expand Down Expand Up @@ -558,7 +556,7 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
const auto wrap_buffer = [=](const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer) {
return std::make_shared<PlasmaBuffer>(this, object_id, buffer);
return std::make_shared<PlasmaBuffer>(shared_from_this(), object_id, buffer);
};
const size_t num_objects = object_ids.size();
*out = std::vector<ObjectBuffer>(num_objects);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/plasma/format/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ table PlasmaStatusRequest {

enum ObjectStatus:int {
// Object is stored in the local Plasma Store.
Local = 1,
Local,
// Object is stored on a remote Plasma store, and it is not stored on the
// local Plasma Store.
Remote,
Expand Down
16 changes: 16 additions & 0 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,19 @@ def test_use_huge_pages():
use_hugepages=True) as (plasma_store_name, p):
plasma_client = plasma.connect(plasma_store_name, "", 64)
create_object(plasma_client, 100000000)


# This is checking to make sure plasma_clients cannot be destroyed
# before all the PlasmaBuffers that have handles to them are
# destroyed, see ARROW-2448.
@pytest.mark.plasma
def test_plasma_client_sharing():
import pyarrow.plasma as plasma

with start_plasma_store() as (plasma_store_name, p):
plasma_client = plasma.connect(plasma_store_name, "", 64)
object_id = plasma_client.put(np.zeros(3))
buf = plasma_client.get(object_id)
del plasma_client
assert (buf == np.zeros(3)).all()
del buf # This segfaulted pre ARROW-2448.