From 0752ecd08ae44995530600438d89837d326692a2 Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 10:47:37 -0700 Subject: [PATCH 01/18] Elasticsearch exporter with basic tests --- exporters/CMakeLists.txt | 1 + exporters/elasticsearch/BUILD | 28 +++ exporters/elasticsearch/CMakeLists.txt | 14 ++ .../exporters/elasticsearch/es_log_exporter.h | 100 +++++++++ .../elasticsearch/src/es_log_exporter.cc | 196 ++++++++++++++++++ .../test/es_log_exporter_test.cc | 58 ++++++ 6 files changed, 397 insertions(+) create mode 100644 exporters/elasticsearch/BUILD create mode 100644 exporters/elasticsearch/CMakeLists.txt create mode 100644 exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h create mode 100644 exporters/elasticsearch/src/es_log_exporter.cc create mode 100644 exporters/elasticsearch/test/es_log_exporter_test.cc diff --git a/exporters/CMakeLists.txt b/exporters/CMakeLists.txt index 83bb91c6c4..05ff2c714c 100644 --- a/exporters/CMakeLists.txt +++ b/exporters/CMakeLists.txt @@ -4,6 +4,7 @@ endif() add_subdirectory(ostream) add_subdirectory(memory) +add_subdirectory(elasticsearch) if(WITH_PROMETHEUS) add_subdirectory(prometheus) diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD new file mode 100644 index 0000000000..8077eb22b1 --- /dev/null +++ b/exporters/elasticsearch/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "es_log_exporter", + srcs = [ + "src/es_log_exporter.cc", + ], + hdrs = [ + "include/opentelemetry/exporters/elasticsearch/es_log_exporter.h", + "include/opentelemetry/exporters/elasticsearch/json.hpp", + ], + strip_include_prefix = "include", + deps = [ + "//sdk/src/logs", + "//ext:headers", + "@github_nlohmann_json//:json", + "@curl", + ], +) + +cc_test( + name = "es_log_exporter_test", + srcs = ["test/es_log_exporter_test.cc"], + deps = [ + ":es_log_exporter", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt new file mode 100644 index 0000000000..82faf1da6d --- /dev/null +++ b/exporters/elasticsearch/CMakeLists.txt @@ -0,0 +1,14 @@ +include_directories(include) + +add_library(opentelemetry_exporter_elasticsearch_logs src/es_log_exporter.cc) + +if(BUILD_TESTING) + add_executable(es_log_exporter_test test/es_log_exporter_test.cc) + + target_link_libraries( + es_log_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + opentelemetry_exporter_elasticsearch_logs) + + gtest_add_tests(TARGET es_log_exporter_test TEST_PREFIX exporter. TEST_LIST + es_log_exporter_test) +endif() # BUILD_TESTING diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h new file mode 100644 index 0000000000..df52785a5c --- /dev/null +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -0,0 +1,100 @@ +#pragma once + +#include "opentelemetry/logs/log_record.h" +#include "opentelemetry/nostd/type_traits.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/ext//http/client/curl//http_client_curl.h" + +#include "opentelemetry/version.h" + +#include +#include +#include +#include +#include "opentelemetry/exporters/elasticsearch/json.hpp" + +namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; +using json = nlohmann::json; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ +/** + * Struct to hold Elasticsearch exporter configuration options. + */ +struct ElasticsearchExporterOptions +{ + // Configuration options to establish Elasticsearch connection + std::string host_; + int port_; + std::string index_; + + // Maximum time to wait for response after sending request to Elasticsearch + int response_timeout_; + + // Whether to print the status of the exporter in the console + bool console_debug_; + + /** + * + * @param host The host of the Elasticsearch instance + * @param port The port of the Elasticsearch instance + * @param index The index/shard that the logs will be written to + * @param response_timeout The maximum time the exporter should wait after sending a request to Elasticsearch + * @param console_debug Print the status of the exporter methods in the console + */ + ElasticsearchExporterOptions(std::string host = "localhost", int port = 9200, std::string index = "logs/_doc?pretty", int response_timeout = 30, bool console_debug = false) : + host_{host}, + port_{port}, + index_{index}, + response_timeout_{response_timeout}, + console_debug_{console_debug} + {} +}; + +/** + * The ElasticsearchLogExporter exports logs through an ostream to JSON format + */ +class ElasticsearchLogExporter final : public sdklogs::LogExporter +{ +public: + /** + * Create an ElasticsearchLogExporter with default exporter options. + */ + ElasticsearchLogExporter(); + + /** + * Create an ElasticsearchLogExporter with user specified options. + * @param options An object containing the user's configuration options. + */ + ElasticsearchLogExporter(const ElasticsearchExporterOptions &options); + + /** + * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a + * timeout specified from the options passed from the constructor. + * @param records A list of log records to send to Elasticsearch. + */ + sdklogs::ExportResult Export(const nostd::span> + &records) noexcept override; + + /** + * Shutdown this exporter. + * @param timeout The maximum time to wait for the shutdown method to return + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; +private: + bool isShutdown_ = false; + + // Configuration options for the exporter + ElasticsearchExporterOptions options_; + + // Connection related variables + opentelemetry::ext::http::client::curl::SessionManager session_manager_; + std::shared_ptr session_; +}; +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc new file mode 100644 index 0000000000..968b222ae4 --- /dev/null +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -0,0 +1,196 @@ +#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" + +#include + +namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; +namespace http_client = opentelemetry::ext::http::client; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ +/** + * This class handles the response message from the Elasticsearch request + */ +class ResponseHandler : public http_client::EventHandler +{ +public: + /** + * Automatically called when the response is received, store the body into a string and notify any threads blocked on this result + */ + void OnResponse(http_client::Response &response) noexcept override + { + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + + // Set the response_received_ flag to true and notify any threads waiting on this result + response_received_ = true; + cv_.notify_all(); + } + + /** + * A method the user calls to block their thread until the response is received, or the timeout is exceeded. + */ + bool waitForResponse(unsigned int timeoutSec = 1) + { + std::mutex mutex_; + std::unique_lock lk(mutex_); + cv_.wait_for(lk, std::chrono::milliseconds(1000 * timeoutSec)); + return response_received_; + } + + /** + * Returns the body of the response + */ + std::string GetResponseBody() + { + if(!response_received_) + return "No response"; + + return body_; + } + + // Virtual method definition that isn't used + void OnEvent(http_client::SessionState state, + opentelemetry::nostd::string_view reason) noexcept override + {} + + +private: + // Define a mutex and condition variable + std::condition_variable cv_; + + // Whether the response from Elasticsearch has been received + bool response_received_ = false; + + // A string to store the response body + std::string body_ = ""; +}; + +ElasticsearchLogExporter::ElasticsearchLogExporter() : + options_(ElasticsearchExporterOptions()) {} + +ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) : + options_{options} +{ + // Create a connection to the ElasticSearch instance + // TODO: Figure out why not working + // session_ = session_manager_.CreateSession(options_.host_, options_.port_); +} + + + +sdklogs::ExportResult ElasticsearchLogExporter::Export( + const nostd::span> &records) noexcept +{ + // Return failure if this exporter has been shutdown + if (isShutdown_) + { + return sdklogs::ExportResult::kFailure; + } + + // Create a json array to store all the JSON log records + json logs = json::array(); + + for (auto &record : records) + { + // Convert the log record to a JSON object + json log; + log["timestamp"] = record->timestamp.time_since_epoch().count(); + log["severity"] = record->severity; + log["name"] = record->name; + log["body"] = record->body; + + char trace_buf[32]; + record->trace_id.ToLowerBase16(trace_buf); + log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); + + char span_buf[16]; + record->span_id.ToLowerBase16(span_buf); + log["spanid"] = std::string(span_buf, sizeof(span_buf)); + + char flag_buf[2]; + record->trace_flag.ToLowerBase16(flag_buf); + log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); + + // TODO: Write resource and attributes into JSON + // log["resource"] = record->resource; + // log["attributes"] = record->attributes; + logs.emplace_back(log); + } + + // Create a connection to the ElasticSearch instance + auto session = session_manager_.CreateSession(options_.host_, options_.port_); + auto request = session->CreateRequest(); + + // Populate the request with headers and methods + request->SetUri(options_.index_ + "/_bulk?pretty"); + request->SetMethod(http_client::Method::Post); + request->AddHeader("Content-Type", "application/json"); + + // Add the request body + std::string body = ""; + for(int i = 0; i < logs.size(); i++) + { + // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified in URI + body += "{\"index\" : {}}\n"; + + // Add the context of the Log Record + body += logs[i].dump() + "\n"; + } + std::vector body_vec(body.begin(), body.end()); + request->SetBody(body_vec); + + // Send the request + std::unique_ptr handler(new ResponseHandler()); + session->SendRequest(*handler); + + // Wait for the response to be received + if(options_.console_debug_) + std::cout << "waiting for response from Elasticsearch (timeout = " + << options_.response_timeout_ << " seconds)" << std::endl; + bool receivedResponse = handler->waitForResponse(options_.response_timeout_); + + // If the response was never received + if(!receivedResponse) + { + // TODO: retry logic + + if(options_.console_debug_) + std::cout << "Request exceeded timeout, aborting..." << std::endl; + + return sdklogs::ExportResult::kFailure; + } + + // Parse the response output to determine if the request wasen't successful + std::string responseBody = handler->GetResponseBody(); + std::cout << responseBody << std::endl; + if(responseBody.find("\"failed\" : 0") == std::string::npos) + { + if(options_.console_debug_) + { + std::cout << "Logs were not written to Elasticsearch correctly, response body:" << std::endl; + std::cout << responseBody << std::endl; + } + + // TODO: Retry logic + return sdklogs::ExportResult::kFailure; + } + + return sdklogs::ExportResult::kSuccess; +} + +bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept +{ + //isShutdown_ = true; + + //session_->FinishSession(); + + return true; +} + +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc new file mode 100644 index 0000000000..4097e0e163 --- /dev/null +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -0,0 +1,58 @@ +#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +#include "opentelemetry/logs/log_record.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/logs/provider.h" +#include "opentelemetry/sdk/logs/logger_provider.h" +#include "opentelemetry/sdk/logs/simple_log_processor.h" + +#include +#include + +namespace sdklogs = opentelemetry::sdk::logs; +namespace logs_api = opentelemetry::logs; +namespace nostd = opentelemetry::nostd; +namespace logs_exporter = opentelemetry::exporter::logs; + +// Attempt to write a log to an invalid host/port, test that the timeout works properly +TEST(ElasticsearchLogsExporter, InvalidEndpoint) { + // Create options for the elasticsearch exporter + logs_exporter::ElasticsearchExporterOptions options("randominvalidhost", 3000); + options.response_timeout_ = 10; // Wait 10 seconds to receive a response + + // Create an elasticsearch exporter + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); + + // Create a log record + auto record = std::unique_ptr(new logs_api::LogRecord()); + record->name = "Timeout Log"; + + // Write the log record to the exporter, and time the duration + nostd::span> batch(&record, 1); + auto t1 = std::chrono::high_resolution_clock::now(); + auto result = exporter->Export(batch); + auto t2 = std::chrono::high_resolution_clock::now(); + + // Ensure the timeout is within the range of the timeout specified ([10, 10 + 1] seconds) + auto duration = std::chrono::duration_cast(t2 - t1).count(); + ASSERT_TRUE((duration >= options.response_timeout_) && (duration < options.response_timeout_ + 1)); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} + +// Test that when the exporter is shutdown, any call to Export should return failure +TEST(ElasticsearchLogsExporter, Shutdown) { + // Create an elasticsearch exporter and immediately shut it down + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); + exporter->Shutdown(); + + // Write a log to the shutdown exporter + auto record = std::unique_ptr(new logs_api::LogRecord()); + nostd::span> batch(&record, 1); + auto result = exporter->Export(batch); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} From 405cc817a057bd494c2d1694595be0cf20021c0c Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 10:55:54 -0700 Subject: [PATCH 02/18] Removed local dependancy on nlohmann:json --- exporters/elasticsearch/BUILD | 1 - .../opentelemetry/exporters/elasticsearch/es_log_exporter.h | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD index 8077eb22b1..7b247dfc7f 100644 --- a/exporters/elasticsearch/BUILD +++ b/exporters/elasticsearch/BUILD @@ -7,7 +7,6 @@ cc_library( ], hdrs = [ "include/opentelemetry/exporters/elasticsearch/es_log_exporter.h", - "include/opentelemetry/exporters/elasticsearch/json.hpp", ], strip_include_prefix = "include", deps = [ diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index df52785a5c..aa26c4b51a 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -1,5 +1,6 @@ #pragma once +#include "nlohmann/json.hpp" #include "opentelemetry/logs/log_record.h" #include "opentelemetry/nostd/type_traits.h" #include "opentelemetry/sdk/logs/exporter.h" @@ -11,7 +12,6 @@ #include #include #include -#include "opentelemetry/exporters/elasticsearch/json.hpp" namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; From 99969747dfe905168eb3f4c57af5b9ccc4466654 Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 13:10:34 -0700 Subject: [PATCH 03/18] Storing attributes/resource in JSON, helper methods --- .../exporters/elasticsearch/es_log_exporter.h | 42 +++++++++ .../elasticsearch/src/es_log_exporter.cc | 90 +++++++++++++------ .../test/es_log_exporter_test.cc | 2 +- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index aa26c4b51a..4994ddcded 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -86,6 +86,7 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter */ bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; private: + // Stores if this exporter had its Shutdown() method called bool isShutdown_ = false; // Configuration options for the exporter @@ -94,6 +95,47 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter // Connection related variables opentelemetry::ext::http::client::curl::SessionManager session_manager_; std::shared_ptr session_; + + /** + * Converts a log record into a nlohmann::json object. + */ + json RecordToJSON(std::unique_ptr record); + + /** + * Converts a common::AttributeValue into a string, which is used for parsing the attributes + * and resource KeyValueIterables + */ + const std::string ValueToString(const common::AttributeValue &value) + { + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + return (opentelemetry::nostd::get(value) ? "true" : "false"); + break; + case common::AttributeType::TYPE_INT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_INT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_DOUBLE: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + return opentelemetry::nostd::get(value).data(); + break; + default: + return "Invalid type"; + break; + } + } }; } // namespace logs } // namespace exporter diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 968b222ae4..fd41c729f0 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -80,8 +80,6 @@ ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOp // session_ = session_manager_.CreateSession(options_.host_, options_.port_); } - - sdklogs::ExportResult ElasticsearchLogExporter::Export( const nostd::span> &records) noexcept { @@ -97,28 +95,8 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( for (auto &record : records) { // Convert the log record to a JSON object - json log; - log["timestamp"] = record->timestamp.time_since_epoch().count(); - log["severity"] = record->severity; - log["name"] = record->name; - log["body"] = record->body; - - char trace_buf[32]; - record->trace_id.ToLowerBase16(trace_buf); - log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); - - char span_buf[16]; - record->span_id.ToLowerBase16(span_buf); - log["spanid"] = std::string(span_buf, sizeof(span_buf)); - - char flag_buf[2]; - record->trace_flag.ToLowerBase16(flag_buf); - log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); - - // TODO: Write resource and attributes into JSON - // log["resource"] = record->resource; - // log["attributes"] = record->attributes; - logs.emplace_back(log); + + logs.emplace_back(RecordToJSON(std::move(record))); } // Create a connection to the ElasticSearch instance @@ -184,13 +162,73 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { - //isShutdown_ = true; - + isShutdown_ = true; //session_->FinishSession(); return true; } +json ElasticsearchLogExporter::RecordToJSON(std::unique_ptr record) +{ + // Create a json object to store the LogRecord information in + json log; + + // Write the simple fields + log["timestamp"] = record->timestamp.time_since_epoch().count(); + log["name"] = record->name; + log["body"] = record->body; + + // Write the severity by converting it from its enum to a string + static opentelemetry::nostd::string_view severityStringMap_[25] = { + "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", + "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", + "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", + "kFatal", "kFatal2", "kFatal3", "kFatal4"}; + log["severity"] = severityStringMap_[static_cast(record->severity)]; + + //Convert the resources into a JSON object + json resource; + if (record->resource != nullptr) + { + record->resource->ForEachKeyValue([&](nostd::string_view key, + common::AttributeValue value) noexcept { + resource[key.data()] = ValueToString(value); + return true; + }); + + //Push the attributes JSON object into the main JSON under the "resource" key + log.push_back({"resource", resource}); + } + + //Convert the attributes into a JSON object + json attributes; + if (record->attributes != nullptr) + { + record->attributes->ForEachKeyValue([&](nostd::string_view key, + common::AttributeValue value) noexcept { + attributes[key.data()] = ValueToString(value); + return true; + }); + + //Push the attributes JSON object into the main JSON under the "attributes" key + log.push_back({"attributes", attributes}); + } + + char trace_buf[32]; + record->trace_id.ToLowerBase16(trace_buf); + log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); + + char span_buf[16]; + record->span_id.ToLowerBase16(span_buf); + log["spanid"] = std::string(span_buf, sizeof(span_buf)); + + char flag_buf[2]; + record->trace_flags.ToLowerBase16(flag_buf); + log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); + + return log; +} + } // namespace logs } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 4097e0e163..961fb0f582 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -16,7 +16,7 @@ namespace logs_exporter = opentelemetry::exporter::logs; // Attempt to write a log to an invalid host/port, test that the timeout works properly TEST(ElasticsearchLogsExporter, InvalidEndpoint) { // Create options for the elasticsearch exporter - logs_exporter::ElasticsearchExporterOptions options("randominvalidhost", 3000); + logs_exporter::ElasticsearchExporterOptions options("invalidhost", -1); options.response_timeout_ = 10; // Wait 10 seconds to receive a response // Create an elasticsearch exporter From aa1485c9945b035cd3941c8244fcec8f41383adb Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 13:52:49 -0700 Subject: [PATCH 04/18] Mock server partial completion --- .../elasticsearch/src/es_log_exporter.cc | 1 + .../test/es_log_exporter_test.cc | 94 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index fd41c729f0..dec652df86 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -214,6 +214,7 @@ json ElasticsearchLogExporter::RecordToJSON(std::unique_ptrtrace_id.ToLowerBase16(trace_buf); log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 961fb0f582..0439fbbbb3 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -4,6 +4,7 @@ #include "opentelemetry/logs/provider.h" #include "opentelemetry/sdk/logs/logger_provider.h" #include "opentelemetry/sdk/logs/simple_log_processor.h" +#include "opentelemetry/ext/http/server/http_server.h" #include #include @@ -13,8 +14,70 @@ namespace logs_api = opentelemetry::logs; namespace nostd = opentelemetry::nostd; namespace logs_exporter = opentelemetry::exporter::logs; +#define HTTP_PORT 19000 + +/** + * Create a class that mocks an elasticsearch instance + */ +class ElasticsearchLogsExporterTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback +{ +protected: + HTTP_SERVER_NS::HttpServer server_; + std::string server_address_; + std::atomic is_setup_; + std::atomic is_running_; + std::vector received_requests_; + std::mutex mtx_requests; + std::condition_variable cv_got_events; + std::mutex cv_m; + +public: + ElasticsearchLogsExporterTests() : is_setup_(false), is_running_(false){}; + + virtual void SetUp() override + { + if (is_setup_.exchange(true)) + { + return; + } + int port = server_.addListeningPort(HTTP_PORT); + std::ostringstream os; + os << "localhost:" << port; + server_address_ = "http://" + os.str() + "/simple/"; + server_.setServerName(os.str()); + server_.setKeepalive(false); + server_.addHandler("/simple/", *this); + server_.addHandler("/get/", *this); + server_.addHandler("/post/", *this); + server_.start(); + is_running_ = true; + } + + virtual void TearDown() override + { + if (!is_setup_.exchange(false)) + return; + server_.stop(); + is_running_ = false; + } + + virtual int onHttpRequest(HTTP_SERVER_NS::HttpRequest const &request, + HTTP_SERVER_NS::HttpResponse &response) override + { + if (request.uri == "/logs/") + { + std::unique_lock lk(mtx_requests); + received_requests_.push_back(request); + response.headers["Content-Type"] = "application/json"; + response.body = "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"; + return 200; + } + return 404; + } +}; + // Attempt to write a log to an invalid host/port, test that the timeout works properly -TEST(ElasticsearchLogsExporter, InvalidEndpoint) { +TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) { // Create options for the elasticsearch exporter logs_exporter::ElasticsearchExporterOptions options("invalidhost", -1); options.response_timeout_ = 10; // Wait 10 seconds to receive a response @@ -42,11 +105,12 @@ TEST(ElasticsearchLogsExporter, InvalidEndpoint) { } // Test that when the exporter is shutdown, any call to Export should return failure -TEST(ElasticsearchLogsExporter, Shutdown) { +TEST(ElasticsearchLogsExporterTests, Shutdown) { // Create an elasticsearch exporter and immediately shut it down auto exporter = std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); - exporter->Shutdown(); + bool shutdownResult = exporter->Shutdown(); + ASSERT_TRUE(shutdownResult); // Write a log to the shutdown exporter auto record = std::unique_ptr(new logs_api::LogRecord()); @@ -56,3 +120,27 @@ TEST(ElasticsearchLogsExporter, Shutdown) { // Ensure the return value is failure ASSERT_EQ(result, sdklogs::ExportResult::kFailure); } + +// Write a log to the mock server, but gets a failure response +TEST(ElasticsearchLogsExporterTests, FailureResponseCode) { + // Create an elasticsearch exporter with config options that specify the endpoint + // - host = localhost + // - port = HTTP_PORT + // - index = logs + // - timeout = 5 seconds + logs_exporter::ElasticsearchExporterOptions options("localhost", HTTP_PORT, "logs", 5); + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); + + // Create a log record with a name that the mock server is programmed to reject + auto record = std::unique_ptr(new logs_api::LogRecord()); + record->name = "Bad Record!"; + nostd::span> batch(&record, 1); + + // Send the record to the mock server + auto result = exporter->Export(batch); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} + From 31fe565d5f7eb9c2af1eeab6797184239a1dcb78 Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 16:05:53 -0700 Subject: [PATCH 05/18] Formatting and remove cmake build --- exporters/CMakeLists.txt | 2 +- exporters/elasticsearch/BUILD | 5 +- exporters/elasticsearch/CMakeLists.txt | 3 +- .../exporters/elasticsearch/es_log_exporter.h | 102 +++++++------ .../elasticsearch/src/es_log_exporter.cc | 95 ++++++------ .../test/es_log_exporter_test.cc | 138 +++++++++--------- 6 files changed, 179 insertions(+), 166 deletions(-) diff --git a/exporters/CMakeLists.txt b/exporters/CMakeLists.txt index 05ff2c714c..78036bfe98 100644 --- a/exporters/CMakeLists.txt +++ b/exporters/CMakeLists.txt @@ -4,7 +4,7 @@ endif() add_subdirectory(ostream) add_subdirectory(memory) -add_subdirectory(elasticsearch) +# add_subdirectory(elasticsearch) if(WITH_PROMETHEUS) add_subdirectory(prometheus) diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD index 7b247dfc7f..5c308f4781 100644 --- a/exporters/elasticsearch/BUILD +++ b/exporters/elasticsearch/BUILD @@ -10,10 +10,10 @@ cc_library( ], strip_include_prefix = "include", deps = [ - "//sdk/src/logs", "//ext:headers", - "@github_nlohmann_json//:json", + "//sdk/src/logs", "@curl", + "@github_nlohmann_json//:json", ], ) @@ -22,6 +22,7 @@ cc_test( srcs = ["test/es_log_exporter_test.cc"], deps = [ ":es_log_exporter", + "@curl", "@com_google_googletest//:gtest_main", ], ) diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt index 82faf1da6d..45cdf3ea2b 100644 --- a/exporters/elasticsearch/CMakeLists.txt +++ b/exporters/elasticsearch/CMakeLists.txt @@ -1,4 +1,5 @@ include_directories(include) +include_directories(${CMAKE_SOURCE_DIR}/ext/include) add_library(opentelemetry_exporter_elasticsearch_logs src/es_log_exporter.cc) @@ -11,4 +12,4 @@ if(BUILD_TESTING) gtest_add_tests(TARGET es_log_exporter_test TEST_PREFIX exporter. TEST_LIST es_log_exporter_test) -endif() # BUILD_TESTING +endif() # BUILD_TESTING \ No newline at end of file diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 4994ddcded..8b96a09dc9 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -1,10 +1,10 @@ #pragma once #include "nlohmann/json.hpp" +#include "opentelemetry/ext//http/client/curl//http_client_curl.h" #include "opentelemetry/logs/log_record.h" #include "opentelemetry/nostd/type_traits.h" #include "opentelemetry/sdk/logs/exporter.h" -#include "opentelemetry/ext//http/client/curl//http_client_curl.h" #include "opentelemetry/version.h" @@ -15,7 +15,7 @@ namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; -using json = nlohmann::json; +using json = nlohmann::json; OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -39,19 +39,24 @@ struct ElasticsearchExporterOptions bool console_debug_; /** - * + * * @param host The host of the Elasticsearch instance - * @param port The port of the Elasticsearch instance + * @param port The port of the Elasticsearch instance * @param index The index/shard that the logs will be written to - * @param response_timeout The maximum time the exporter should wait after sending a request to Elasticsearch + * @param response_timeout The maximum time the exporter should wait after sending a request to + * Elasticsearch * @param console_debug Print the status of the exporter methods in the console */ - ElasticsearchExporterOptions(std::string host = "localhost", int port = 9200, std::string index = "logs/_doc?pretty", int response_timeout = 30, bool console_debug = false) : - host_{host}, - port_{port}, - index_{index}, - response_timeout_{response_timeout}, - console_debug_{console_debug} + ElasticsearchExporterOptions(std::string host = "localhost", + int port = 9200, + std::string index = "logs/_doc?pretty", + int response_timeout = 30, + bool console_debug = false) + : host_{host}, + port_{port}, + index_{index}, + response_timeout_{response_timeout}, + console_debug_{console_debug} {} }; @@ -73,22 +78,23 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter ElasticsearchLogExporter(const ElasticsearchExporterOptions &options); /** - * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a + * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a * timeout specified from the options passed from the constructor. * @param records A list of log records to send to Elasticsearch. */ - sdklogs::ExportResult Export(const nostd::span> - &records) noexcept override; + sdklogs::ExportResult Export(const nostd::span> + &records) noexcept override; /** - * Shutdown this exporter. + * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return */ bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + private: // Stores if this exporter had its Shutdown() method called bool isShutdown_ = false; - + // Configuration options for the exporter ElasticsearchExporterOptions options_; @@ -98,44 +104,44 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter /** * Converts a log record into a nlohmann::json object. - */ + */ json RecordToJSON(std::unique_ptr record); /** * Converts a common::AttributeValue into a string, which is used for parsing the attributes * and resource KeyValueIterables - */ + */ const std::string ValueToString(const common::AttributeValue &value) - { - switch (value.index()) - { - case common::AttributeType::TYPE_BOOL: - return (opentelemetry::nostd::get(value) ? "true" : "false"); - break; - case common::AttributeType::TYPE_INT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_INT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_DOUBLE: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_STRING: - case common::AttributeType::TYPE_CSTRING: - return opentelemetry::nostd::get(value).data(); - break; - default: - return "Invalid type"; - break; - } - } + { + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + return (opentelemetry::nostd::get(value) ? "true" : "false"); + break; + case common::AttributeType::TYPE_INT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_INT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_DOUBLE: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + return opentelemetry::nostd::get(value).data(); + break; + default: + return "Invalid type"; + break; + } + } }; } // namespace logs } // namespace exporter diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index dec652df86..a958803acb 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -2,8 +2,8 @@ #include -namespace nostd = opentelemetry::nostd; -namespace sdklogs = opentelemetry::sdk::logs; +namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; namespace http_client = opentelemetry::ext::http::client; OPENTELEMETRY_BEGIN_NAMESPACE @@ -17,8 +17,9 @@ namespace logs class ResponseHandler : public http_client::EventHandler { public: - /** - * Automatically called when the response is received, store the body into a string and notify any threads blocked on this result + /** + * Automatically called when the response is received, store the body into a string and notify any + * threads blocked on this result */ void OnResponse(http_client::Response &response) noexcept override { @@ -29,9 +30,10 @@ class ResponseHandler : public http_client::EventHandler response_received_ = true; cv_.notify_all(); } - - /** - * A method the user calls to block their thread until the response is received, or the timeout is exceeded. + + /** + * A method the user calls to block their thread until the response is received, or the timeout is + * exceeded. */ bool waitForResponse(unsigned int timeoutSec = 1) { @@ -41,22 +43,21 @@ class ResponseHandler : public http_client::EventHandler return response_received_; } - /** + /** * Returns the body of the response */ std::string GetResponseBody() { - if(!response_received_) + if (!response_received_) return "No response"; - + return body_; } // Virtual method definition that isn't used void OnEvent(http_client::SessionState state, - opentelemetry::nostd::string_view reason) noexcept override + opentelemetry::nostd::string_view reason) noexcept override {} - private: // Define a mutex and condition variable @@ -69,11 +70,10 @@ class ResponseHandler : public http_client::EventHandler std::string body_ = ""; }; -ElasticsearchLogExporter::ElasticsearchLogExporter() : - options_(ElasticsearchExporterOptions()) {} +ElasticsearchLogExporter::ElasticsearchLogExporter() : options_(ElasticsearchExporterOptions()) {} -ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) : - options_{options} +ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) + : options_{options} { // Create a connection to the ElasticSearch instance // TODO: Figure out why not working @@ -94,7 +94,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( for (auto &record : records) { - // Convert the log record to a JSON object + // Convert the log record to a JSON object logs.emplace_back(RecordToJSON(std::move(record))); } @@ -110,9 +110,10 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Add the request body std::string body = ""; - for(int i = 0; i < logs.size(); i++) + for (int i = 0; i < logs.size(); i++) { - // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified in URI + // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified + // in URI body += "{\"index\" : {}}\n"; // Add the context of the Log Record @@ -120,23 +121,23 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( } std::vector body_vec(body.begin(), body.end()); request->SetBody(body_vec); - + // Send the request std::unique_ptr handler(new ResponseHandler()); session->SendRequest(*handler); - + // Wait for the response to be received - if(options_.console_debug_) - std::cout << "waiting for response from Elasticsearch (timeout = " - << options_.response_timeout_ << " seconds)" << std::endl; + if (options_.console_debug_) + std::cout << "waiting for response from Elasticsearch (timeout = " << options_.response_timeout_ + << " seconds)" << std::endl; bool receivedResponse = handler->waitForResponse(options_.response_timeout_); // If the response was never received - if(!receivedResponse) - { + if (!receivedResponse) + { // TODO: retry logic - if(options_.console_debug_) + if (options_.console_debug_) std::cout << "Request exceeded timeout, aborting..." << std::endl; return sdklogs::ExportResult::kFailure; @@ -145,9 +146,9 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Parse the response output to determine if the request wasen't successful std::string responseBody = handler->GetResponseBody(); std::cout << responseBody << std::endl; - if(responseBody.find("\"failed\" : 0") == std::string::npos) + if (responseBody.find("\"failed\" : 0") == std::string::npos) { - if(options_.console_debug_) + if (options_.console_debug_) { std::cout << "Logs were not written to Elasticsearch correctly, response body:" << std::endl; std::cout << responseBody << std::endl; @@ -163,7 +164,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { isShutdown_ = true; - //session_->FinishSession(); + // session_->FinishSession(); return true; } @@ -171,36 +172,36 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc json ElasticsearchLogExporter::RecordToJSON(std::unique_ptr record) { // Create a json object to store the LogRecord information in - json log; + json log; // Write the simple fields - log["timestamp"] = record->timestamp.time_since_epoch().count(); - log["name"] = record->name; - log["body"] = record->body; + log["timestamp"] = record->timestamp.time_since_epoch().count(); + log["name"] = record->name; + log["body"] = record->body; // Write the severity by converting it from its enum to a string static opentelemetry::nostd::string_view severityStringMap_[25] = { - "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", - "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", - "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", - "kFatal", "kFatal2", "kFatal3", "kFatal4"}; - log["severity"] = severityStringMap_[static_cast(record->severity)]; + "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", + "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", + "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", + "kFatal", "kFatal2", "kFatal3", "kFatal4"}; + log["severity"] = severityStringMap_[static_cast(record->severity)]; - //Convert the resources into a JSON object + // Convert the resources into a JSON object json resource; if (record->resource != nullptr) { record->resource->ForEachKeyValue([&](nostd::string_view key, - common::AttributeValue value) noexcept { + common::AttributeValue value) noexcept { resource[key.data()] = ValueToString(value); return true; }); - //Push the attributes JSON object into the main JSON under the "resource" key + // Push the attributes JSON object into the main JSON under the "resource" key log.push_back({"resource", resource}); } - //Convert the attributes into a JSON object + // Convert the attributes into a JSON object json attributes; if (record->attributes != nullptr) { @@ -210,22 +211,22 @@ json ElasticsearchLogExporter::RecordToJSON(std::unique_ptrtrace_id.ToLowerBase16(trace_buf); - log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); + log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); char span_buf[16]; record->span_id.ToLowerBase16(span_buf); - log["spanid"] = std::string(span_buf, sizeof(span_buf)); + log["spanid"] = std::string(span_buf, sizeof(span_buf)); char flag_buf[2]; record->trace_flags.ToLowerBase16(flag_buf); - log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); + log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); return log; } diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 0439fbbbb3..46726467f5 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -1,25 +1,26 @@ #include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +#include "opentelemetry/ext/http/server/http_server.h" #include "opentelemetry/logs/log_record.h" -#include "opentelemetry/sdk/logs/exporter.h" #include "opentelemetry/logs/provider.h" +#include "opentelemetry/sdk/logs/exporter.h" #include "opentelemetry/sdk/logs/logger_provider.h" #include "opentelemetry/sdk/logs/simple_log_processor.h" -#include "opentelemetry/ext/http/server/http_server.h" #include #include -namespace sdklogs = opentelemetry::sdk::logs; -namespace logs_api = opentelemetry::logs; -namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; +namespace logs_api = opentelemetry::logs; +namespace nostd = opentelemetry::nostd; namespace logs_exporter = opentelemetry::exporter::logs; #define HTTP_PORT 19000 /** * Create a class that mocks an elasticsearch instance - */ -class ElasticsearchLogsExporterTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback + */ +class ElasticsearchLogsExporterTests : public ::testing::Test, + public HTTP_SERVER_NS::HttpRequestCallback { protected: HTTP_SERVER_NS::HttpServer server_; @@ -77,70 +78,73 @@ class ElasticsearchLogsExporterTests : public ::testing::Test, public HTTP_SERVE }; // Attempt to write a log to an invalid host/port, test that the timeout works properly -TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) { - // Create options for the elasticsearch exporter - logs_exporter::ElasticsearchExporterOptions options("invalidhost", -1); - options.response_timeout_ = 10; // Wait 10 seconds to receive a response - - // Create an elasticsearch exporter - auto exporter = - std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); - - // Create a log record - auto record = std::unique_ptr(new logs_api::LogRecord()); - record->name = "Timeout Log"; - - // Write the log record to the exporter, and time the duration - nostd::span> batch(&record, 1); - auto t1 = std::chrono::high_resolution_clock::now(); - auto result = exporter->Export(batch); - auto t2 = std::chrono::high_resolution_clock::now(); - - // Ensure the timeout is within the range of the timeout specified ([10, 10 + 1] seconds) - auto duration = std::chrono::duration_cast(t2 - t1).count(); - ASSERT_TRUE((duration >= options.response_timeout_) && (duration < options.response_timeout_ + 1)); - - // Ensure the return value is failure - ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) +{ + // Create options for the elasticsearch exporter + logs_exporter::ElasticsearchExporterOptions options("invalidhost", -1); + options.response_timeout_ = 10; // Wait 10 seconds to receive a response + + // Create an elasticsearch exporter + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); + + // Create a log record + auto record = std::unique_ptr(new logs_api::LogRecord()); + record->name = "Timeout Log"; + + // Write the log record to the exporter, and time the duration + nostd::span> batch(&record, 1); + auto t1 = std::chrono::high_resolution_clock::now(); + auto result = exporter->Export(batch); + auto t2 = std::chrono::high_resolution_clock::now(); + + // Ensure the timeout is within the range of the timeout specified ([10, 10 + 1] seconds) + auto duration = std::chrono::duration_cast(t2 - t1).count(); + ASSERT_TRUE((duration >= options.response_timeout_) && + (duration < options.response_timeout_ + 1)); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); } // Test that when the exporter is shutdown, any call to Export should return failure -TEST(ElasticsearchLogsExporterTests, Shutdown) { - // Create an elasticsearch exporter and immediately shut it down - auto exporter = - std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); - bool shutdownResult = exporter->Shutdown(); - ASSERT_TRUE(shutdownResult); - - // Write a log to the shutdown exporter - auto record = std::unique_ptr(new logs_api::LogRecord()); - nostd::span> batch(&record, 1); - auto result = exporter->Export(batch); - - // Ensure the return value is failure - ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +TEST(ElasticsearchLogsExporterTests, Shutdown) +{ + // Create an elasticsearch exporter and immediately shut it down + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); + bool shutdownResult = exporter->Shutdown(); + ASSERT_TRUE(shutdownResult); + + // Write a log to the shutdown exporter + auto record = std::unique_ptr(new logs_api::LogRecord()); + nostd::span> batch(&record, 1); + auto result = exporter->Export(batch); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); } // Write a log to the mock server, but gets a failure response -TEST(ElasticsearchLogsExporterTests, FailureResponseCode) { - // Create an elasticsearch exporter with config options that specify the endpoint - // - host = localhost - // - port = HTTP_PORT - // - index = logs - // - timeout = 5 seconds - logs_exporter::ElasticsearchExporterOptions options("localhost", HTTP_PORT, "logs", 5); - auto exporter = - std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); - - // Create a log record with a name that the mock server is programmed to reject - auto record = std::unique_ptr(new logs_api::LogRecord()); - record->name = "Bad Record!"; - nostd::span> batch(&record, 1); - - // Send the record to the mock server - auto result = exporter->Export(batch); - - // Ensure the return value is failure - ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +TEST(ElasticsearchLogsExporterTests, FailureResponseCode) +{ + // Create an elasticsearch exporter with config options that specify the endpoint + // - host = localhost + // - port = HTTP_PORT + // - index = logs + // - timeout = 5 seconds + logs_exporter::ElasticsearchExporterOptions options("localhost", HTTP_PORT, "logs", 5); + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); + + // Create a log record with a name that the mock server is programmed to reject + auto record = std::unique_ptr(new logs_api::LogRecord()); + record->name = "Bad Record!"; + nostd::span> batch(&record, 1); + + // Send the record to the mock server + auto result = exporter->Export(batch); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); } - From 0033b6d4ce989c302dd967219439aecfdb88aef6 Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 11 Dec 2020 16:23:33 -0700 Subject: [PATCH 06/18] formatting --- exporters/elasticsearch/BUILD | 2 +- exporters/elasticsearch/CMakeLists.txt | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD index 5c308f4781..31df4398ad 100644 --- a/exporters/elasticsearch/BUILD +++ b/exporters/elasticsearch/BUILD @@ -22,7 +22,7 @@ cc_test( srcs = ["test/es_log_exporter_test.cc"], deps = [ ":es_log_exporter", - "@curl", "@com_google_googletest//:gtest_main", + "@curl", ], ) diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt index 45cdf3ea2b..7dee9a6a27 100644 --- a/exporters/elasticsearch/CMakeLists.txt +++ b/exporters/elasticsearch/CMakeLists.txt @@ -10,6 +10,8 @@ if(BUILD_TESTING) es_log_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_exporter_elasticsearch_logs) - gtest_add_tests(TARGET es_log_exporter_test TEST_PREFIX exporter. TEST_LIST - es_log_exporter_test) -endif() # BUILD_TESTING \ No newline at end of file + gtest_add_tests( + TARGET es_log_exporter_test + TEST_PREFIX exporter. + TEST_LIST es_log_exporter_test) +endif() # BUILD_TESTING From 22f12662085e203907fa9cf2689047f7a1f6560a Mon Sep 17 00:00:00 2001 From: Seufert Date: Sun, 13 Dec 2020 13:18:11 -0700 Subject: [PATCH 07/18] thread sanatization fix, formatting --- exporters/elasticsearch/CMakeLists.txt | 4 +- .../exporters/elasticsearch/es_log_exporter.h | 33 ++++--- .../elasticsearch/src/es_log_exporter.cc | 40 ++++++--- .../test/es_log_exporter_test.cc | 90 +------------------ 4 files changed, 54 insertions(+), 113 deletions(-) diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt index 7dee9a6a27..3fc96fea57 100644 --- a/exporters/elasticsearch/CMakeLists.txt +++ b/exporters/elasticsearch/CMakeLists.txt @@ -11,7 +11,7 @@ if(BUILD_TESTING) opentelemetry_exporter_elasticsearch_logs) gtest_add_tests( - TARGET es_log_exporter_test - TEST_PREFIX exporter. + TARGET es_log_exporter_test + TEST_PREFIX exporter. TEST_LIST es_log_exporter_test) endif() # BUILD_TESTING diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 8b96a09dc9..4278e1db82 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -1,17 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #pragma once #include "nlohmann/json.hpp" -#include "opentelemetry/ext//http/client/curl//http_client_curl.h" +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" #include "opentelemetry/logs/log_record.h" #include "opentelemetry/nostd/type_traits.h" #include "opentelemetry/sdk/logs/exporter.h" -#include "opentelemetry/version.h" - #include #include -#include -#include namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; @@ -39,7 +51,8 @@ struct ElasticsearchExporterOptions bool console_debug_; /** - * + * Constructor for the ElasticsearchExporterOptions. By default, the endpoint is + * localhost:9200/logs with a timeout of 30 seconds and disabled console debugging * @param host The host of the Elasticsearch instance * @param port The port of the Elasticsearch instance * @param index The index/shard that the logs will be written to @@ -49,7 +62,7 @@ struct ElasticsearchExporterOptions */ ElasticsearchExporterOptions(std::string host = "localhost", int port = 9200, - std::string index = "logs/_doc?pretty", + std::string index = "logs", int response_timeout = 30, bool console_debug = false) : host_{host}, @@ -61,7 +74,7 @@ struct ElasticsearchExporterOptions }; /** - * The ElasticsearchLogExporter exports logs through an ostream to JSON format + * The ElasticsearchLogExporter exports logs to Elasticsearch in JSON format */ class ElasticsearchLogExporter final : public sdklogs::LogExporter { @@ -98,10 +111,6 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter // Configuration options for the exporter ElasticsearchExporterOptions options_; - // Connection related variables - opentelemetry::ext::http::client::curl::SessionManager session_manager_; - std::shared_ptr session_; - /** * Converts a log record into a nlohmann::json object. */ diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index a958803acb..810f1a5437 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -1,6 +1,20 @@ -#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -#include +#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; @@ -74,11 +88,7 @@ ElasticsearchLogExporter::ElasticsearchLogExporter() : options_(ElasticsearchExp ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) : options_{options} -{ - // Create a connection to the ElasticSearch instance - // TODO: Figure out why not working - // session_ = session_manager_.CreateSession(options_.host_, options_.port_); -} +{} sdklogs::ExportResult ElasticsearchLogExporter::Export( const nostd::span> &records) noexcept @@ -86,6 +96,9 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Return failure if this exporter has been shutdown if (isShutdown_) { + if (options_.console_debug_) + std::cout << "Export failed, exporter is shutdown" << std::endl; + return sdklogs::ExportResult::kFailure; } @@ -94,19 +107,20 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( for (auto &record : records) { - // Convert the log record to a JSON object - + // Convert the log record to a JSON object, and store in json array logs.emplace_back(RecordToJSON(std::move(record))); } // Create a connection to the ElasticSearch instance - auto session = session_manager_.CreateSession(options_.host_, options_.port_); + opentelemetry::ext::http::client::curl::SessionManager session_manager; + auto session = session_manager.CreateSession(options_.host_, options_.port_); auto request = session->CreateRequest(); // Populate the request with headers and methods request->SetUri(options_.index_ + "/_bulk?pretty"); request->SetMethod(http_client::Method::Post); request->AddHeader("Content-Type", "application/json"); + request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); // Add the request body std::string body = ""; @@ -132,6 +146,11 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( << " seconds)" << std::endl; bool receivedResponse = handler->waitForResponse(options_.response_timeout_); + // End the session + session->FinishSession(); + session_manager.CancelAllSessions(); + session_manager.FinishAllSessions(); + // If the response was never received if (!receivedResponse) { @@ -164,7 +183,6 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { isShutdown_ = true; - // session_->FinishSession(); return true; } diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 46726467f5..8548d1bbdc 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -14,74 +14,11 @@ namespace logs_api = opentelemetry::logs; namespace nostd = opentelemetry::nostd; namespace logs_exporter = opentelemetry::exporter::logs; -#define HTTP_PORT 19000 - -/** - * Create a class that mocks an elasticsearch instance - */ -class ElasticsearchLogsExporterTests : public ::testing::Test, - public HTTP_SERVER_NS::HttpRequestCallback -{ -protected: - HTTP_SERVER_NS::HttpServer server_; - std::string server_address_; - std::atomic is_setup_; - std::atomic is_running_; - std::vector received_requests_; - std::mutex mtx_requests; - std::condition_variable cv_got_events; - std::mutex cv_m; - -public: - ElasticsearchLogsExporterTests() : is_setup_(false), is_running_(false){}; - - virtual void SetUp() override - { - if (is_setup_.exchange(true)) - { - return; - } - int port = server_.addListeningPort(HTTP_PORT); - std::ostringstream os; - os << "localhost:" << port; - server_address_ = "http://" + os.str() + "/simple/"; - server_.setServerName(os.str()); - server_.setKeepalive(false); - server_.addHandler("/simple/", *this); - server_.addHandler("/get/", *this); - server_.addHandler("/post/", *this); - server_.start(); - is_running_ = true; - } - - virtual void TearDown() override - { - if (!is_setup_.exchange(false)) - return; - server_.stop(); - is_running_ = false; - } - - virtual int onHttpRequest(HTTP_SERVER_NS::HttpRequest const &request, - HTTP_SERVER_NS::HttpResponse &response) override - { - if (request.uri == "/logs/") - { - std::unique_lock lk(mtx_requests); - received_requests_.push_back(request); - response.headers["Content-Type"] = "application/json"; - response.body = "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"; - return 200; - } - return 404; - } -}; - // Attempt to write a log to an invalid host/port, test that the timeout works properly TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) { // Create options for the elasticsearch exporter - logs_exporter::ElasticsearchExporterOptions options("invalidhost", -1); + logs_exporter::ElasticsearchExporterOptions options("localhost", -1, "logs", 5, true); options.response_timeout_ = 10; // Wait 10 seconds to receive a response // Create an elasticsearch exporter @@ -91,6 +28,7 @@ TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) // Create a log record auto record = std::unique_ptr(new logs_api::LogRecord()); record->name = "Timeout Log"; + std::map m = {{"key1", "value1"}, {"key2", "value2"}}; // Write the log record to the exporter, and time the duration nostd::span> batch(&record, 1); @@ -124,27 +62,3 @@ TEST(ElasticsearchLogsExporterTests, Shutdown) // Ensure the return value is failure ASSERT_EQ(result, sdklogs::ExportResult::kFailure); } - -// Write a log to the mock server, but gets a failure response -TEST(ElasticsearchLogsExporterTests, FailureResponseCode) -{ - // Create an elasticsearch exporter with config options that specify the endpoint - // - host = localhost - // - port = HTTP_PORT - // - index = logs - // - timeout = 5 seconds - logs_exporter::ElasticsearchExporterOptions options("localhost", HTTP_PORT, "logs", 5); - auto exporter = - std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); - - // Create a log record with a name that the mock server is programmed to reject - auto record = std::unique_ptr(new logs_api::LogRecord()); - record->name = "Bad Record!"; - nostd::span> batch(&record, 1); - - // Send the record to the mock server - auto result = exporter->Export(batch); - - // Ensure the return value is failure - ASSERT_EQ(result, sdklogs::ExportResult::kFailure); -} From faa8de51a12807e0a200760b6d53765f895a203e Mon Sep 17 00:00:00 2001 From: Seufert Date: Mon, 14 Dec 2020 10:55:47 -0700 Subject: [PATCH 08/18] Fixed Windows Bazel build issue --- exporters/elasticsearch/BUILD | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD index 31df4398ad..d712bb8788 100644 --- a/exporters/elasticsearch/BUILD +++ b/exporters/elasticsearch/BUILD @@ -8,6 +8,17 @@ cc_library( hdrs = [ "include/opentelemetry/exporters/elasticsearch/es_log_exporter.h", ], + copts = [ + "-DCURL_STATICLIB", + ], + linkopts = select({ + "//bazel:windows": [ + "-DEFAULTLIB:advapi32.lib", + "-DEFAULTLIB:crypt32.lib", + "-DEFAULTLIB:Normaliz.lib", + ], + "//conditions:default": [], + }), strip_include_prefix = "include", deps = [ "//ext:headers", From b358555311d1e5a3e1aba47f45c6d7d25fb35253 Mon Sep 17 00:00:00 2001 From: Seufert Date: Mon, 14 Dec 2020 12:27:53 -0700 Subject: [PATCH 09/18] Made sessionmanager a private member --- .../exporters/elasticsearch/es_log_exporter.h | 3 +++ exporters/elasticsearch/src/es_log_exporter.cc | 18 +++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 4278e1db82..c9f8cd1d1c 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -111,6 +111,9 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter // Configuration options for the exporter ElasticsearchExporterOptions options_; + // Object that stores the HTTP sessions that have been created + std::unique_ptr session_manager_; + /** * Converts a log record into a nlohmann::json object. */ diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 810f1a5437..6faf148c12 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -74,7 +74,7 @@ class ResponseHandler : public http_client::EventHandler {} private: - // Define a mutex and condition variable + // Define a condition variable used for blocking std::condition_variable cv_; // Whether the response from Elasticsearch has been received @@ -84,10 +84,13 @@ class ResponseHandler : public http_client::EventHandler std::string body_ = ""; }; -ElasticsearchLogExporter::ElasticsearchLogExporter() : options_(ElasticsearchExporterOptions()) {} +ElasticsearchLogExporter::ElasticsearchLogExporter() + : options_{ElasticsearchExporterOptions()}, + session_manager_{new ext::http::client::curl::SessionManager()} +{} ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) - : options_{options} + : options_{options}, session_manager_{new ext::http::client::curl::SessionManager()} {} sdklogs::ExportResult ElasticsearchLogExporter::Export( @@ -112,8 +115,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( } // Create a connection to the ElasticSearch instance - opentelemetry::ext::http::client::curl::SessionManager session_manager; - auto session = session_manager.CreateSession(options_.host_, options_.port_); + auto session = session_manager_->CreateSession(options_.host_, options_.port_); auto request = session->CreateRequest(); // Populate the request with headers and methods @@ -148,8 +150,6 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // End the session session->FinishSession(); - session_manager.CancelAllSessions(); - session_manager.FinishAllSessions(); // If the response was never received if (!receivedResponse) @@ -184,6 +184,10 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc { isShutdown_ = true; + // Shutdown the session manager + session_manager_->CancelAllSessions(); + session_manager_->FinishAllSessions(); + return true; } From c2ef1af08941015f22fd83105e427ab2a112739c Mon Sep 17 00:00:00 2001 From: Seufert Date: Mon, 14 Dec 2020 21:29:44 -0700 Subject: [PATCH 10/18] Add JSON Recordable --- exporters/elasticsearch/BUILD | 1 + .../exporters/elasticsearch/es_log_exporter.h | 52 +---- .../elasticsearch/es_log_recordable.h | 177 ++++++++++++++++++ .../elasticsearch/src/es_log_exporter.cc | 82 ++------ .../test/es_log_exporter_test.cc | 20 +- 5 files changed, 209 insertions(+), 123 deletions(-) create mode 100644 exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD index d712bb8788..bebe4f960f 100644 --- a/exporters/elasticsearch/BUILD +++ b/exporters/elasticsearch/BUILD @@ -7,6 +7,7 @@ cc_library( ], hdrs = [ "include/opentelemetry/exporters/elasticsearch/es_log_exporter.h", + "include/opentelemetry/exporters/elasticsearch/es_log_recordable.h", ], copts = [ "-DCURL_STATICLIB", diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index c9f8cd1d1c..7ddc690e90 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -18,9 +18,9 @@ #include "nlohmann/json.hpp" #include "opentelemetry/ext/http/client/curl/http_client_curl.h" -#include "opentelemetry/logs/log_record.h" #include "opentelemetry/nostd/type_traits.h" #include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" #include #include @@ -90,13 +90,18 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter */ ElasticsearchLogExporter(const ElasticsearchExporterOptions &options); + /** + * Creates a recordable that stores the data in a JSON object + */ + std::unique_ptr MakeRecordable() noexcept override; + /** * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a * timeout specified from the options passed from the constructor. * @param records A list of log records to send to Elasticsearch. */ - sdklogs::ExportResult Export(const nostd::span> - &records) noexcept override; + sdklogs::ExportResult Export( + const nostd::span> &records) noexcept override; /** * Shutdown this exporter. @@ -113,47 +118,6 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter // Object that stores the HTTP sessions that have been created std::unique_ptr session_manager_; - - /** - * Converts a log record into a nlohmann::json object. - */ - json RecordToJSON(std::unique_ptr record); - - /** - * Converts a common::AttributeValue into a string, which is used for parsing the attributes - * and resource KeyValueIterables - */ - const std::string ValueToString(const common::AttributeValue &value) - { - switch (value.index()) - { - case common::AttributeType::TYPE_BOOL: - return (opentelemetry::nostd::get(value) ? "true" : "false"); - break; - case common::AttributeType::TYPE_INT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_INT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_DOUBLE: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_STRING: - case common::AttributeType::TYPE_CSTRING: - return opentelemetry::nostd::get(value).data(); - break; - default: - return "Invalid type"; - break; - } - } }; } // namespace logs } // namespace exporter diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h new file mode 100644 index 0000000000..a042bc740f --- /dev/null +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -0,0 +1,177 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "nlohmann/json.hpp" +#include "opentelemetry/sdk/common/attribute_utils.h" // same as traces/attribute_utils +#include "opentelemetry/sdk/logs/recordable.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ + +/** + * An Elasticsearch Recordable implemenation that stores the 10 fields of the Log Data Model inside + * a JSON object, + */ +class ElasticSearchRecordable final : public sdk::logs::Recordable +{ +private: + // Define a JSON object that will be populated with the log data + nlohmann::json json_; + + /** + * Converts a common::AttributeValue into a string, which is used for converting a + * common::AttributeValue into a string + */ + const std::string ValueToString(const common::AttributeValue &value) + { + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + return (opentelemetry::nostd::get(value) ? "true" : "false"); + break; + case common::AttributeType::TYPE_INT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_INT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_UINT64: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_DOUBLE: + return std::to_string(opentelemetry::nostd::get(value)); + break; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + return opentelemetry::nostd::get(value).data(); + break; + default: + return "Invalid type"; + break; + } + } + +public: + /** + * Set the severity for this log. + * @param severity the severity of the event + */ + void SetSeverity(opentelemetry::logs::Severity severity) noexcept override + { + // Convert the severity enum to a string + static opentelemetry::nostd::string_view severityStringMap_[25] = { + "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", + "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", + "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", + "kFatal", "kFatal2", "kFatal3", "kFatal4"}; + json_["severity"] = severityStringMap_[static_cast(severity)]; + } + + /** + * Set name for this log + * @param name the name to set + */ + void SetName(nostd::string_view name) noexcept override { json_["name"] = name.data(); } + + /** + * Set body field for this log. + * @param message the body to set + */ + void SetBody(nostd::string_view message) noexcept override { json_["body"] = message.data(); } + + /** + * Set a resource for this log. + * @param name the name of the resource + * @param value the resource value + */ + void SetResource(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override + { + json_["resource"][key.data()] = ValueToString(value); + } + + /** + * Set an attribute of a log. + * @param key the key of the attribute + * @param value the attribute value + */ + void SetAttribute(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override + { + json_["attributes"][key.data()] = ValueToString(value); + } + + /** + * Set trace id for this log. + * @param trace_id the trace id to set + */ + void SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept override + { + char trace_buf[32]; + trace_id.ToLowerBase16(trace_buf); + json_["traceid"] = std::string(trace_buf, sizeof(trace_buf)); + } + + /** + * Set span id for this log. + * @param span_id the span id to set + */ + virtual void SetSpanId(opentelemetry::trace::SpanId span_id) noexcept override + { + char span_buf[16]; + span_id.ToLowerBase16(span_buf); + json_["spanid"] = std::string(span_buf, sizeof(span_buf)); + } + + /** + * Inject a trace_flags for this log. + * @param trace_flags the span id to set + */ + void SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept override + { + char flag_buf[2]; + trace_flags.ToLowerBase16(flag_buf); + json_["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); + } + + /** + * Set the timestamp for this log. + * @param timestamp the timestamp of the event + */ + void SetTimestamp(core::SystemTimestamp timestamp) noexcept override + { + json_["timestamp"] = timestamp.time_since_epoch().count(); + } + + /** + * Returns a JSON object contain the log information + */ + nlohmann::json GetJSON() noexcept { return json_; }; +}; +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 6faf148c12..e9aace4276 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -15,6 +15,7 @@ */ #include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +#include "opentelemetry/exporters/elasticsearch/es_log_recordable.h" namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; @@ -93,8 +94,13 @@ ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOp : options_{options}, session_manager_{new ext::http::client::curl::SessionManager()} {} +std::unique_ptr ElasticsearchLogExporter::MakeRecordable() noexcept +{ + return std::unique_ptr(new ElasticSearchRecordable); +} + sdklogs::ExportResult ElasticsearchLogExporter::Export( - const nostd::span> &records) noexcept + const nostd::span> &records) noexcept { // Return failure if this exporter has been shutdown if (isShutdown_) @@ -111,7 +117,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( for (auto &record : records) { // Convert the log record to a JSON object, and store in json array - logs.emplace_back(RecordToJSON(std::move(record))); + // logs.emplace_back(RecordToJSON(std::move(record))); } // Create a connection to the ElasticSearch instance @@ -126,14 +132,16 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Add the request body std::string body = ""; - for (int i = 0; i < logs.size(); i++) + for (auto &record : records) { // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified // in URI body += "{\"index\" : {}}\n"; - // Add the context of the Log Record - body += logs[i].dump() + "\n"; + // Add the context of the Recordable + auto json_record = std::unique_ptr( + static_cast(record.release())); + body += json_record->GetJSON().dump() + "\n"; } std::vector body_vec(body.begin(), body.end()); request->SetBody(body_vec); @@ -164,7 +172,6 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Parse the response output to determine if the request wasen't successful std::string responseBody = handler->GetResponseBody(); - std::cout << responseBody << std::endl; if (responseBody.find("\"failed\" : 0") == std::string::npos) { if (options_.console_debug_) @@ -190,69 +197,6 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc return true; } - -json ElasticsearchLogExporter::RecordToJSON(std::unique_ptr record) -{ - // Create a json object to store the LogRecord information in - json log; - - // Write the simple fields - log["timestamp"] = record->timestamp.time_since_epoch().count(); - log["name"] = record->name; - log["body"] = record->body; - - // Write the severity by converting it from its enum to a string - static opentelemetry::nostd::string_view severityStringMap_[25] = { - "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", - "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", - "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", - "kFatal", "kFatal2", "kFatal3", "kFatal4"}; - log["severity"] = severityStringMap_[static_cast(record->severity)]; - - // Convert the resources into a JSON object - json resource; - if (record->resource != nullptr) - { - record->resource->ForEachKeyValue([&](nostd::string_view key, - common::AttributeValue value) noexcept { - resource[key.data()] = ValueToString(value); - return true; - }); - - // Push the attributes JSON object into the main JSON under the "resource" key - log.push_back({"resource", resource}); - } - - // Convert the attributes into a JSON object - json attributes; - if (record->attributes != nullptr) - { - record->attributes->ForEachKeyValue([&](nostd::string_view key, - common::AttributeValue value) noexcept { - attributes[key.data()] = ValueToString(value); - return true; - }); - - // Push the attributes JSON object into the main JSON under the "attributes" key - log.push_back({"attributes", attributes}); - } - - // Convert traceid, spanid, and traceflags into strings - char trace_buf[32]; - record->trace_id.ToLowerBase16(trace_buf); - log["traceid"] = std::string(trace_buf, sizeof(trace_buf)); - - char span_buf[16]; - record->span_id.ToLowerBase16(span_buf); - log["spanid"] = std::string(span_buf, sizeof(span_buf)); - - char flag_buf[2]; - record->trace_flags.ToLowerBase16(flag_buf); - log["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); - - return log; -} - } // namespace logs } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 8548d1bbdc..158b1ca075 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -1,8 +1,8 @@ #include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" #include "opentelemetry/ext/http/server/http_server.h" -#include "opentelemetry/logs/log_record.h" #include "opentelemetry/logs/provider.h" #include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" #include "opentelemetry/sdk/logs/logger_provider.h" #include "opentelemetry/sdk/logs/simple_log_processor.h" @@ -26,14 +26,15 @@ TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); // Create a log record - auto record = std::unique_ptr(new logs_api::LogRecord()); - record->name = "Timeout Log"; - std::map m = {{"key1", "value1"}, {"key2", "value2"}}; + auto record = exporter->MakeRecordable(); + record->SetName("Timeout Log"); + record->SetSeverity(logs_api::Severity::kFatal); + record->SetAttribute("key1", "value1"); + record->SetAttribute("key2", "value2"); // Write the log record to the exporter, and time the duration - nostd::span> batch(&record, 1); auto t1 = std::chrono::high_resolution_clock::now(); - auto result = exporter->Export(batch); + auto result = exporter->Export(nostd::span>(&record, 1)); auto t2 = std::chrono::high_resolution_clock::now(); // Ensure the timeout is within the range of the timeout specified ([10, 10 + 1] seconds) @@ -55,10 +56,9 @@ TEST(ElasticsearchLogsExporterTests, Shutdown) ASSERT_TRUE(shutdownResult); // Write a log to the shutdown exporter - auto record = std::unique_ptr(new logs_api::LogRecord()); - nostd::span> batch(&record, 1); - auto result = exporter->Export(batch); + auto record = exporter->MakeRecordable(); + auto result = exporter->Export(nostd::span>(&record, 1)); // Ensure the return value is failure ASSERT_EQ(result, sdklogs::ExportResult::kFailure); -} +} \ No newline at end of file From 8b6404d238571f2f222c852f5c4a66aa90554e20 Mon Sep 17 00:00:00 2001 From: Seufert Date: Tue, 15 Dec 2020 10:24:51 -0700 Subject: [PATCH 11/18] JSON attributes, other nits --- .../exporters/elasticsearch/es_log_exporter.h | 8 +- .../elasticsearch/es_log_recordable.h | 101 ++++++++++-------- .../elasticsearch/src/es_log_exporter.cc | 29 ++--- .../test/es_log_exporter_test.cc | 6 +- 4 files changed, 75 insertions(+), 69 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 7ddc690e90..798a3156ac 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -56,9 +56,9 @@ struct ElasticsearchExporterOptions * @param host The host of the Elasticsearch instance * @param port The port of the Elasticsearch instance * @param index The index/shard that the logs will be written to - * @param response_timeout The maximum time the exporter should wait after sending a request to - * Elasticsearch - * @param console_debug Print the status of the exporter methods in the console + * @param response_timeout The maximum time in seconds the exporter should wait for a response + * from elasticsearch + * @param console_debug If true, print the status of the exporter methods in the console */ ElasticsearchExporterOptions(std::string host = "localhost", int port = 9200, @@ -111,7 +111,7 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter private: // Stores if this exporter had its Shutdown() method called - bool isShutdown_ = false; + bool is_shutdown_ = false; // Configuration options for the exporter ElasticsearchExporterOptions options_; diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h index a042bc740f..570a8b8b2b 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -39,42 +39,6 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable // Define a JSON object that will be populated with the log data nlohmann::json json_; - /** - * Converts a common::AttributeValue into a string, which is used for converting a - * common::AttributeValue into a string - */ - const std::string ValueToString(const common::AttributeValue &value) - { - switch (value.index()) - { - case common::AttributeType::TYPE_BOOL: - return (opentelemetry::nostd::get(value) ? "true" : "false"); - break; - case common::AttributeType::TYPE_INT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_INT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_UINT64: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_DOUBLE: - return std::to_string(opentelemetry::nostd::get(value)); - break; - case common::AttributeType::TYPE_STRING: - case common::AttributeType::TYPE_CSTRING: - return opentelemetry::nostd::get(value).data(); - break; - default: - return "Invalid type"; - break; - } - } - public: /** * Set the severity for this log. @@ -83,12 +47,7 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetSeverity(opentelemetry::logs::Severity severity) noexcept override { // Convert the severity enum to a string - static opentelemetry::nostd::string_view severityStringMap_[25] = { - "kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2", - "kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn", - "kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4", - "kFatal", "kFatal2", "kFatal3", "kFatal4"}; - json_["severity"] = severityStringMap_[static_cast(severity)]; + json_["severity"] = opentelemetry::logs::SeverityNumToText[static_cast(severity)]; } /** @@ -111,7 +70,34 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetResource(nostd::string_view key, const opentelemetry::common::AttributeValue &value) noexcept override { - json_["resource"][key.data()] = ValueToString(value); + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + json_["resource"][key.data()] = opentelemetry::nostd::get(value) ? true : false; + break; + case common::AttributeType::TYPE_INT: + json_["resource"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_INT64: + json_["resource"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_UINT: + json_["resource"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_UINT64: + json_["resource"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_DOUBLE: + json_["resource"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + json_["resource"][key.data()] = + opentelemetry::nostd::get(value).data(); + break; + default: + break; + } } /** @@ -122,7 +108,34 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetAttribute(nostd::string_view key, const opentelemetry::common::AttributeValue &value) noexcept override { - json_["attributes"][key.data()] = ValueToString(value); + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value) ? true : false; + break; + case common::AttributeType::TYPE_INT: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_INT64: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_UINT: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_UINT64: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_DOUBLE: + json_["attributes"][key.data()] = opentelemetry::nostd::get(value); + break; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + json_["attributes"][key.data()] = + opentelemetry::nostd::get(value).data(); + break; + default: + break; + } } /** diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index e9aace4276..520e545fbd 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -61,13 +61,7 @@ class ResponseHandler : public http_client::EventHandler /** * Returns the body of the response */ - std::string GetResponseBody() - { - if (!response_received_) - return "No response"; - - return body_; - } + std::string GetResponseBody() { return body_; } // Virtual method definition that isn't used void OnEvent(http_client::SessionState state, @@ -103,23 +97,16 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( const nostd::span> &records) noexcept { // Return failure if this exporter has been shutdown - if (isShutdown_) + if (is_shutdown_) { if (options_.console_debug_) + { std::cout << "Export failed, exporter is shutdown" << std::endl; + } return sdklogs::ExportResult::kFailure; } - // Create a json array to store all the JSON log records - json logs = json::array(); - - for (auto &record : records) - { - // Convert the log record to a JSON object, and store in json array - // logs.emplace_back(RecordToJSON(std::move(record))); - } - // Create a connection to the ElasticSearch instance auto session = session_manager_->CreateSession(options_.host_, options_.port_); auto request = session->CreateRequest(); @@ -130,7 +117,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( request->AddHeader("Content-Type", "application/json"); request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); - // Add the request body + // Create the request body std::string body = ""; for (auto &record : records) { @@ -152,8 +139,10 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // Wait for the response to be received if (options_.console_debug_) + { std::cout << "waiting for response from Elasticsearch (timeout = " << options_.response_timeout_ << " seconds)" << std::endl; + } bool receivedResponse = handler->waitForResponse(options_.response_timeout_); // End the session @@ -165,7 +154,9 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( // TODO: retry logic if (options_.console_debug_) + { std::cout << "Request exceeded timeout, aborting..." << std::endl; + } return sdklogs::ExportResult::kFailure; } @@ -189,7 +180,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept { - isShutdown_ = true; + is_shutdown_ = true; // Shutdown the session manager session_manager_->CancelAllSessions(); diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 158b1ca075..6501be00c4 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -29,8 +29,10 @@ TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) auto record = exporter->MakeRecordable(); record->SetName("Timeout Log"); record->SetSeverity(logs_api::Severity::kFatal); - record->SetAttribute("key1", "value1"); - record->SetAttribute("key2", "value2"); + record->SetAttribute("key0", false); + record->SetAttribute("key1", "1"); + record->SetAttribute("key2", 2); + record->SetAttribute("key3", 3.142); // Write the log record to the exporter, and time the duration auto t1 = std::chrono::high_resolution_clock::now(); From b48638da2c006252018d88e7e59b363ba260c7fa Mon Sep 17 00:00:00 2001 From: Seufert Date: Tue, 15 Dec 2020 11:18:48 -0700 Subject: [PATCH 12/18] CMakeList and nit --- CMakeLists.txt | 3 +++ exporters/CMakeLists.txt | 5 ++++- .../opentelemetry/exporters/elasticsearch/es_log_exporter.h | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c67451ec01..6b183b0cfe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,9 @@ option(WITH_OTLP "Whether to include the OpenTelemetry Protocol in the SDK" OFF) option(WITH_PROMETHEUS "Whether to include the Prometheus Client in the SDK" OFF) +option(WITH_ELASTICSEARCH + "Whether to include the Elasticsearch Client in the SDK" OFF) + option(BUILD_TESTING "Whether to enable tests" ON) option(WITH_EXAMPLES "Whether to build examples" ON) diff --git a/exporters/CMakeLists.txt b/exporters/CMakeLists.txt index 78036bfe98..68ea177c38 100644 --- a/exporters/CMakeLists.txt +++ b/exporters/CMakeLists.txt @@ -4,8 +4,11 @@ endif() add_subdirectory(ostream) add_subdirectory(memory) -# add_subdirectory(elasticsearch) if(WITH_PROMETHEUS) add_subdirectory(prometheus) endif() + +if(WITH_ELASTICSEARCH) + add_subdirectory(elasticsearch) +endif() diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 798a3156ac..960a4b6a0b 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -107,7 +107,8 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return */ - bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; private: // Stores if this exporter had its Shutdown() method called From c8b4225d1469961b8af3de8a7b83c7c90b1cb250 Mon Sep 17 00:00:00 2001 From: Seufert Date: Tue, 15 Dec 2020 12:55:18 -0700 Subject: [PATCH 13/18] Added OnEvent() callback implementation --- .../elasticsearch/src/es_log_exporter.cc | 61 +++++++++++++------ .../test/es_log_exporter_test.cc | 47 ++++++++------ 2 files changed, 71 insertions(+), 37 deletions(-) diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 520e545fbd..b2c6366fd4 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -32,6 +32,11 @@ namespace logs class ResponseHandler : public http_client::EventHandler { public: + /** + * Creates a response handler, that by default doesn't display to console + */ + ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + /** * Automatically called when the response is received, store the body into a string and notify any * threads blocked on this result @@ -47,14 +52,14 @@ class ResponseHandler : public http_client::EventHandler } /** - * A method the user calls to block their thread until the response is received, or the timeout is - * exceeded. + * A method the user calls to block their thread until the response is received. The longest + * duration is the timeout of the request, set by SetTimeoutMs() */ - bool waitForResponse(unsigned int timeoutSec = 1) + bool waitForResponse() { std::mutex mutex_; std::unique_lock lk(mutex_); - cv_.wait_for(lk, std::chrono::milliseconds(1000 * timeoutSec)); + cv_.wait(lk); return response_received_; } @@ -63,10 +68,35 @@ class ResponseHandler : public http_client::EventHandler */ std::string GetResponseBody() { return body_; } - // Virtual method definition that isn't used + // Callback method when an http event occurs void OnEvent(http_client::SessionState state, opentelemetry::nostd::string_view reason) noexcept override - {} + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::ConnectFailed: + if (console_debug_) + std::cout << "Connection to elasticsearch failed\n"; + cv_.notify_all(); + break; + case http_client::SessionState::SendFailed: + if (console_debug_) + std::cout << "Request failed to be sent to elasticsearch\n"; + cv_.notify_all(); + break; + case http_client::SessionState::TimedOut: + if (console_debug_) + std::cout << "Request to elasticsearch timed out\n"; + cv_.notify_all(); + break; + case http_client::SessionState::NetworkError: + if (console_debug_) + std::cout << "Network error to elasticsearch\n"; + cv_.notify_all(); + break; + } + } private: // Define a condition variable used for blocking @@ -77,6 +107,9 @@ class ResponseHandler : public http_client::EventHandler // A string to store the response body std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; }; ElasticsearchLogExporter::ElasticsearchLogExporter() @@ -134,7 +167,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( request->SetBody(body_vec); // Send the request - std::unique_ptr handler(new ResponseHandler()); + std::unique_ptr handler(new ResponseHandler(options_.console_debug_)); session->SendRequest(*handler); // Wait for the response to be received @@ -143,25 +176,19 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export( std::cout << "waiting for response from Elasticsearch (timeout = " << options_.response_timeout_ << " seconds)" << std::endl; } - bool receivedResponse = handler->waitForResponse(options_.response_timeout_); + bool write_successful = handler->waitForResponse(); // End the session session->FinishSession(); - // If the response was never received - if (!receivedResponse) + // If an error occured with the HTTP request + if (!write_successful) { // TODO: retry logic - - if (options_.console_debug_) - { - std::cout << "Request exceeded timeout, aborting..." << std::endl; - } - return sdklogs::ExportResult::kFailure; } - // Parse the response output to determine if the request wasen't successful + // Parse the response output to determine if Elasticsearch consumed it correctly std::string responseBody = handler->GetResponseBody(); if (responseBody.find("\"failed\" : 0") == std::string::npos) { diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc index 6501be00c4..7e545f5bac 100644 --- a/exporters/elasticsearch/test/es_log_exporter_test.cc +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -14,35 +14,19 @@ namespace logs_api = opentelemetry::logs; namespace nostd = opentelemetry::nostd; namespace logs_exporter = opentelemetry::exporter::logs; -// Attempt to write a log to an invalid host/port, test that the timeout works properly +// Attempt to write a log to an invalid host/port, test that the Export() returns failure TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) { - // Create options for the elasticsearch exporter - logs_exporter::ElasticsearchExporterOptions options("localhost", -1, "logs", 5, true); - options.response_timeout_ = 10; // Wait 10 seconds to receive a response + // Create invalid connection options for the elasticsearch exporter + logs_exporter::ElasticsearchExporterOptions options("localhost", -1); // Create an elasticsearch exporter auto exporter = std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); - // Create a log record + // Create a log record and send to the exporter auto record = exporter->MakeRecordable(); - record->SetName("Timeout Log"); - record->SetSeverity(logs_api::Severity::kFatal); - record->SetAttribute("key0", false); - record->SetAttribute("key1", "1"); - record->SetAttribute("key2", 2); - record->SetAttribute("key3", 3.142); - - // Write the log record to the exporter, and time the duration - auto t1 = std::chrono::high_resolution_clock::now(); auto result = exporter->Export(nostd::span>(&record, 1)); - auto t2 = std::chrono::high_resolution_clock::now(); - - // Ensure the timeout is within the range of the timeout specified ([10, 10 + 1] seconds) - auto duration = std::chrono::duration_cast(t2 - t1).count(); - ASSERT_TRUE((duration >= options.response_timeout_) && - (duration < options.response_timeout_ + 1)); // Ensure the return value is failure ASSERT_EQ(result, sdklogs::ExportResult::kFailure); @@ -63,4 +47,27 @@ TEST(ElasticsearchLogsExporterTests, Shutdown) // Ensure the return value is failure ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} + +// Test the elasticsearch recordable object +TEST(ElasticsearchLogsExporterTests, RecordableCreation) +{ + // Create an elasticsearch exporter + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); + + // Create a recordable + auto record = exporter->MakeRecordable(); + record->SetName("Timeout Log"); + record->SetSeverity(logs_api::Severity::kFatal); + record->SetTimestamp(std::chrono::system_clock::now()); + record->SetBody("Body of the log message"); + + // Attributes and resource support different types + record->SetAttribute("key0", false); + record->SetAttribute("key1", "1"); + record->SetResource("key2", 2); + record->SetResource("key3", 3.142); + + exporter->Export(nostd::span>(&record, 1)); } \ No newline at end of file From 1e735b98c0ab17ce2c6df6f3f4a0deb969b9188b Mon Sep 17 00:00:00 2001 From: Seufert Date: Tue, 15 Dec 2020 13:40:40 -0700 Subject: [PATCH 14/18] Removed unneeded using from header --- .../opentelemetry/exporters/elasticsearch/es_log_exporter.h | 1 - 1 file changed, 1 deletion(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h index 960a4b6a0b..48269c81c1 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -27,7 +27,6 @@ namespace nostd = opentelemetry::nostd; namespace sdklogs = opentelemetry::sdk::logs; -using json = nlohmann::json; OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter From 3b651365471757141acb79e1eabcff21166e5ca2 Mon Sep 17 00:00:00 2001 From: Seufert Date: Wed, 16 Dec 2020 13:36:23 -0700 Subject: [PATCH 15/18] Recordable attributes and resource duplicate code put into function --- .../elasticsearch/es_log_recordable.h | 96 ++++++++----------- 1 file changed, 40 insertions(+), 56 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h index 570a8b8b2b..0b5d45f9af 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -39,6 +39,44 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable // Define a JSON object that will be populated with the log data nlohmann::json json_; + /** + * A helper method that writes a key/value pair under a specified name, the two names used here + * being "attributes" and "resources" + */ + void WriteKeyValue(nostd::string_view key, + const opentelemetry::common::AttributeValue &value, + std::string name) + { + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + json_[name][key.data()] = opentelemetry::nostd::get(value) ? true : false; + return; + case common::AttributeType::TYPE_INT: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_INT64: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_UINT: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_UINT64: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_DOUBLE: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_STRING: + case common::AttributeType::TYPE_CSTRING: + json_[name][key.data()] = + opentelemetry::nostd::get(value).data(); + return; + default: + return; + } + } + public: /** * Set the severity for this log. @@ -70,34 +108,7 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetResource(nostd::string_view key, const opentelemetry::common::AttributeValue &value) noexcept override { - switch (value.index()) - { - case common::AttributeType::TYPE_BOOL: - json_["resource"][key.data()] = opentelemetry::nostd::get(value) ? true : false; - break; - case common::AttributeType::TYPE_INT: - json_["resource"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_INT64: - json_["resource"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_UINT: - json_["resource"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_UINT64: - json_["resource"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_DOUBLE: - json_["resource"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_STRING: - case common::AttributeType::TYPE_CSTRING: - json_["resource"][key.data()] = - opentelemetry::nostd::get(value).data(); - break; - default: - break; - } + WriteKeyValue(key, value, "resource"); } /** @@ -108,34 +119,7 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetAttribute(nostd::string_view key, const opentelemetry::common::AttributeValue &value) noexcept override { - switch (value.index()) - { - case common::AttributeType::TYPE_BOOL: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value) ? true : false; - break; - case common::AttributeType::TYPE_INT: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_INT64: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_UINT: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_UINT64: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_DOUBLE: - json_["attributes"][key.data()] = opentelemetry::nostd::get(value); - break; - case common::AttributeType::TYPE_STRING: - case common::AttributeType::TYPE_CSTRING: - json_["attributes"][key.data()] = - opentelemetry::nostd::get(value).data(); - break; - default: - break; - } + WriteKeyValue(key, value, "attributes"); } /** From 0f57f835846adb6f4919c9b82d095cb4000641ac Mon Sep 17 00:00:00 2001 From: Seufert Date: Fri, 18 Dec 2020 17:30:07 -0700 Subject: [PATCH 16/18] Added mutex as private member --- .../elasticsearch/src/es_log_exporter.cc | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index b2c6366fd4..89eb2adc76 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -43,11 +43,16 @@ class ResponseHandler : public http_client::EventHandler */ void OnResponse(http_client::Response &response) noexcept override { - // Store the body of the request - body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + // Lock the private members so they can't be read while being modified + { + std::unique_lock lk(mutex_); + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); - // Set the response_received_ flag to true and notify any threads waiting on this result - response_received_ = true; + // Set the response_received_ flag to true and notify any threads waiting on this result + response_received_ = true; + } cv_.notify_all(); } @@ -57,7 +62,6 @@ class ResponseHandler : public http_client::EventHandler */ bool waitForResponse() { - std::mutex mutex_; std::unique_lock lk(mutex_); cv_.wait(lk); return response_received_; @@ -66,7 +70,12 @@ class ResponseHandler : public http_client::EventHandler /** * Returns the body of the response */ - std::string GetResponseBody() { return body_; } + std::string GetResponseBody() + { + // Lock so that body_ can't be written to while returning it + std::unique_lock lk(mutex_); + return body_; + } // Callback method when an http event occurs void OnEvent(http_client::SessionState state, @@ -99,8 +108,9 @@ class ResponseHandler : public http_client::EventHandler } private: - // Define a condition variable used for blocking + // Define a condition variable and mutex std::condition_variable cv_; + std::mutex mutex_; // Whether the response from Elasticsearch has been received bool response_received_ = false; From f05aeadd48d6fd3e50637bfa3cc8028f522fcacb Mon Sep 17 00:00:00 2001 From: Seufert Date: Mon, 21 Dec 2020 12:31:49 -0500 Subject: [PATCH 17/18] formatting --- exporters/elasticsearch/src/es_log_exporter.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc index 89eb2adc76..4e21acd6ad 100644 --- a/exporters/elasticsearch/src/es_log_exporter.cc +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -70,11 +70,11 @@ class ResponseHandler : public http_client::EventHandler /** * Returns the body of the response */ - std::string GetResponseBody() + std::string GetResponseBody() { // Lock so that body_ can't be written to while returning it std::unique_lock lk(mutex_); - return body_; + return body_; } // Callback method when an http event occurs From 05260509c08559bd03397dcd8c26f680926fd1ca Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Tue, 22 Dec 2020 04:38:19 -0500 Subject: [PATCH 18/18] Use updated attribute utils to fix failing bazel CI --- .../opentelemetry/exporters/elasticsearch/es_log_recordable.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h index 0b5d45f9af..1745db2618 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -68,7 +68,9 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable json_[name][key.data()] = opentelemetry::nostd::get(value); return; case common::AttributeType::TYPE_STRING: +#ifdef HAVE_CSTRING_TYPE case common::AttributeType::TYPE_CSTRING: +#endif json_[name][key.data()] = opentelemetry::nostd::get(value).data(); return;