diff --git a/CMakeLists.txt b/CMakeLists.txt index 026c1de47e..afd4dafc66 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,9 @@ option(WITH_OTLP "Whether to include the OpenTelemetry Protocol in the SDK" OFF) option(WITH_PROMETHEUS "Whether to include the Prometheus Client in the SDK" OFF) +option(WITH_ELASTICSEARCH + "Whether to include the Elasticsearch Client in the SDK" OFF) + option(BUILD_TESTING "Whether to enable tests" ON) option(WITH_EXAMPLES "Whether to build examples" ON) diff --git a/exporters/CMakeLists.txt b/exporters/CMakeLists.txt index 83bb91c6c4..68ea177c38 100644 --- a/exporters/CMakeLists.txt +++ b/exporters/CMakeLists.txt @@ -8,3 +8,7 @@ add_subdirectory(memory) if(WITH_PROMETHEUS) add_subdirectory(prometheus) endif() + +if(WITH_ELASTICSEARCH) + add_subdirectory(elasticsearch) +endif() diff --git a/exporters/elasticsearch/BUILD b/exporters/elasticsearch/BUILD new file mode 100644 index 0000000000..bebe4f960f --- /dev/null +++ b/exporters/elasticsearch/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "es_log_exporter", + srcs = [ + "src/es_log_exporter.cc", + ], + hdrs = [ + "include/opentelemetry/exporters/elasticsearch/es_log_exporter.h", + "include/opentelemetry/exporters/elasticsearch/es_log_recordable.h", + ], + copts = [ + "-DCURL_STATICLIB", + ], + linkopts = select({ + "//bazel:windows": [ + "-DEFAULTLIB:advapi32.lib", + "-DEFAULTLIB:crypt32.lib", + "-DEFAULTLIB:Normaliz.lib", + ], + "//conditions:default": [], + }), + strip_include_prefix = "include", + deps = [ + "//ext:headers", + "//sdk/src/logs", + "@curl", + "@github_nlohmann_json//:json", + ], +) + +cc_test( + name = "es_log_exporter_test", + srcs = ["test/es_log_exporter_test.cc"], + deps = [ + ":es_log_exporter", + "@com_google_googletest//:gtest_main", + "@curl", + ], +) diff --git a/exporters/elasticsearch/CMakeLists.txt b/exporters/elasticsearch/CMakeLists.txt new file mode 100644 index 0000000000..3fc96fea57 --- /dev/null +++ b/exporters/elasticsearch/CMakeLists.txt @@ -0,0 +1,17 @@ +include_directories(include) +include_directories(${CMAKE_SOURCE_DIR}/ext/include) + +add_library(opentelemetry_exporter_elasticsearch_logs src/es_log_exporter.cc) + +if(BUILD_TESTING) + add_executable(es_log_exporter_test test/es_log_exporter_test.cc) + + target_link_libraries( + es_log_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + opentelemetry_exporter_elasticsearch_logs) + + gtest_add_tests( + TARGET es_log_exporter_test + TEST_PREFIX exporter. + TEST_LIST es_log_exporter_test) +endif() # BUILD_TESTING diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h new file mode 100644 index 0000000000..48269c81c1 --- /dev/null +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h @@ -0,0 +1,124 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "nlohmann/json.hpp" +#include "opentelemetry/ext/http/client/curl/http_client_curl.h" +#include "opentelemetry/nostd/type_traits.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" + +#include +#include + +namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ +/** + * Struct to hold Elasticsearch exporter configuration options. + */ +struct ElasticsearchExporterOptions +{ + // Configuration options to establish Elasticsearch connection + std::string host_; + int port_; + std::string index_; + + // Maximum time to wait for response after sending request to Elasticsearch + int response_timeout_; + + // Whether to print the status of the exporter in the console + bool console_debug_; + + /** + * Constructor for the ElasticsearchExporterOptions. By default, the endpoint is + * localhost:9200/logs with a timeout of 30 seconds and disabled console debugging + * @param host The host of the Elasticsearch instance + * @param port The port of the Elasticsearch instance + * @param index The index/shard that the logs will be written to + * @param response_timeout The maximum time in seconds the exporter should wait for a response + * from elasticsearch + * @param console_debug If true, print the status of the exporter methods in the console + */ + ElasticsearchExporterOptions(std::string host = "localhost", + int port = 9200, + std::string index = "logs", + int response_timeout = 30, + bool console_debug = false) + : host_{host}, + port_{port}, + index_{index}, + response_timeout_{response_timeout}, + console_debug_{console_debug} + {} +}; + +/** + * The ElasticsearchLogExporter exports logs to Elasticsearch in JSON format + */ +class ElasticsearchLogExporter final : public sdklogs::LogExporter +{ +public: + /** + * Create an ElasticsearchLogExporter with default exporter options. + */ + ElasticsearchLogExporter(); + + /** + * Create an ElasticsearchLogExporter with user specified options. + * @param options An object containing the user's configuration options. + */ + ElasticsearchLogExporter(const ElasticsearchExporterOptions &options); + + /** + * Creates a recordable that stores the data in a JSON object + */ + std::unique_ptr MakeRecordable() noexcept override; + + /** + * Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a + * timeout specified from the options passed from the constructor. + * @param records A list of log records to send to Elasticsearch. + */ + sdklogs::ExportResult Export( + const nostd::span> &records) noexcept override; + + /** + * Shutdown this exporter. + * @param timeout The maximum time to wait for the shutdown method to return + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + +private: + // Stores if this exporter had its Shutdown() method called + bool is_shutdown_ = false; + + // Configuration options for the exporter + ElasticsearchExporterOptions options_; + + // Object that stores the HTTP sessions that have been created + std::unique_ptr session_manager_; +}; +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h new file mode 100644 index 0000000000..1745db2618 --- /dev/null +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -0,0 +1,176 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "nlohmann/json.hpp" +#include "opentelemetry/sdk/common/attribute_utils.h" // same as traces/attribute_utils +#include "opentelemetry/sdk/logs/recordable.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ + +/** + * An Elasticsearch Recordable implemenation that stores the 10 fields of the Log Data Model inside + * a JSON object, + */ +class ElasticSearchRecordable final : public sdk::logs::Recordable +{ +private: + // Define a JSON object that will be populated with the log data + nlohmann::json json_; + + /** + * A helper method that writes a key/value pair under a specified name, the two names used here + * being "attributes" and "resources" + */ + void WriteKeyValue(nostd::string_view key, + const opentelemetry::common::AttributeValue &value, + std::string name) + { + switch (value.index()) + { + case common::AttributeType::TYPE_BOOL: + json_[name][key.data()] = opentelemetry::nostd::get(value) ? true : false; + return; + case common::AttributeType::TYPE_INT: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_INT64: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_UINT: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_UINT64: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_DOUBLE: + json_[name][key.data()] = opentelemetry::nostd::get(value); + return; + case common::AttributeType::TYPE_STRING: +#ifdef HAVE_CSTRING_TYPE + case common::AttributeType::TYPE_CSTRING: +#endif + json_[name][key.data()] = + opentelemetry::nostd::get(value).data(); + return; + default: + return; + } + } + +public: + /** + * Set the severity for this log. + * @param severity the severity of the event + */ + void SetSeverity(opentelemetry::logs::Severity severity) noexcept override + { + // Convert the severity enum to a string + json_["severity"] = opentelemetry::logs::SeverityNumToText[static_cast(severity)]; + } + + /** + * Set name for this log + * @param name the name to set + */ + void SetName(nostd::string_view name) noexcept override { json_["name"] = name.data(); } + + /** + * Set body field for this log. + * @param message the body to set + */ + void SetBody(nostd::string_view message) noexcept override { json_["body"] = message.data(); } + + /** + * Set a resource for this log. + * @param name the name of the resource + * @param value the resource value + */ + void SetResource(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override + { + WriteKeyValue(key, value, "resource"); + } + + /** + * Set an attribute of a log. + * @param key the key of the attribute + * @param value the attribute value + */ + void SetAttribute(nostd::string_view key, + const opentelemetry::common::AttributeValue &value) noexcept override + { + WriteKeyValue(key, value, "attributes"); + } + + /** + * Set trace id for this log. + * @param trace_id the trace id to set + */ + void SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept override + { + char trace_buf[32]; + trace_id.ToLowerBase16(trace_buf); + json_["traceid"] = std::string(trace_buf, sizeof(trace_buf)); + } + + /** + * Set span id for this log. + * @param span_id the span id to set + */ + virtual void SetSpanId(opentelemetry::trace::SpanId span_id) noexcept override + { + char span_buf[16]; + span_id.ToLowerBase16(span_buf); + json_["spanid"] = std::string(span_buf, sizeof(span_buf)); + } + + /** + * Inject a trace_flags for this log. + * @param trace_flags the span id to set + */ + void SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept override + { + char flag_buf[2]; + trace_flags.ToLowerBase16(flag_buf); + json_["traceflags"] = std::string(flag_buf, sizeof(flag_buf)); + } + + /** + * Set the timestamp for this log. + * @param timestamp the timestamp of the event + */ + void SetTimestamp(core::SystemTimestamp timestamp) noexcept override + { + json_["timestamp"] = timestamp.time_since_epoch().count(); + } + + /** + * Returns a JSON object contain the log information + */ + nlohmann::json GetJSON() noexcept { return json_; }; +}; +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/src/es_log_exporter.cc b/exporters/elasticsearch/src/es_log_exporter.cc new file mode 100644 index 0000000000..4e21acd6ad --- /dev/null +++ b/exporters/elasticsearch/src/es_log_exporter.cc @@ -0,0 +1,230 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +#include "opentelemetry/exporters/elasticsearch/es_log_recordable.h" + +namespace nostd = opentelemetry::nostd; +namespace sdklogs = opentelemetry::sdk::logs; +namespace http_client = opentelemetry::ext::http::client; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter +{ +namespace logs +{ +/** + * This class handles the response message from the Elasticsearch request + */ +class ResponseHandler : public http_client::EventHandler +{ +public: + /** + * Creates a response handler, that by default doesn't display to console + */ + ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + + /** + * Automatically called when the response is received, store the body into a string and notify any + * threads blocked on this result + */ + void OnResponse(http_client::Response &response) noexcept override + { + // Lock the private members so they can't be read while being modified + { + std::unique_lock lk(mutex_); + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + + // Set the response_received_ flag to true and notify any threads waiting on this result + response_received_ = true; + } + cv_.notify_all(); + } + + /** + * A method the user calls to block their thread until the response is received. The longest + * duration is the timeout of the request, set by SetTimeoutMs() + */ + bool waitForResponse() + { + std::unique_lock lk(mutex_); + cv_.wait(lk); + return response_received_; + } + + /** + * Returns the body of the response + */ + std::string GetResponseBody() + { + // Lock so that body_ can't be written to while returning it + std::unique_lock lk(mutex_); + return body_; + } + + // Callback method when an http event occurs + void OnEvent(http_client::SessionState state, + opentelemetry::nostd::string_view reason) noexcept override + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::ConnectFailed: + if (console_debug_) + std::cout << "Connection to elasticsearch failed\n"; + cv_.notify_all(); + break; + case http_client::SessionState::SendFailed: + if (console_debug_) + std::cout << "Request failed to be sent to elasticsearch\n"; + cv_.notify_all(); + break; + case http_client::SessionState::TimedOut: + if (console_debug_) + std::cout << "Request to elasticsearch timed out\n"; + cv_.notify_all(); + break; + case http_client::SessionState::NetworkError: + if (console_debug_) + std::cout << "Network error to elasticsearch\n"; + cv_.notify_all(); + break; + } + } + +private: + // Define a condition variable and mutex + std::condition_variable cv_; + std::mutex mutex_; + + // Whether the response from Elasticsearch has been received + bool response_received_ = false; + + // A string to store the response body + std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; +}; + +ElasticsearchLogExporter::ElasticsearchLogExporter() + : options_{ElasticsearchExporterOptions()}, + session_manager_{new ext::http::client::curl::SessionManager()} +{} + +ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options) + : options_{options}, session_manager_{new ext::http::client::curl::SessionManager()} +{} + +std::unique_ptr ElasticsearchLogExporter::MakeRecordable() noexcept +{ + return std::unique_ptr(new ElasticSearchRecordable); +} + +sdklogs::ExportResult ElasticsearchLogExporter::Export( + const nostd::span> &records) noexcept +{ + // Return failure if this exporter has been shutdown + if (is_shutdown_) + { + if (options_.console_debug_) + { + std::cout << "Export failed, exporter is shutdown" << std::endl; + } + + return sdklogs::ExportResult::kFailure; + } + + // Create a connection to the ElasticSearch instance + auto session = session_manager_->CreateSession(options_.host_, options_.port_); + auto request = session->CreateRequest(); + + // Populate the request with headers and methods + request->SetUri(options_.index_ + "/_bulk?pretty"); + request->SetMethod(http_client::Method::Post); + request->AddHeader("Content-Type", "application/json"); + request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_)); + + // Create the request body + std::string body = ""; + for (auto &record : records) + { + // Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified + // in URI + body += "{\"index\" : {}}\n"; + + // Add the context of the Recordable + auto json_record = std::unique_ptr( + static_cast(record.release())); + body += json_record->GetJSON().dump() + "\n"; + } + std::vector body_vec(body.begin(), body.end()); + request->SetBody(body_vec); + + // Send the request + std::unique_ptr handler(new ResponseHandler(options_.console_debug_)); + session->SendRequest(*handler); + + // Wait for the response to be received + if (options_.console_debug_) + { + std::cout << "waiting for response from Elasticsearch (timeout = " << options_.response_timeout_ + << " seconds)" << std::endl; + } + bool write_successful = handler->waitForResponse(); + + // End the session + session->FinishSession(); + + // If an error occured with the HTTP request + if (!write_successful) + { + // TODO: retry logic + return sdklogs::ExportResult::kFailure; + } + + // Parse the response output to determine if Elasticsearch consumed it correctly + std::string responseBody = handler->GetResponseBody(); + if (responseBody.find("\"failed\" : 0") == std::string::npos) + { + if (options_.console_debug_) + { + std::cout << "Logs were not written to Elasticsearch correctly, response body:" << std::endl; + std::cout << responseBody << std::endl; + } + + // TODO: Retry logic + return sdklogs::ExportResult::kFailure; + } + + return sdklogs::ExportResult::kSuccess; +} + +bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept +{ + is_shutdown_ = true; + + // Shutdown the session manager + session_manager_->CancelAllSessions(); + session_manager_->FinishAllSessions(); + + return true; +} +} // namespace logs +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/elasticsearch/test/es_log_exporter_test.cc b/exporters/elasticsearch/test/es_log_exporter_test.cc new file mode 100644 index 0000000000..7e545f5bac --- /dev/null +++ b/exporters/elasticsearch/test/es_log_exporter_test.cc @@ -0,0 +1,73 @@ +#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h" +#include "opentelemetry/ext/http/server/http_server.h" +#include "opentelemetry/logs/provider.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" +#include "opentelemetry/sdk/logs/logger_provider.h" +#include "opentelemetry/sdk/logs/simple_log_processor.h" + +#include +#include + +namespace sdklogs = opentelemetry::sdk::logs; +namespace logs_api = opentelemetry::logs; +namespace nostd = opentelemetry::nostd; +namespace logs_exporter = opentelemetry::exporter::logs; + +// Attempt to write a log to an invalid host/port, test that the Export() returns failure +TEST(ElasticsearchLogsExporterTests, InvalidEndpoint) +{ + // Create invalid connection options for the elasticsearch exporter + logs_exporter::ElasticsearchExporterOptions options("localhost", -1); + + // Create an elasticsearch exporter + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter(options)); + + // Create a log record and send to the exporter + auto record = exporter->MakeRecordable(); + auto result = exporter->Export(nostd::span>(&record, 1)); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} + +// Test that when the exporter is shutdown, any call to Export should return failure +TEST(ElasticsearchLogsExporterTests, Shutdown) +{ + // Create an elasticsearch exporter and immediately shut it down + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); + bool shutdownResult = exporter->Shutdown(); + ASSERT_TRUE(shutdownResult); + + // Write a log to the shutdown exporter + auto record = exporter->MakeRecordable(); + auto result = exporter->Export(nostd::span>(&record, 1)); + + // Ensure the return value is failure + ASSERT_EQ(result, sdklogs::ExportResult::kFailure); +} + +// Test the elasticsearch recordable object +TEST(ElasticsearchLogsExporterTests, RecordableCreation) +{ + // Create an elasticsearch exporter + auto exporter = + std::unique_ptr(new logs_exporter::ElasticsearchLogExporter); + + // Create a recordable + auto record = exporter->MakeRecordable(); + record->SetName("Timeout Log"); + record->SetSeverity(logs_api::Severity::kFatal); + record->SetTimestamp(std::chrono::system_clock::now()); + record->SetBody("Body of the log message"); + + // Attributes and resource support different types + record->SetAttribute("key0", false); + record->SetAttribute("key1", "1"); + record->SetResource("key2", 2); + record->SetResource("key3", 3.142); + + exporter->Export(nostd::span>(&record, 1)); +} \ No newline at end of file