Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ cdef class CoreWorker:
owner_address=*)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[CObjectID]* contained_ids)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)

Expand Down
23 changes: 15 additions & 8 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ cdef execute_task(
const c_vector[CObjectID] &c_arg_reference_ids,
const c_vector[CObjectID] &c_return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns):
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[CObjectID] *contained_id):

worker = ray.worker.global_worker
manager = worker.function_actor_manager
Expand Down Expand Up @@ -563,7 +564,7 @@ cdef execute_task(
# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
core_worker.store_task_outputs(
worker, outputs, c_return_ids, returns)
worker, outputs, c_return_ids, returns, contained_id)
except Exception as error:
# If the debugger is enabled, drop into the remote pdb here.
if "RAY_PDB" in os.environ:
Expand All @@ -582,7 +583,7 @@ cdef execute_task(
for _ in range(c_return_ids.size()):
errors.append(failure_object)
core_worker.store_task_outputs(
worker, errors, c_return_ids, returns)
worker, errors, c_return_ids, returns, contained_id)
ray._private.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
Expand Down Expand Up @@ -619,6 +620,7 @@ cdef CRayStatus task_execution_handler(
const c_vector[CObjectID] &c_return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[CObjectID] *contained_id,
shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes) nogil:
with gil:
try:
Expand All @@ -628,7 +630,7 @@ cdef CRayStatus task_execution_handler(
# it does, that indicates that there was an internal error.
execute_task(task_type, task_name, ray_function, c_resources,
c_args, c_arg_reference_ids, c_return_ids,
debugger_breakpoint, returns)
debugger_breakpoint, returns, contained_id)
except Exception as e:
sys_exit = SystemExit()
if isinstance(e, RayActorError) and \
Expand Down Expand Up @@ -1633,17 +1635,22 @@ cdef class CoreWorker:

cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns):
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[CObjectID]* contained_id_out):
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID]* contained_id_ptr
c_vector[CObjectID] contained_id
c_vector[CObjectID] return_ids_vector

if return_ids.size() == 0:
return

if contained_id_out == NULL:
contained_id_ptr = &contained_id
else:
contained_id_ptr = contained_id_out
n_returns = len(outputs)
returns.resize(n_returns)
for i in range(n_returns):
Expand All @@ -1661,13 +1668,13 @@ cdef class CoreWorker:
# Reset debugging context of this worker.
ray.worker.global_worker.debugger_get_breakpoint = b""
metadata = string_to_buffer(metadata_str)
contained_id = ObjectRefsToVector(
contained_id_ptr[0] = ObjectRefsToVector(
serialized_object.contained_object_refs)

with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().AllocateReturnObject(
return_id, data_size, metadata, contained_id,
return_id, data_size, metadata, contained_id_ptr[0],
&returns[0][i]))

if returns[0][i].get() != NULL:
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectID] &return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[CObjectID] *return_contained_ids,
shared_ptr[LocalMemoryBuffer]
&creation_task_exception_pb_bytes) nogil
) task_execution_callback
Expand Down
16 changes: 16 additions & 0 deletions python/ray/tests/test_basic_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@
logger = logging.getLogger(__name__)


def test_object_transfer(shutdown_only):
    """Objects put by an actor remain retrievable after the actor dies.

    The actor returns a list holding an ObjectRef to a large plasma object.
    After the actor is killed, the driver must still be able to fetch the
    inner object, i.e. the object must outlive its creator.
    """
    ray.init()

    payload = b"a" * 10 * 1024 * 1024

    @ray.remote
    class Producer:
        def gen(self):
            # Put a 10 MiB object in plasma and hand back its ref.
            return [ray.put(b"a" * 10 * 1024 * 1024)]

    producer = Producer.remote()
    outer_ref = producer.gen.remote()
    # Make sure gen() has finished before the actor goes away.
    ray.wait([outer_ref])
    ray.kill(producer)
    assert ray.get(ray.get(outer_ref)[0]) == payload


def test_auto_global_gc(shutdown_only):
# 100MB
ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)
Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,8 @@ RAY_CONFIG(bool, gcs_task_scheduling_enabled,
getenv("RAY_GCS_TASK_SCHEDULING_ENABLED") != nullptr &&
getenv("RAY_GCS_TASK_SCHEDULING_ENABLED") == std::string("true"))

