Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,10 +512,12 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_
}

Status PlasmaClient::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay) {
store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
const std::string& manager_socket_name, int release_delay,
int num_retries) {
RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
if (manager_socket_name != "") {
manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
RETURN_NOT_OK(
ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_));
} else {
manager_conn_ = -1;
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ class ARROW_EXPORT PlasmaClient {
/// function will not connect to a manager.
/// @param release_delay Number of released objects that are kept around
/// and not evicted to avoid too many munmaps.
/// @param num_retries number of attempts to connect to IPC socket, default 50
/// @return The return status.
Status Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay);
const std::string& manager_socket_name, int release_delay,
int num_retries = -1);

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
Expand Down
30 changes: 21 additions & 9 deletions cpp/src/plasma/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

#include "plasma/io.h"

#include <cstdint>
#include <sstream>

#include "arrow/status.h"

#include "plasma/common.h"

using arrow::Status;
Expand All @@ -29,6 +34,8 @@ using arrow::Status;
#define NUM_CONNECT_ATTEMPTS 50
#define CONNECT_TIMEOUT_MS 100

namespace plasma {

Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
Expand Down Expand Up @@ -140,8 +147,8 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
return socket_fd;
}

int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
int64_t timeout) {
Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
int64_t timeout, int* fd) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = NUM_CONNECT_ATTEMPTS;
Expand All @@ -150,23 +157,26 @@ int connect_ipc_sock_retry(const std::string& pathname, int num_retries,
timeout = CONNECT_TIMEOUT_MS;
}

int fd = -1;
*fd = -1;
for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
fd = connect_ipc_sock(pathname);
if (fd >= 0) {
*fd = connect_ipc_sock(pathname);
if (*fd >= 0) {
break;
}
if (num_attempts == 0) {
ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
<< ", retrying " << num_retries << " times";
}
/* Sleep for timeout milliseconds. */
usleep(static_cast<int>(timeout * 1000));
}
/* If we could not connect to the socket, exit. */
if (fd == -1) {
ARROW_LOG(FATAL) << "Could not connect to socket " << pathname;
if (*fd == -1) {
std::stringstream ss;
ss << "Could not connect to socket " << pathname;
return Status::IOError(ss.str());
}
return fd;
return Status::OK();
}

int connect_ipc_sock(const std::string& pathname) {
Expand Down Expand Up @@ -224,3 +234,5 @@ uint8_t* read_message_async(int sock) {
}
return message;
}

} // namespace plasma
17 changes: 12 additions & 5 deletions cpp/src/plasma/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@
#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
#define DISCONNECT_CLIENT 0

arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
namespace plasma {

arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
using arrow::Status;

arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
Status WriteBytes(int fd, uint8_t* cursor, size_t length);

arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);

Status ReadBytes(int fd, uint8_t* cursor, size_t length);

Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);

int bind_ipc_sock(const std::string& pathname, bool shall_listen);

int connect_ipc_sock(const std::string& pathname);

int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
int64_t timeout, int* fd);

int AcceptClient(int socket_fd);

uint8_t* read_message_async(int sock);

} // namespace plasma

