53 changes: 45 additions & 8 deletions ark/api/executor.cpp
@@ -10,6 +10,7 @@
#include <mscclpp/core.hpp>
#include <mscclpp/proxy_channel.hpp>
#include <mscclpp/sm_channel.hpp>
#include <tuple>

#include "ark/data_type.hpp"
#include "ark/model.hpp"
@@ -24,6 +25,7 @@
#include "gpu/gpu_manager.h"
#include "logging.h"
#include "model/model_buffer.hpp"
#include "model_buffer_manager.hpp"
#include "model/model_data_type.hpp"
#include "model/model_tensor.hpp"
#include "utils/utils_net.hpp"
@@ -234,8 +236,15 @@ void Executor::Impl::init(const std::string &plan) {
std::to_string(kv.first) + ": " + std::to_string(kv.second) + ", ";
}

codegen_ =
std::make_shared<CodeGenerator>(plan_json_, buffer_id_to_offset_, name_);
ModelBufferManager &buffer_manager = ModelBufferManager::get_instance();

if (!buffer_manager.is_empty()) {
codegen_ = std::make_shared<CodeGenerator>(
            plan_json_, buffer_id_to_offset_, name_, &buffer_manager);
} else {
codegen_ = std::make_shared<CodeGenerator>(plan_json_,
                                                  buffer_id_to_offset_, name_);
}

auto gpu_manager = GpuManager::get_instance(gpu_id_);
timer_begin_ = gpu_manager->create_event();
@@ -367,7 +376,16 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
}
continue;
}
buffer_id_to_offset[buf_info->buffer->id()] = offset;
if (buf_info->buffer->is_external()) {
if (buf_info->buffer->device_id() != gpu_id_) {
ERR(InvalidUsageError,
"PyTorch tensor and model execution are on different GPUs");
}
continue;
} else {
buffer_id_to_offset[buf_info->buffer->id()] = offset;
offset += buf_info->bytes;
}
for (const auto &tag_info : buf_info->buffer->send_tags()) {
remote_rank_to_send_tags_and_offsets[tag_info.first]
.first.push_back(tag_info.second);
@@ -380,7 +398,6 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
remote_rank_to_recv_tags_and_offsets[tag_info.first]
.second.push_back(offset);
}
offset += buf_info->bytes;
}
total_bytes_ = offset;

@@ -456,7 +473,11 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
bootstrap->recv(tags.data(), len * sizeof(int), remote_rank, 1);
bootstrap->recv(offsets.data(), len * sizeof(size_t), remote_rank, 2);
for (int i = 0; i < len; ++i) {
buffer_id_to_offset[send_tag_to_buffer_id[tags[i]]] = offsets[i];
if (!buffer_id_to_info[send_tag_to_buffer_id[tags[i]]]
->buffer->is_external()) {
buffer_id_to_offset[send_tag_to_buffer_id[tags[i]]] =
offsets[i];
}
}
}
for (auto &kv : remote_rank_to_recv_tag_to_buffer_id) {
@@ -472,10 +493,13 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
bootstrap->recv(tags.data(), len * sizeof(int), remote_rank, 4);
bootstrap->recv(offsets.data(), len * sizeof(size_t), remote_rank, 5);
for (int i = 0; i < len; ++i) {
buffer_id_to_offset[recv_tag_to_buffer_id[tags[i]]] = offsets[i];
if (!buffer_id_to_info[recv_tag_to_buffer_id[tags[i]]]
->buffer->is_external()) {
buffer_id_to_offset[recv_tag_to_buffer_id[tags[i]]] =
offsets[i];
}
}
}

return buffer_id_to_offset;
}

@@ -742,6 +766,11 @@ uintptr_t Executor::Impl::tensor_address(const Tensor tensor) const {
void Executor::Impl::tensor_read(const Tensor tensor, void *data, size_t bytes,
bool is_d2d) const {
GLOG(gpuSetDevice(gpu_id_));
if (tensor.ref()->buffer()->is_external()) {
ERR(InvalidUsageError,
"Reading data from a tensor preallocated by PyTorch is not "
"supported. Use PyTorch's native methods.");
}
size_t tensor_data_bytes =
tensor.shape().nelems() * tensor.data_type().bytes();
if (bytes != tensor_data_bytes) {
@@ -779,6 +808,11 @@ void Executor::Impl::tensor_read(const Tensor tensor, void *data, size_t bytes,
void Executor::Impl::tensor_write(const Tensor tensor, const void *data,
size_t bytes, bool is_d2d) const {
GLOG(gpuSetDevice(gpu_id_));
if (tensor.ref()->buffer()->is_external()) {
ERR(InvalidUsageError,
"Writing data to a tensor preallocated by PyTorch is not "
"supported. Use PyTorch's native methods.");
}
size_t tensor_data_bytes =
tensor.shape().nelems() * tensor.data_type().bytes();
if (bytes != tensor_data_bytes) {
@@ -843,7 +877,10 @@ float Executor::stop(int64_t max_spin_count) {

void Executor::barrier() { impl_->barrier(); }

void Executor::destroy() { impl_.reset(nullptr); }
void Executor::destroy() {
ModelBufferManager::get_instance().clear_buffers();
impl_.reset(nullptr);
}

bool Executor::destroyed() const { return impl_.get() == nullptr; }

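Note on init_buffers() above: buffers marked is_external() are skipped when laying out ARK's contiguous device arena, so total_bytes_ counts only ARK-managed memory and external tensors are reached through their raw device pointers. A minimal standalone sketch of that layout rule (illustrative types only, not ARK code):

#include <cstddef>
#include <cstdio>
#include <map>
#include <vector>

struct BufInfo {
    size_t id;
    size_t bytes;
    bool is_external;
};

int main() {
    // Three buffers: two ARK-managed, one external (e.g. PyTorch-owned).
    std::vector<BufInfo> bufs = {
        {0, 1024, false}, {1, 4096, true}, {2, 2048, false}};
    std::map<size_t, size_t> buffer_id_to_offset;
    size_t offset = 0;
    for (const auto &b : bufs) {
        if (b.is_external) continue;  // external buffers get no arena offset
        buffer_id_to_offset[b.id] = offset;
        offset += b.bytes;
    }
    // buffer_id_to_offset == {0: 0, 2: 1024}; the arena spans 3072 bytes.
    std::printf("total_bytes = %zu\n", offset);
    return 0;
}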
18 changes: 16 additions & 2 deletions ark/api/tensor.cpp
@@ -3,11 +3,25 @@

#include "ark/tensor.hpp"

#include "model/model_buffer.hpp"
#include "model/model_data_type.hpp"
#include "model/model_tensor.hpp"

namespace ark {

Tensor::Tensor(void* data_ptr, int32_t device_id,
const std::vector<int64_t>& shape,
const DataType& dtype) {
  size_t external_data_size =
      std::accumulate(shape.begin(), shape.end(), int64_t{1},
                      std::multiplies<int64_t>()) *
      dtype.bytes();
auto buffer =
std::make_shared<ModelBuffer>(data_ptr, external_data_size, device_id);
auto tensor = std::make_shared<ModelTensor>(dtype.ref(), buffer, Dims(shape),
Dims(shape), Dims(), Dims());
ref_ = tensor;
}

size_t Tensor::id() const {
if (ref_) {
return ref_->id();
Expand Down Expand Up @@ -43,14 +57,14 @@ Dims Tensor::padded_shape() const {
return Dims();
}

const DataType &Tensor::data_type() const {
const DataType& Tensor::data_type() const {
if (ref_) {
return DataType::from_name(ref_->data_type()->type_name());
}
return NONE;
}

std::ostream &operator<<(std::ostream &os, const Tensor &tensor) {
std::ostream& operator<<(std::ostream& os, const Tensor& tensor) {
if (tensor.is_null()) {
os << "null";
} else {
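The new Tensor constructor enables zero-copy interop: device memory allocated by another framework can be wrapped as an ark::Tensor. A hedged usage sketch (assumes libtorch, an ark.hpp umbrella header, and that ark::FP32 corresponds to torch::kFloat32; none of these are shown in this diff):

#include <torch/torch.h>

#include <ark.hpp>

int main() {
    // Allocate a CUDA tensor with PyTorch, then wrap it without copying.
    torch::Tensor t =
        torch::ones({64, 64}, torch::TensorOptions()
                                  .dtype(torch::kFloat32)
                                  .device(torch::kCUDA, 0));
    ark::Tensor wrapped(t.data_ptr(), /*device_id=*/t.device().index(),
                        /*shape=*/{t.size(0), t.size(1)}, ark::FP32);
    // Data movement stays on the PyTorch side; as the executor changes above
    // show, tensor_read()/tensor_write() reject externally owned buffers.
    return 0;
}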
36 changes: 26 additions & 10 deletions ark/codegen.cpp
@@ -10,6 +10,7 @@
#include "file_io.h"
#include "logging.h"
#include "model/model_buffer.hpp"
#include "model_buffer_manager.hpp"
#include "model/model_data_type.hpp"
#include "model/model_op.hpp"
#include "model/model_tensor.hpp"
@@ -43,7 +44,7 @@ class CodeGenerator::Impl {
public:
Impl(const PlanJson &plan,
const std::map<size_t, size_t> &buffer_id_to_offset,
const std::string &name);
const std::string &name, ModelBufferManager *buffer_manager);
~Impl() = default;

private:
@@ -64,6 +65,8 @@

std::string sync_process_range(const Range<size_t> &ranges, int state_id);

ModelBufferManager *buffer_manager_;

protected:
friend class CodeGenerator;

@@ -78,14 +81,18 @@

CodeGenerator::Impl::Impl(const PlanJson &plan,
const std::map<size_t, size_t> &buffer_id_to_offset,
const std::string &name)
: buffer_id_to_offset_(buffer_id_to_offset), name_(name) {
const std::string &name,
ModelBufferManager *buffer_manager)
: buffer_id_to_offset_(buffer_id_to_offset),
name_(name),
buffer_manager_(buffer_manager) {
rank_ = plan.at("Rank");
world_size_ = plan.at("WorldSize");
num_procs_ = plan.at("NumProcessors");
num_warps_per_proc_ = plan.at("NumWarpsPerProcessor");

std::stringstream definitions_ss;

for (auto &task_json : plan.at("TaskInfos")) {
definitions_ss << this->def_task(task_json);
}
@@ -224,11 +231,19 @@ std::string CodeGenerator::Impl::def_task(const Json &task_json) {
auto &arg = impl_args[i];
if (arg.type_name() == "TENSOR") {
auto tns = arg.value<ModelTensorRef>();
size_t buffer_offset =
buffer_id_to_offset_.at(tns->buffer()->id());
size_t offset = buffer_offset + ModelOffset(tns).value();
ss << "(" << tns->data_type()->type_str() << "*)&_buf["
<< offset << "]";
if (tns->buffer()->is_external()) {
void *buf_addr =
ModelBufferManager::get_instance().get_buffer(
tns->buffer()->id());
ss << "(" << tns->data_type()->type_str() << "*)"
<< buf_addr;
} else {
size_t buffer_offset =
buffer_id_to_offset_.at(tns->buffer()->id());
size_t offset = buffer_offset + ModelOffset(tns).value();
ss << "(" << tns->data_type()->type_str() << "*)&_buf["
<< offset << "]";
}
} else if (arg.type_name() == "OFFSET") {
auto moff = arg.value<ModelOffset>();
size_t buffer_offset =
@@ -431,8 +446,9 @@ std::string CodeGenerator::Impl::sync_process_range(const Range<size_t> &range,

CodeGenerator::CodeGenerator(
const PlanJson &plan, const std::map<size_t, size_t> &buffer_id_to_offset,
const std::string &name)
: impl_(std::make_shared<Impl>(plan, buffer_id_to_offset, name)) {}
const std::string &name, ModelBufferManager *buffer_manager)
: impl_(std::make_shared<Impl>(plan, buffer_id_to_offset, name,
buffer_manager)) {}

std::string CodeGenerator::code() const { return impl_->code_; }

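def_task() now bakes the raw device pointer of an external buffer directly into the generated kernel source, instead of an offset into _buf. The mechanism is simply ostream's void* formatting, which yields a hex literal; a standalone illustration:

#include <iostream>
#include <sstream>

int main() {
    int dummy = 0;
    void *buf_addr = &dummy;  // stands in for a registered device pointer
    std::ostringstream ss;
    ss << "(float*)" << buf_addr;        // e.g. "(float*)0x7ffc9c2a5b44"
    std::cout << ss.str() << std::endl;  // emitted verbatim into kernel code
    return 0;
}

The generated source is therefore only valid while that allocation lives, which is consistent with Executor::destroy() clearing the ModelBufferManager.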
4 changes: 3 additions & 1 deletion ark/codegen.hpp
@@ -8,6 +8,7 @@
#include <memory>
#include <string>

#include "model_buffer_manager.hpp"
#include "model/model_json.hpp"

namespace ark {
@@ -16,7 +17,8 @@ class CodeGenerator {
public:
CodeGenerator(const PlanJson &plan,
const std::map<size_t, size_t> &buffer_id_to_offset,
const std::string &name = "ark_kernel");
const std::string &name = "ark_kernel",
ModelBufferManager *buffer_manager = nullptr);

~CodeGenerator() = default;

2 changes: 2 additions & 0 deletions ark/include/ark/tensor.hpp
@@ -31,6 +31,8 @@ class Tensor {
Tensor(ModelTensorRef ref) : ref_(ref) {}
Tensor(const Tensor &other) = default;
Tensor &operator=(const Tensor &other) = default;
Tensor(void *data_ptr, int32_t device_id, const std::vector<int64_t> &shape,
const DataType &dtype);

bool operator==(const Tensor &other) const { return ref_ == other.ref_; }
bool operator!=(const Tensor &other) const { return ref_ != other.ref_; }
55 changes: 51 additions & 4 deletions ark/model/model_buffer.cpp
@@ -4,13 +4,13 @@
#include "model_buffer.hpp"

#include "logging.h"
#include "model_buffer_manager.hpp"

namespace ark {

ModelBuffer::ModelBuffer(int rank) : rank_(rank) {
static size_t id = 0;
id_ = id++;
}
size_t ModelBuffer::curr_id = 0;

ModelBuffer::ModelBuffer(int rank) : rank_(rank) { id_ = curr_id++; }

ModelBuffer::ModelBuffer(size_t id, int rank,
const std::vector<TagInfo> &send_tags,
@@ -24,6 +24,23 @@ ModelBuffer::ModelBuffer(size_t id, int rank,
}
}

ModelBuffer::ModelBuffer(void *data, size_t size, int32_t device_id)
: rank_(-1),
external_data_(data),
external_data_size_(size),
device_id_(device_id),
is_external_(true) {
id_ = curr_id++;
}

ModelBuffer::ModelBuffer(size_t id, void *data, size_t size, int32_t device_id)
: id_(id),
rank_(-1),
external_data_(data),
external_data_size_(size),
device_id_(device_id),
is_external_(true) {}

void ModelBuffer::tag_send(int remote_rank, int tag) {
send_tags_.insert(TagInfo{remote_rank, tag});
}
@@ -46,6 +63,14 @@ Json ModelBuffer::serialize() const {
}
j["SendTags"] = send_tags;
j["RecvTags"] = recv_tags;
j["IsExternal"] = is_external_;
if (is_external_) {
ModelBufferManager::get_instance().register_buffer(id_, external_data_,
external_data_size_);
j["ExternalDataSize"] = external_data_size_;
j["DeviceId"] = device_id_;
}
    // Note: the raw external_data_ pointer itself is never serialized;
    // deserialize() re-resolves it through ModelBufferManager by id.
return j;
}

@@ -62,6 +87,28 @@ std::shared_ptr<ModelBuffer> ModelBuffer::deserialize(const Json &serialized) {
} else if (!serialized.contains("RecvTags")) {
ERR(InvalidUsageError,
"ModelBuffer deserialization failed: missing RecvTags");
} else if (!serialized.contains("IsExternal")) {
ERR(InvalidUsageError,
"ModelBuffer deserialization failed: missing IsExternal");
}
if (serialized["IsExternal"]) {
if (!serialized.contains("ExternalDataSize")) {
ERR(InvalidUsageError,
"ModelBuffer deserialization failed: missing ExternalDataSize");
} else if (!serialized.contains("DeviceId")) {
ERR(InvalidUsageError,
"ModelBuffer deserialization failed: missing DeviceId");
}
void *data_ptr =
ModelBufferManager::get_instance().get_buffer(serialized["Id"]);
if (!data_ptr) {
ERR(InvalidUsageError,
"ModelBuffer deserialization failed: external buffer not found "
"in BufferManager");
}
return std::make_shared<ModelBuffer>(serialized["Id"], data_ptr,
serialized["ExternalDataSize"],
serialized["DeviceId"]);
}
return std::make_shared<ModelBuffer>(serialized["Id"], serialized["Rank"],
serialized["SendTags"],
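For reference, the serialized form of an external buffer would look roughly like the following (field values invented; assumes ARK's Json is nlohmann-based, as model/model_json.hpp suggests):

#include <iostream>

#include <nlohmann/json.hpp>

int main() {
    nlohmann::ordered_json j;
    j["Id"] = 7;
    j["Rank"] = -1;  // external buffers carry no ARK rank
    j["SendTags"] = nlohmann::ordered_json::array();
    j["RecvTags"] = nlohmann::ordered_json::array();
    j["IsExternal"] = true;
    j["ExternalDataSize"] = 16384;
    j["DeviceId"] = 0;
    std::cout << j.dump(2) << std::endl;
    // The raw pointer itself travels out-of-band: serialize() registers it
    // with ModelBufferManager, and deserialize() looks it up by "Id".
    return 0;
}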
15 changes: 15 additions & 0 deletions ark/model/model_buffer.hpp
@@ -22,6 +22,10 @@ class ModelBuffer {
ModelBuffer(size_t id, int rank, const std::vector<TagInfo> &send_tags,
const std::vector<TagInfo> &recv_tags);

// externally managed buffer
ModelBuffer(void *data, size_t size, int32_t device_id);
ModelBuffer(size_t id, void *data, size_t size, int32_t device_id);

size_t id() const { return id_; }

int rank() const { return rank_; }
@@ -44,11 +48,22 @@

static std::shared_ptr<ModelBuffer> deserialize(const Json &serialized);

// external buffer management
size_t external_data_size() const { return external_data_size_; }
void *external_data() const { return external_data_; }
int32_t device_id() const { return device_id_; }
bool is_external() const { return is_external_; }

private:
static size_t curr_id;
size_t id_;
int rank_;
std::set<TagInfo> send_tags_;
std::set<TagInfo> recv_tags_;
void *external_data_ = nullptr;
size_t external_data_size_ = 0;
    int32_t device_id_ = -1;
bool is_external_ = false;
};

} // namespace ark
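model_buffer_manager.hpp itself is not part of this diff. Inferring from its call sites above (get_instance, register_buffer, get_buffer, is_empty, clear_buffers), the singleton plausibly looks like the sketch below; this is a reconstruction under those assumptions, not the actual header:

#include <cstddef>
#include <unordered_map>
#include <utility>

namespace ark {

// Inferred sketch: maps external buffer ids to (pointer, size) pairs.
class ModelBufferManager {
   public:
    static ModelBufferManager &get_instance() {
        static ModelBufferManager instance;  // Meyers singleton
        return instance;
    }
    void register_buffer(size_t id, void *data, size_t size) {
        buffers_[id] = {data, size};
    }
    void *get_buffer(size_t id) const {
        auto it = buffers_.find(id);
        return it == buffers_.end() ? nullptr : it->second.first;
    }
    bool is_empty() const { return buffers_.empty(); }
    void clear_buffers() { buffers_.clear(); }  // called by Executor::destroy()

   private:
    ModelBufferManager() = default;
    std::unordered_map<size_t, std::pair<void *, size_t>> buffers_;
};

}  // namespace ark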