// Feature flag for object ownership transfer (the ShareOwnership RPC path).
// Enabled by setting the environment variable RAY_TRANSFER_OWNERSHIP=1.
// NOTE(review): getenv is evaluated twice here; this mirrors the style of the
// other env-var-based configs in this file (see gcs_task_scheduling_enabled).
RAY_CONFIG(bool, ownership_transfer_enabled,
           getenv("RAY_TRANSFER_OWNERSHIP") != nullptr &&
           getenv("RAY_TRANSFER_OWNERSHIP") == std::string("1"))

RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024)
92 changes: 85 additions & 7 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,15 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// Initialize task receivers.
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
RAY_CHECK(options_.task_execution_callback != nullptr);
auto execute_task =
std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5);
auto object_transfer =
std::bind(&CoreWorker::ShareOwnershipInternal, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
direct_task_receiver_ = std::make_unique<CoreWorkerDirectTaskReceiver>(
worker_context_, task_execution_service_, execute_task,
[this] { return local_raylet_client_->TaskDone(); });
[this] { return local_raylet_client_->TaskDone(); }, object_transfer);
}

// Initialize raylet client.
Expand Down Expand Up @@ -554,7 +557,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
"CoreWorker.ReconstructObject");
};
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_,
memory_store_, reference_counter_, rpc_address_,
/* retry_task_callback= */
[this](TaskSpecification &spec, bool delay) {
if (delay) {
Expand Down Expand Up @@ -2069,6 +2072,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
std::vector<ObjectID> *contained_ids,
ReferenceCounter::ReferenceTableProto *borrowed_refs) {
RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString();
task_queue_length_ -= 1;
Expand Down Expand Up @@ -2126,7 +2130,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
status = options_.task_execution_callback(
task_type, task_spec.GetName(), func,
task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids,
return_ids, task_spec.GetDebuggerBreakpoint(), return_objects,
return_ids, task_spec.GetDebuggerBreakpoint(), return_objects, contained_ids,
creation_task_exception_pb_bytes);

// Get the reference counts for any IDs that we borrowed during this task and
Expand Down Expand Up @@ -2217,7 +2221,8 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
}
auto old_id = GetActorId();
SetActorId(actor_id);
RAY_UNUSED(ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs));
RAY_UNUSED(
ExecuteTask(task_spec, resource_ids, &return_objects, nullptr, &borrowed_refs));
SetActorId(old_id);
}

Expand Down Expand Up @@ -2824,6 +2829,79 @@ void CoreWorker::HandleRunOnUtilWorker(const rpc::RunOnUtilWorkerRequest &reques
}
}

