-
Notifications
You must be signed in to change notification settings - Fork 0
WIP TraceZ Processor #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1147049
f721a1c
51b16e9
b671912
d69a2b3
f32cf66
3f09ffb
1a0875f
af4a6d8
9e556f8
eb81ff6
2a8696e
2d061af
38cc77a
aa1b39c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,3 @@ | ||
| add_library(opentelemetry_exporter_otprotocol recordable.cc) | ||
| target_link_libraries(opentelemetry_exporter_otprotocol | ||
| $<TARGET_OBJECTS:opentelemetry_proto>) | ||
|
|
||
| add_executable(recordable_test recordable_test.cc) | ||
| target_link_libraries(recordable_test | ||
| ${GTEST_BOTH_LIBRARIES} | ||
| ${CMAKE_THREAD_LIBS_INIT} | ||
| opentelemetry_exporter_otprotocol | ||
| protobuf::libprotobuf) | ||
| gtest_add_tests(TARGET recordable_test TEST_PREFIX exporter. TEST_LIST recordable_test) |
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package(default_visibility = ["//visibility:public"]) | ||
|
|
||
| cc_library( | ||
| name = "headers", | ||
| hdrs = glob(["include/**/*.h"]), | ||
| strip_include_prefix = "include", | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| add_subdirectory(src) | ||
|
|
||
| if(BUILD_TESTING) | ||
| add_subdirectory(test) | ||
| endif() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| #pragma once | ||
|
|
||
| #include <chrono> | ||
| #include <memory> | ||
| #include <unordered_set> | ||
| #include <vector> | ||
| #include <utility> | ||
| #include <thread> | ||
| #include <iostream> | ||
|
|
||
| #include "opentelemetry/sdk/trace/recordable.h" | ||
| #include "opentelemetry/sdk/trace/span_data.h" | ||
| #include "opentelemetry/sdk/trace/exporter.h" | ||
| #include "opentelemetry/sdk/trace/processor.h" | ||
|
|
||
| OPENTELEMETRY_BEGIN_NAMESPACE | ||
| namespace ext | ||
| { | ||
| namespace zpages | ||
| { | ||
| /** | ||
| * The span processor passes finished recordables to be used by the | ||
| * Data Aggregator to store spans before aggregating for TraceZ. | ||
| * | ||
| */ | ||
| class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor { | ||
| public: | ||
|
|
||
| struct CollectedSpans { | ||
| std::unordered_set<opentelemetry::sdk::trace::Recordable*> running; | ||
| std::vector<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> completed; | ||
| }; | ||
|
|
||
| /** | ||
| * Initialize a span processor. | ||
| */ | ||
| explicit TracezSpanProcessor() noexcept {} | ||
|
|
||
| /** | ||
| * Create a span recordable | ||
| * @return a newly initialized recordable | ||
| */ | ||
| std::unique_ptr<opentelemetry::sdk::trace::Recordable> MakeRecordable() noexcept override | ||
| { | ||
| return std::unique_ptr<opentelemetry::sdk::trace::Recordable>(new opentelemetry::sdk::trace::SpanData); | ||
| } | ||
|
|
||
| /** | ||
| * OnStart is called when a span starts; that span is added to running_spans. | ||
| * @param span a recordable for a span that was just started | ||
| */ | ||
| void OnStart(opentelemetry::sdk::trace::Recordable &span) noexcept override; | ||
|
|
||
| /** | ||
| * OnEnd is called when a span ends; that span is moved from running_spans to | ||
| * completed_spans | ||
| * @param span a recordable for a span that was ended | ||
| */ | ||
| void OnEnd(std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept override; | ||
|
|
||
| /** | ||
| * Returns a snapshot of all spans stored. This snapshot has a copy of the | ||
| * stored running_spans and gives ownership of completed spans. Stored | ||
| * completed_spans are cleared from the processor. Currently, copy-on-write | ||
| * is utilized where possible to minimize contention, but locks may be added | ||
| * in the future. | ||
| * @return snapshot of all currently running spans and newly completed spans | ||
| * at the time that the function is called | ||
| */ | ||
| CollectedSpans GetSpanSnapshot() noexcept; | ||
|
|
||
| /** | ||
| * Send all ended spans that have not yet been sent. | ||
| * @param timeout an optional timeout, the default timeout of 0 means that no | ||
| * timeout is applied. | ||
| */ | ||
| void ForceFlush( | ||
| std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override | ||
| { | ||
| std::this_thread::sleep_for(timeout); | ||
| if (shutdown_signal_received_) return; | ||
| // TODO: figure out how to send spans to data aggregator when this is called? | ||
| // should running spans be forced to end, and their status reflect this | ||
| // accordingly? | ||
| } | ||
| /** | ||
| * Shut down the processor and do any cleanup required. Ended spans are | ||
| * send before shutdown. After the call to Shutdown, subsequent calls to | ||
| * OnStart, OnEnd, ForceFlush or Shutdown will return immediately without | ||
| * doing anything. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be a flag to signal if Shutdown is called so it can exit early in those functions? Would it be better to throw an exception if any of those are called after Shutdown is called?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I can add that, thanks!! I also made the functions return immediately instead, based on the doc comments |
||
| * @param timeout an optional timeout, the default timeout of 0 means that no | ||
| * timeout is applied. | ||
| */ | ||
| void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override | ||
| { | ||
| ForceFlush(timeout); | ||
| shutdown_signal_received_ = true; // TODO: what cleanup do we need? | ||
| } | ||
|
|
||
| private: | ||
| CollectedSpans spans_; | ||
| bool shutdown_signal_received_ = false; | ||
| }; | ||
| } // namespace zpages | ||
| } // namespace ext | ||
| OPENTELEMETRY_END_NAMESPACE | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| add_subdirectory(zpages) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| # Copyright 2020, OpenTelemetry Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| package(default_visibility = ["//visibility:public"]) | ||
|
|
||
| cc_library( | ||
| name = "zpages", | ||
| srcs = glob(["**/*.cc"]), | ||
| hdrs = glob(["**/*.h"]), | ||
| include_prefix = "ext/zpages", | ||
| deps = [ | ||
| "//api", | ||
| "//sdk:headers", | ||
| "//ext:headers", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| add_library(opentelemetry_trace tracez_processer.cc) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| #include "opentelemetry/ext/zpages/tracez_processor.h" | ||
|
|
||
| OPENTELEMETRY_BEGIN_NAMESPACE | ||
| namespace ext { | ||
| namespace zpages { | ||
|
|
||
| void TracezSpanProcessor::OnStart(opentelemetry::sdk::trace::Recordable &span) noexcept { | ||
| if (shutdown_signal_received_) return; | ||
| spans_.running.insert(&span); | ||
| } | ||
|
|
||
| void TracezSpanProcessor::OnEnd(std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept { | ||
| if (shutdown_signal_received_ || span == nullptr) return; | ||
| auto completed_span = spans_.running.find(span.get()); | ||
| if (completed_span != spans_.running.end()) { | ||
| spans_.running.erase(completed_span); | ||
| spans_.completed.push_back(std::move(span)); | ||
| } | ||
| } | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One option is to collapse these two functions into one, and return some struct that bundles copies of the running and completed spans together. Something like that, that way it'll just give the Aggregator a consistent view while it's aggregating the spans.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, that sounds good! I should replace the running and completed getters in favor of returning them all, since the main goal is giving this information in a consistent manner and the aggregator should change its methods accordingly? |
||
|
|
||
| TracezSpanProcessor::CollectedSpans TracezSpanProcessor::GetSpanSnapshot() noexcept { | ||
| if (shutdown_signal_received_) return TracezSpanProcessor::CollectedSpans(); | ||
| CollectedSpans snapshot; | ||
| snapshot.running = spans_.running; | ||
| snapshot.completed = std::move(spans_.completed); | ||
| spans_.completed.clear(); | ||
| return snapshot; | ||
| } | ||
|
|
||
|
|
||
| } // namespace zpages | ||
| } // namespace ext | ||
| OPENTELEMETRY_END_NAMESPACE | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| add_subdirectory(zpages) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| cc_test( | ||
| name = "tracez_processor_tests", | ||
| srcs = [ | ||
| "tracez_processor_test.cc", | ||
| ], | ||
| deps = [ | ||
| "//sdk/src/trace", | ||
| "//ext/src/zpages", | ||
| "@com_google_googletest//:gtest_main", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| foreach(testname tracez_processer_test) | ||
| add_executable(${testname} "${testname}.cc") | ||
| target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} | ||
| ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace) | ||
| gtest_add_tests( | ||
| TARGET ${testname} | ||
| TEST_PREFIX trace. | ||
| TEST_LIST ${testname}) | ||
| endforeach() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth adding a TODO: Verify/handle a case when GetSnapshot is called when spans start/end to run.
This should be covered by copy on write. While we plan to add it, it is good to "declare" that it is our plan.