Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/envoy/mongo/bson.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Field {
STRING = 0x02,
DOCUMENT = 0x03,
ARRAY = 0x04,
BINARY = 0x05,
OBJECT_ID = 0x07,
BOOLEAN = 0x08,
DATETIME = 0x09,
Expand Down Expand Up @@ -58,6 +59,7 @@ class Field {
virtual const std::string& asString() const PURE;
virtual const Document& asDocument() const PURE;
virtual const Document& asArray() const PURE;
virtual const std::string& asBinary() const PURE;
virtual const ObjectId& asObjectId() const PURE;
virtual bool asBoolean() const PURE;
virtual int64_t asDatetime() const PURE;
Expand Down Expand Up @@ -87,6 +89,7 @@ class Document {
virtual DocumentPtr addString(const std::string& key, std::string&& value) PURE;
virtual DocumentPtr addDocument(const std::string& key, DocumentPtr value) PURE;
virtual DocumentPtr addArray(const std::string& key, DocumentPtr value) PURE;
virtual DocumentPtr addBinary(const std::string& key, std::string&& value) PURE;
virtual DocumentPtr addObjectId(const std::string& key, Field::ObjectId&& value) PURE;
virtual DocumentPtr addBoolean(const std::string& key, bool value) PURE;
virtual DocumentPtr addDatetime(const std::string& key, int64_t value) PURE;
Expand Down
40 changes: 39 additions & 1 deletion source/common/mongo/bson_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ std::string BufferHelper::removeString(Buffer::Instance& data) {
return ret;
}

std::string BufferHelper::removeBinary(Buffer::Instance& data) {
// Read out the subtype but do not store it for now.
int32_t length = removeInt32(data);
removeByte(data);
char* start = reinterpret_cast<char*>(data.linearize(length));
std::string ret(start, length);
data.drain(length);
return ret;
}

void BufferHelper::writeCString(Buffer::Instance& data, const std::string& value) {
data.add(value.c_str(), value.size() + 1);
}
Expand All @@ -113,6 +123,14 @@ void BufferHelper::writeString(Buffer::Instance& data, const std::string& value)
data.add(value.c_str(), value.size() + 1);
}

void BufferHelper::writeBinary(Buffer::Instance& data, const std::string& value) {
// Right now we do not actually store the binary subtype and always use zero.
writeInt32(data, value.size());
uint8_t subtype = 0;
Copy link
Copy Markdown

@wgallagher wgallagher Sep 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is writeBinary used? This is modifying data, obviously, so it can't be used to proxy?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for testing currently. We don't emit any BSON ourselves right now.

data.add(&subtype, sizeof(subtype));
data.add(value.c_str(), value.size());
}

int32_t FieldImpl::byteSize() const {
// 1 byte type, cstring key, field.
int32_t total = 1 + key_.size() + 1;
Expand All @@ -134,6 +152,10 @@ int32_t FieldImpl::byteSize() const {
return total + value_.document_value_->byteSize();
}

case Type::BINARY: {
return total + 5 + value_.string_value_.size();
}

case Type::OBJECT_ID: {
return total + sizeof(ObjectId);
}
Expand Down Expand Up @@ -176,6 +198,10 @@ void FieldImpl::encode(Buffer::Instance& output) const {
return value_.document_value_->encode(output);
}

case Type::BINARY: {
return BufferHelper::writeBinary(output, value_.string_value_);
}

case Type::OBJECT_ID: {
return output.add(&value_.object_id_value_[0], value_.object_id_value_.size());
}
Expand Down Expand Up @@ -229,6 +255,10 @@ bool FieldImpl::operator==(const Field& rhs) const {
return asArray() == rhs.asArray();
}

case Type::BINARY: {
return asBinary() == rhs.asBinary();
}

case Type::OBJECT_ID: {
return asObjectId() == rhs.asObjectId();
}
Expand Down Expand Up @@ -271,7 +301,8 @@ std::string FieldImpl::toString() const {
return std::to_string(value_.double_value_);
}

case Type::STRING: {
case Type::STRING:
case Type::BINARY: {
return fmt::format("'{}'", value_.string_value_);
}

Expand Down Expand Up @@ -361,6 +392,13 @@ void DocumentImpl::fromBuffer(Buffer::Instance& data) {
break;
}

case Field::Type::BINARY: {
std::string value = BufferHelper::removeBinary(data);
log_trace("BSON binary: {}", value);
addBinary(key, std::move(value));
break;
}

case Field::Type::OBJECT_ID: {
Field::ObjectId value;
BufferHelper::removeBytes(data, &value[0], value.size());
Expand Down
17 changes: 15 additions & 2 deletions source/common/mongo/bson_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ class BufferHelper {
static int32_t removeInt32(Buffer::Instance& data);
static int64_t removeInt64(Buffer::Instance& data);
static std::string removeString(Buffer::Instance& data);
static std::string removeBinary(Buffer::Instance& data);
static void writeCString(Buffer::Instance& data, const std::string& value);
static void writeInt32(Buffer::Instance& data, int32_t value);
static void writeInt64(Buffer::Instance& data, int64_t value);
static void writeDouble(Buffer::Instance& data, double value);
static void writeString(Buffer::Instance& data, const std::string& value);
static void writeBinary(Buffer::Instance& data, const std::string& value);
};

class FieldImpl : public Field {
Expand All @@ -34,7 +36,8 @@ class FieldImpl : public Field {
value_.double_value_ = value;
}

explicit FieldImpl(const std::string& key, std::string&& value) : type_(Type::STRING), key_(key) {
explicit FieldImpl(Type type, const std::string& key, std::string&& value)
: type_(type), key_(key) {
value_.string_value_ = std::move(value);
}

Expand Down Expand Up @@ -86,6 +89,11 @@ class FieldImpl : public Field {
return *value_.document_value_;
}

const std::string& asBinary() const override {
checkType(Type::BINARY);
return value_.string_value_;
}

const ObjectId& asObjectId() const override {
checkType(Type::OBJECT_ID);
return value_.object_id_value_;
Expand Down Expand Up @@ -173,7 +181,7 @@ class DocumentImpl : public Document,
}

DocumentPtr addString(const std::string& key, std::string&& value) override {
fields_.emplace_back(new FieldImpl(key, std::move(value)));
fields_.emplace_back(new FieldImpl(Field::Type::STRING, key, std::move(value)));
return shared_from_this();
}

Expand All @@ -187,6 +195,11 @@ class DocumentImpl : public Document,
return shared_from_this();
}

DocumentPtr addBinary(const std::string& key, std::string&& value) override {
fields_.emplace_back(new FieldImpl(Field::Type::BINARY, key, std::move(value)));
return shared_from_this();
}

DocumentPtr addObjectId(const std::string& key, Field::ObjectId&& value) override {
fields_.emplace_back(new FieldImpl(key, std::move(value)));
return shared_from_this();
Expand Down
51 changes: 25 additions & 26 deletions source/common/mongo/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,43 +83,43 @@ void ProxyFilter::decodeQuery(QueryMessagePtr&& message) {
stats_.op_query_exhaust_.inc();
}

std::string command = MessageUtility::queryCommand(*message);
if (!command.empty()) {
ActiveQueryPtr active_query(new ActiveQuery(*this, *message));
if (!active_query->query_info_.command().empty()) {
// First field key is the operation.
stat_store_.counter(fmt::format("{}cmd.{}.total", stat_prefix_, command)).inc();
stat_store_.counter(fmt::format("{}cmd.{}.total", stat_prefix_,
active_query->query_info_.command())).inc();
} else {
// Normal query, get stats on a per collection basis first.
std::string collection =
MessageUtility::collectionFromFullCollectionName(message->fullCollectionName());
std::string collection_stat_prefix = fmt::format("{}collection.{}", stat_prefix_, collection);
MessageUtility::QueryType query_type = MessageUtility::queryType(*message);
std::string collection_stat_prefix =
fmt::format("{}collection.{}", stat_prefix_, active_query->query_info_.collection());
QueryMessageInfo::QueryType query_type = active_query->query_info_.type();
chargeQueryStats(collection_stat_prefix, query_type);

// Callsite stats if we have it.
std::string callsite = MessageUtility::queryCallingFunction(*message);
if (!callsite.empty()) {
if (!active_query->query_info_.callsite().empty()) {
std::string callsite_stat_prefix =
fmt::format("{}collection.{}.callsite.{}", stat_prefix_, collection, callsite);
fmt::format("{}collection.{}.callsite.{}", stat_prefix_,
active_query->query_info_.collection(), active_query->query_info_.callsite());
chargeQueryStats(callsite_stat_prefix, query_type);
}

// Global stats.
if (query_type == MessageUtility::QueryType::ScatterGet) {
if (query_type == QueryMessageInfo::QueryType::ScatterGet) {
stats_.op_query_scatter_get_.inc();
} else if (query_type == MessageUtility::QueryType::MultiGet) {
} else if (query_type == QueryMessageInfo::QueryType::MultiGet) {
stats_.op_query_multi_get_.inc();
}
}

active_query_list_.emplace_back(new ActiveQuery(*this, std::move(message)));
active_query_list_.emplace_back(std::move(active_query));
}

void ProxyFilter::chargeQueryStats(const std::string& prefix,
MessageUtility::QueryType query_type) {
QueryMessageInfo::QueryType query_type) {
stat_store_.counter(fmt::format("{}.query.total", prefix)).inc();
if (query_type == MessageUtility::QueryType::ScatterGet) {
if (query_type == QueryMessageInfo::QueryType::ScatterGet) {
stat_store_.counter(fmt::format("{}.query.scatter_get", prefix)).inc();
} else if (query_type == MessageUtility::QueryType::MultiGet) {
} else if (query_type == QueryMessageInfo::QueryType::MultiGet) {
stat_store_.counter(fmt::format("{}.query.multi_get", prefix)).inc();
}
}
Expand All @@ -141,26 +141,25 @@ void ProxyFilter::decodeReply(ReplyMessagePtr&& message) {

for (auto i = active_query_list_.begin(); i != active_query_list_.end(); i++) {
ActiveQuery& active_query = **i;
if (active_query.query_->requestId() != message->responseTo()) {
if (active_query.query_info_.requestId() != message->responseTo()) {
continue;
}

std::string command = MessageUtility::queryCommand(*active_query.query_);
if (!command.empty()) {
std::string stat_prefix = fmt::format("{}cmd.{}", stat_prefix_, command);
if (!active_query.query_info_.command().empty()) {
std::string stat_prefix =
fmt::format("{}cmd.{}", stat_prefix_, active_query.query_info_.command());
chargeReplyStats(active_query, stat_prefix, *message);
} else {
// Collection stats first.
std::string collection = MessageUtility::collectionFromFullCollectionName(
active_query.query_->fullCollectionName());
std::string stat_prefix = fmt::format("{}collection.{}.query", stat_prefix_, collection);
std::string stat_prefix =
fmt::format("{}collection.{}.query", stat_prefix_, active_query.query_info_.collection());
chargeReplyStats(active_query, stat_prefix, *message);

// Callsite stats if we have it.
std::string callsite = MessageUtility::queryCallingFunction(*active_query.query_);
if (!callsite.empty()) {
if (!active_query.query_info_.callsite().empty()) {
std::string callsite_stat_prefix =
fmt::format("{}collection.{}.callsite.{}.query", stat_prefix_, collection, callsite);
fmt::format("{}collection.{}.callsite.{}.query", stat_prefix_,
active_query.query_info_.collection(), active_query.query_info_.callsite());
chargeReplyStats(active_query, callsite_stat_prefix, *message);
}
}
Expand Down
8 changes: 4 additions & 4 deletions source/common/mongo/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ class ProxyFilter : public Network::Filter,

private:
struct ActiveQuery {
ActiveQuery(ProxyFilter& parent, QueryMessagePtr&& query)
: parent_(parent), query_(std::move(query)), start_time_(std::chrono::system_clock::now()) {
ActiveQuery(ProxyFilter& parent, const QueryMessage& query)
: parent_(parent), query_info_(query), start_time_(std::chrono::system_clock::now()) {
parent_.stats_.op_query_active_.inc();
}

~ActiveQuery() { parent_.stats_.op_query_active_.dec(); }

ProxyFilter& parent_;
QueryMessagePtr query_;
QueryMessageInfo query_info_;
SystemTime start_time_;
};

Expand All @@ -127,7 +127,7 @@ class ProxyFilter : public Network::Filter,
POOL_TIMER_PREFIX(store, prefix))};
}

void chargeQueryStats(const std::string& prefix, MessageUtility::QueryType query_type);
void chargeQueryStats(const std::string& prefix, QueryMessageInfo::QueryType query_type);
void chargeReplyStats(ActiveQuery& active_query, const std::string& prefix,
const ReplyMessage& message);
void doDecode(Buffer::Instance& buffer);
Expand Down
Loading