/// Share ownership of the given objects with the worker at `to_addr`.
///
/// For each object that (a) we own and (b) is pinned in plasma on some node,
/// ask the pinning raylet to also pin the object on behalf of the new owner.
/// Once every pin request has resolved, `cb` is invoked with the list of
/// successfully shared objects (id, size, and pinned-at node).
///
/// \param to_addr Address of the worker that should become a co-owner.
/// \param ids Candidate objects; non-owned or non-pinned ones are skipped.
/// \param cb Completion callback; receives the shared-object descriptors.
void CoreWorker::ShareOwnershipInternal(
    const rpc::Address &to_addr, const std::vector<ObjectID> &ids,
    std::function<void(google::protobuf::RepeatedPtrField<rpc::SharedObjectInfo>)> cb) {
  // Collect the subset of `ids` we can actually share right now.
  std::vector<std::pair<NodeID, ObjectID>> pinned_objects;
  for (const auto &id : ids) {
    if (!reference_counter_->OwnedByUs(id)) {
      continue;
    }
    const auto node_id = reference_counter_->GetObjectPinnedLocation(id);
    if (node_id) {
      pinned_objects.emplace_back(*node_id, id);
    } else {
      // TODO (yic) Should wait until object is ready.
      RAY_LOG(DEBUG) << "We only take care of put objects right now";
    }
  }

  if (pinned_objects.empty()) {
    // Nothing to transfer; reply immediately with an empty list.
    cb({});
    return;
  }

  // Aggregate state shared by the async PinObjectIDs callbacks. We record the
  // pinning node per object so the final report attributes each object to the
  // node it is actually pinned at (using a single captured node_id would stamp
  // every object with the node of whichever request finished last).
  auto in_flight = std::make_shared<size_t>(pinned_objects.size());
  auto succeeded =
      std::make_shared<absl::flat_hash_map<ObjectID, NodeID>>();
  for (const auto &entry : pinned_objects) {
    const NodeID &pin_node_id = entry.first;
    const ObjectID &object_id = entry.second;
    auto node_info = gcs_client_->Nodes().Get(pin_node_id);
    // Guard against the node having disappeared from the GCS cache; the
    // accessors below would otherwise dereference a null pointer.
    RAY_CHECK(node_info != nullptr)
        << "Node " << pin_node_id << " pinning object " << object_id
        << " is not in the GCS node cache";
    auto grpc_client = rpc::NodeManagerWorkerClient::make(
        node_info->node_manager_address(), node_info->node_manager_port(),
        *client_call_manager_);
    auto raylet_client = std::make_shared<raylet::RayletClient>(std::move(grpc_client));
    raylet_client->PinObjectIDs(
        to_addr, {object_id},
        [this, in_flight, succeeded, id = object_id, node_id = pin_node_id,
         cb](auto &status, auto &pin_reply) mutable {
          if (status.ok()) {
            succeeded->emplace(id, node_id);
          }
          // TODO (yic): better with a barrier
          if (--*in_flight == 0) {
            // Last reply: fetch the successfully pinned objects from plasma
            // to learn their sizes, then report them to the caller.
            absl::flat_hash_set<ObjectID> ready_ids;
            for (const auto &kv : *succeeded) {
              ready_ids.insert(kv.first);
            }
            absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> results;
            bool exception = false;
            RAY_CHECK_OK(plasma_store_provider_->Get(ready_ids, -1, worker_context_,
                                                     &results, &exception));
            RAY_CHECK(!exception) << "Failed to get object from store";
            google::protobuf::RepeatedPtrField<rpc::SharedObjectInfo> transferred_objs;
            for (const auto &result : results) {
              RAY_CHECK(result.second->IsInPlasmaError())
                  << "Inline objects are shared by passing value";
              auto obj = transferred_objs.Add();
              obj->set_object_id(result.first.Binary());
              obj->set_object_size(result.second->GetSize());
              obj->set_pinned_at_node(succeeded->at(result.first).Binary());
            }
            cb(std::move(transferred_objs));
          }
        });
  }
}