#endif // PLASMA_IO_H
5 changes: 4 additions & 1 deletion python/manylinux1/build_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ for PYTHON in ${PYTHON_VERSIONS}; do
echo "=== (${PYTHON}) Testing manylinux1 wheel ==="
source /venv-test-${PYTHON}/bin/activate
pip install repaired_wheels/*.whl
py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow

# ARROW-1264; for some reason the test case added causes a segfault inside
# the Docker container when writing and error message to stderr
py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow -v -s --disable-plasma
deactivate

mv repaired_wheels/*.whl /io/dist
Expand Down
60 changes: 41 additions & 19 deletions python/pyarrow/plasma.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ cdef extern from "plasma/client.h" nogil:
CPlasmaClient()

CStatus Connect(const c_string& store_socket_name,
const c_string& manager_socket_name, int release_delay)
const c_string& manager_socket_name,
int release_delay, int num_retries)

CStatus Create(const CUniqueID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
Expand Down Expand Up @@ -98,10 +99,13 @@ cdef extern from "plasma/client.h" nogil:

CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)

CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests,
int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
CStatus Wait(int64_t num_object_requests,
CObjectRequest* object_requests,
int num_ready_objects, int64_t timeout_ms,
int* num_objects_ready);

CStatus Transfer(const char* addr, int port, const CUniqueID& object_id)
CStatus Transfer(const char* addr, int port,
const CUniqueID& object_id)


cdef extern from "plasma/client.h" nogil:
Expand Down Expand Up @@ -247,7 +251,8 @@ cdef class PlasmaClient:
def manager_socket_name(self):
return self.manager_socket_name.decode()

def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""):
def create(self, ObjectID object_id, int64_t data_size,
c_string metadata=b""):
"""
Create a new buffer in the PlasmaStore for a particular object ID.

Expand Down Expand Up @@ -439,7 +444,8 @@ cdef class PlasmaClient:
"""
cdef c_string addr = address.encode()
with nogil:
check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data))
check_status(self.client.get()
.Transfer(addr.c_str(), port, object_id.data))

def fetch(self, object_ids):
"""
Expand All @@ -457,7 +463,8 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Fetch(ids.size(), ids.data()))

def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1):
def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT,
int num_returns=1):
"""
Wait until num_returns objects in object_ids are ready.
Currently, the object ID arguments to wait must be unique.
Expand All @@ -483,24 +490,31 @@ cdef class PlasmaClient:
if len(object_ids) != len(set(object_ids)):
raise Exception("Wait requires a list of unique object IDs.")
cdef int64_t num_object_requests = len(object_ids)
cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests)
cdef c_vector[CObjectRequest] object_requests = (
c_vector[CObjectRequest](num_object_requests))
cdef int num_objects_ready = 0
cdef ObjectID object_id
for i, object_id in enumerate(object_ids):
object_requests[i].object_id = object_id.data
object_requests[i].type = PLASMA_QUERY_ANYWHERE
with nogil:
check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready))
check_status(self.client.get().Wait(num_object_requests,
object_requests.data(),
num_returns, timeout,
&num_objects_ready))
cdef int num_to_return = min(num_objects_ready, num_returns);
ready_ids = []
waiting_ids = set(object_ids)
cdef int num_returned = 0
for i in range(len(object_ids)):
if num_returned == num_to_return:
break
if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote:
ready_ids.append(ObjectID(object_requests[i].object_id.binary()))
waiting_ids.discard(ObjectID(object_requests[i].object_id.binary()))
if (object_requests[i].status == ObjectStatusLocal or
object_requests[i].status == ObjectStatusRemote):
ready_ids.append(
ObjectID(object_requests[i].object_id.binary()))
waiting_ids.discard(
ObjectID(object_requests[i].object_id.binary()))
num_returned += 1
return ready_ids, list(waiting_ids)

Expand All @@ -526,10 +540,11 @@ cdef class PlasmaClient:
cdef int64_t data_size
cdef int64_t metadata_size
with nogil:
check_status(self.client.get().GetNotification(self.notification_fd,
&object_id.data,
&data_size,
&metadata_size))
check_status(self.client.get()
.GetNotification(self.notification_fd,
&object_id.data,
&data_size,
&metadata_size))
return object_id, data_size, metadata_size

def to_capsule(self):
Expand All @@ -542,7 +557,9 @@ cdef class PlasmaClient:
with nogil:
check_status(self.client.get().Disconnect())

def connect(store_socket_name, manager_socket_name, int release_delay):

def connect(store_socket_name, manager_socket_name, int release_delay,
int num_retries=-1):
"""
Return a new PlasmaClient that is connected a plasma store and
optionally a manager.
Expand All @@ -556,11 +573,16 @@ def connect(store_socket_name, manager_socket_name, int release_delay):
release_delay : int
The maximum number of objects that the client will keep and
delay releasing (for caching reasons).
num_retries : int, default -1
Number of times tor ty to connect to plasma store. Default value of -1
uses the default (50)
"""
cdef PlasmaClient result = PlasmaClient()
result.store_socket_name = store_socket_name.encode()
result.manager_socket_name = manager_socket_name.encode()
with nogil:
check_status(result.client.get().Connect(result.store_socket_name,
result.manager_socket_name, release_delay))
check_status(result.client.get()
.Connect(result.store_socket_name,
result.manager_socket_name,
release_delay, num_retries))
return result
9 changes: 8 additions & 1 deletion python/pyarrow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def pytest_addoption(parser):
default=defaults[group],
help=('Enable the {0} test group'.format(group)))

for group in groups:
parser.addoption('--disable-{0}'.format(group), action='store_true',
default=False,
help=('Disable the {0} test group'.format(group)))

for group in groups:
parser.addoption('--only-{0}'.format(group), action='store_true',
default=False,
Expand All @@ -62,12 +67,14 @@ def pytest_runtest_setup(item):

for group in groups:
only_flag = '--only-{0}'.format(group)
disable_flag = '--disable-{0}'.format(group)
flag = '--{0}'.format(group)

if item.config.getoption(only_flag):
only_set = True
elif getattr(item.obj, group, None):
if not item.config.getoption(flag):
if (item.config.getoption(disable_flag) or
not item.config.getoption(flag)):
skip('{0} NOT enabled'.format(flag))

if only_set:
Expand Down
6 changes: 6 additions & 0 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ def teardown_method(self, test_method):
else:
self.p.kill()

def test_connection_failure_raises_exception(self):
import pyarrow.plasma as plasma
# ARROW-1264
with pytest.raises(IOError):
plasma.connect('unknown-store-name', '', 0, 1)

def test_create(self):
# Create an object id string.
object_id = random_object_id()
Expand Down