/// RPC handler: share ownership of the requested objects with the new owner
/// named in the request. The reply is sent once all transfers have resolved.
void CoreWorker::HandleShareOwnership(const rpc::ShareOwnershipRequest &request,
                                      rpc::ShareOwnershipReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {
  // Decode the binary object IDs carried in the request.
  std::vector<ObjectID> object_ids;
  object_ids.reserve(request.object_ids().size());
  for (const auto &binary_id : request.object_ids()) {
    object_ids.push_back(ObjectID::FromBinary(binary_id));
  }
  const auto &new_owner = request.new_owner_address();
  ShareOwnershipInternal(
      new_owner, object_ids, [send_reply_callback, reply](auto shared_infos) {
        // Move the shared-object descriptors into the reply and complete the RPC.
        reply->mutable_shared_objs()->Swap(&shared_infos);
        send_reply_callback(Status::OK(), nullptr, nullptr);
      });
}

void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct CoreWorkerOptions {
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results,
std::vector<ObjectID> *return_contained_ids,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes)>;

CoreWorkerOptions()
Expand Down Expand Up @@ -992,6 +993,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleShareOwnership(const rpc::ShareOwnershipRequest &request,
rpc::ShareOwnershipReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

///
/// Public methods related to async actor call. This should only be used when
/// the actor is (1) direct actor and (2) using asyncio mode.
Expand Down Expand Up @@ -1023,6 +1028,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
bool IsExiting() const;

private:
void ShareOwnershipInternal(
const rpc::Address &to_addr, const std::vector<ObjectID> &ids,
std::function<void(google::protobuf::RepeatedPtrField<rpc::SharedObjectInfo>)> cb);

void SetCurrentTaskId(const TaskID &task_id);

void SetActorId(const ActorID &actor_id);
Expand Down Expand Up @@ -1089,6 +1098,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
Status ExecuteTask(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
std::vector<ObjectID> *contained_ids,
ReferenceCounter::ReferenceTableProto *borrowed_refs);

/// Execute a local mode task (runs normal ExecuteTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<ObjectID> *return_contained_ids,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb) {
JNIEnv *env = GetJNIEnv();
RAY_CHECK(java_task_executor);
Expand Down Expand Up @@ -169,6 +170,11 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
return_objects[i]->HasData() ? return_objects[i]->GetData()->Size() : 0;
auto &metadata = return_objects[i]->GetMetadata();
auto &contained_object_id = return_objects[i]->GetNestedIds();
if (return_contained_ids != nullptr) {
return_contained_ids->insert(return_contained_ids->end(),
contained_object_id.begin(),
contained_object_id.end());
}
auto result_ptr = &(*results)[0];

RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
Expand Down
24 changes: 23 additions & 1 deletion src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,16 @@ void ReferenceCounter::CleanupBorrowersOnRefRemoved(
DeleteReferenceInternal(it, nullptr);
}

/// Remove `address` from the borrower set of `object_id`. If a borrower was
/// actually removed, re-evaluate the reference for deletion. Crashes if the
/// object is not in the reference table. Acquires mutex_ internally.
void ReferenceCounter::RemoveBorrower(const ObjectID &object_id,
                                      const rpc::Address &address) {
  absl::MutexLock lock(&mutex_);
  auto ref_it = object_id_refs_.find(object_id);
  RAY_CHECK(ref_it != object_id_refs_.end()) << object_id;
  const bool removed = ref_it->second.borrowers.erase(address) > 0;
  if (removed) {
    // The borrower set shrank; the reference may now be deletable.
    DeleteReferenceInternal(ref_it, nullptr);
  }
}

void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it,
const rpc::WorkerAddress &addr,
const ObjectID &contained_in_id) {
Expand Down Expand Up @@ -986,7 +996,7 @@ bool ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id,
}

absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations(
const ObjectID &object_id) {
const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
Expand All @@ -997,6 +1007,18 @@ absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations
return it->second.locations;
}

/// Return the raylet node the object is pinned at, per the reference table.
///
/// \param[in] object_id The object to look up.
/// \return absl::nullopt if the object has no entry in the reference table;
/// otherwise the entry's pinned_at_raylet_id field.
absl::optional<NodeID> ReferenceCounter::GetObjectPinnedLocation(
    const ObjectID &object_id) const {
  absl::MutexLock lock(&mutex_);
  auto it = object_id_refs_.find(object_id);
  if (it == object_id_refs_.end()) {
    // Fixed log text: this is the pinned location, not the full location set
    // (the old message was copy-pasted from GetObjectLocations).
    RAY_LOG(WARNING) << "Tried to get the pinned location for an object " << object_id
                     << " that doesn't exist in the reference table";
    return absl::nullopt;
  }
  return it->second.pinned_at_raylet_id;
}

size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand Down
8 changes: 7 additions & 1 deletion src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[in] object_id The object that we were borrowing.
void HandleRefRemoved(const ObjectID &object_id) EXCLUSIVE_LOCKS_REQUIRED(mutex_);

/// Remove a borrower of the given object, deleting the reference if this
/// removal makes it eligible. The definition acquires mutex_ itself via
/// absl::MutexLock, so callers must NOT hold mutex_ — the annotation is
/// LOCKS_EXCLUDED, not EXCLUSIVE_LOCKS_REQUIRED (which would make thread
/// safety analysis report a self-deadlock against the internal lock).
///
/// \param[in] object_id The object whose borrower set to update.
/// \param[in] address The address of the borrower to remove.
void RemoveBorrower(const ObjectID &object_id, const rpc::Address &address)
    LOCKS_EXCLUDED(mutex_);

/// Returns the total number of ObjectIDs currently in scope.
size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);

Expand Down Expand Up @@ -404,7 +407,10 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \return The nodes that have the object if the reference exists, empty optional
/// otherwise.
absl::optional<absl::flat_hash_set<NodeID>> GetObjectLocations(
    const ObjectID &object_id) const LOCKS_EXCLUDED(mutex_);

/// Get the raylet node the given object is pinned at, if known.
///
/// \param[in] object_id The object to look up.
/// \return Empty optional if the object has no entry in the reference table;
/// otherwise the entry's pinned_at_raylet_id (which may itself be unset if
/// the object is not currently pinned).
absl::optional<NodeID> GetObjectPinnedLocation(const ObjectID &object_id) const
    LOCKS_EXCLUDED(mutex_);
// NOTE(review): on both declarations the `const` qualifier must precede the
// LOCKS_EXCLUDED attribute macro — `) LOCKS_EXCLUDED(...) const;` does not
// parse (cf. NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_) above).
LOCKS_EXCLUDED(mutex_) const;

/// Subscribe to object location changes that are more recent than the given version.
/// The provided callback will be invoked when new locations become available.
Expand Down
Loading