Merged
Changes from all commits (28 commits)
0d11207
decouple `hyper` from `trace_processor`
duncanista Sep 10, 2024
be1a1ae
add `handle_traces`
duncanista Sep 10, 2024
8525801
fix tests
duncanista Sep 10, 2024
002e9eb
removed unused import
duncanista Sep 10, 2024
2e37b81
Merge branch 'jordan.gonzalez/decouple-trace-processor' into jordan.g…
duncanista Sep 10, 2024
c7de652
move `invocation_context` to `invocation::context` module
duncanista Sep 13, 2024
7e1be93
add `new` and `get_sender_copy` to `trace_agent`
duncanista Sep 13, 2024
11e6ba1
add `get_canonical_resource_name` to `tags_provider`
duncanista Sep 13, 2024
68d12ac
add `get_function_name` to `lambda::tags`
duncanista Sep 13, 2024
5070e19
add `MS_TO_NS` constant
duncanista Sep 13, 2024
2d4d60c
add `invocation::processor`
duncanista Sep 13, 2024
7089693
update use of `invocation::context`
duncanista Sep 13, 2024
929741c
make `lifecycle::listener` use `invocation::processor`
duncanista Sep 13, 2024
dcba48e
use `invocation::processor` in `main.rs`
duncanista Sep 13, 2024
807802b
Merge branch 'main' of ssh://github.com/DataDog/datadog-lambda-extens…
duncanista Sep 13, 2024
a4d1eb5
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Sep 13, 2024
57a155c
move `MS_TO_NS` to `invocation::processor`
duncanista Sep 13, 2024
8ffd181
remove unnecessary constant
duncanista Sep 13, 2024
c793148
add `Box::new` back to `trace_agent`
duncanista Sep 13, 2024
f815b22
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Sep 20, 2024
83762a0
add some comments
duncanista Sep 20, 2024
4f23d59
add unit tests for `context.rs`
duncanista Sep 20, 2024
7918a32
use `on_invocation_start`
duncanista Sep 20, 2024
fca14ec
rename `lambda_library_detected` to `tracer_detected`
duncanista Sep 20, 2024
9413e1c
fmt
duncanista Sep 23, 2024
f03ddd0
remove `current_request_id`
duncanista Sep 23, 2024
cfbbbde
add comment
duncanista Sep 23, 2024
6a8effa
fmt
duncanista Sep 23, 2024
183 changes: 97 additions & 86 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -15,8 +15,7 @@ use bottlecap::{
event_bus::bus::EventBus,
events::Event,
lifecycle::{
flush_control::FlushControl,
invocation_context::{InvocationContext, InvocationContextBuffer},
flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor,
listener::Listener as LifecycleListener,
},
logger,
@@ -280,6 +279,11 @@ async fn extension_loop_active(
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
buffer: Arc::new(TokioMutex::new(Vec::new())),
});

let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
)));
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(
obfuscation_config::ObfuscationConfig::new()
@@ -298,23 +302,28 @@
let trace_flusher_clone = trace_flusher.clone();
let stats_flusher_clone = stats_flusher.clone();

let trace_agent = Box::new(trace_agent::TraceAgent {
duncanista marked this conversation as resolved.
config: Arc::clone(config),
trace_processor,
trace_flusher: trace_flusher_clone,
stats_processor,
stats_flusher: stats_flusher_clone,
tags_provider: Arc::clone(&tags_provider),
});
let trace_agent = Box::new(
trace_agent::TraceAgent::new(
Arc::clone(config),
trace_processor.clone(),
trace_flusher_clone,
stats_processor,
stats_flusher_clone,
Arc::clone(&tags_provider),
)
.await,
);
let trace_agent_tx = trace_agent.get_sender_copy();

tokio::spawn(async move {
let res = trace_agent.start_trace_agent().await;
let res = trace_agent.start().await;
if let Err(e) = res {
error!("Error starting trace agent: {e:?}");
}
});

let lifecycle_listener = LifecycleListener {
tags_provider: Arc::clone(&tags_provider),
invocation_processor: Arc::clone(&invocation_processor),
};
// TODO(astuyve): deprioritize this task after the first request
tokio::spawn(async move {
Expand All @@ -332,7 +341,6 @@ async fn extension_loop_active(
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;

let flush_control = FlushControl::new(config.serverless_flush_strategy);
let mut invocation_context_buffer = InvocationContextBuffer::default();
let mut shutdown = false;

let mut flush_interval = flush_control.get_flush_interval();
@@ -375,92 +383,95 @@
Event::Metric(event) => {
debug!("Metric event: {:?}", event);
}
Event::Telemetry(event) => match event.record {
TelemetryRecord::PlatformStart { request_id, .. } => {
invocation_context_buffer.insert(InvocationContext {
request_id,
runtime_duration_ms: 0.0,
});
}
TelemetryRecord::PlatformInitReport {
initialization_type,
phase,
metrics,
} => {
debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
lambda_enhanced_metrics
.set_init_duration_metric(metrics.duration_ms);
}
TelemetryRecord::PlatformRuntimeDone {
request_id,
status,
metrics,
..
} => {
if let Some(metrics) = metrics {
invocation_context_buffer
.add_runtime_duration(&request_id, metrics.duration_ms);
Event::Telemetry(event) =>
match event.record {
TelemetryRecord::PlatformStart { request_id, .. } => {
let mut p = invocation_processor.lock().await;
Contributor: do we need a lock here? this prevents us from processing any messages until it's acquired; what's it fully protecting here?

Contributor (Author): Looking at what improvements I can make here.

Contributor (Author): As per our conversation, I will move the lock out of this in another PR when the time comes, and will add a ticket for this.
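
For illustration, here is a minimal sketch of the lock-free alternative this thread gestures at: let the processor own its state on a dedicated task and feed it events over an mpsc channel, so the event loop never waits on a `Mutex`. The `ProcessorEvent` enum and the stripped-down `Processor` are assumed stand-ins, not the extension's actual types.

```rust
use tokio::sync::mpsc;

// Assumed stand-ins for illustration; the real processor handles many more events.
enum ProcessorEvent {
    PlatformStart { request_id: String, time: String },
}

struct Processor;

impl Processor {
    fn on_platform_start(&mut self, request_id: String, time: String) {
        println!("platform start: {request_id} at {time}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<ProcessorEvent>(128);

    // The processor owns its state on this task, so no shared Mutex is needed.
    let worker = tokio::spawn(async move {
        let mut processor = Processor;
        while let Some(event) = rx.recv().await {
            match event {
                ProcessorEvent::PlatformStart { request_id, time } => {
                    processor.on_platform_start(request_id, time);
                }
            }
        }
    });

    // The event loop only sends; it never blocks on lock().await.
    tx.send(ProcessorEvent::PlatformStart {
        request_id: "abc-123".into(),
        time: "2024-09-23T00:00:00Z".into(),
    })
    .await
    .expect("worker is alive");

    drop(tx); // closing the channel lets the worker drain and exit
    worker.await.expect("worker panicked");
}
```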

p.on_platform_start(request_id, event.time);
drop(p);
}
TelemetryRecord::PlatformInitReport {
initialization_type,
phase,
metrics,
} => {
debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
lambda_enhanced_metrics
.set_runtime_duration_metric(metrics.duration_ms);
.set_init_duration_metric(metrics.duration_ms);
}
TelemetryRecord::PlatformRuntimeDone {
request_id,
status,
metrics,
..
} => {
let mut p = invocation_processor.lock().await;
if let Some(metrics) = metrics {
p.on_platform_runtime_done(
&request_id,
metrics.duration_ms,
config.clone(),
tags_provider.clone(),
trace_processor.clone(),
trace_agent_tx.clone()
).await;
lambda_enhanced_metrics
.set_runtime_duration_metric(metrics.duration_ms);
}
Comment on lines +418 to +420

Contributor (Author): I might want to delegate some of these metrics to the lifecycle processor eventually, maybe?

Contributor: It'd be good to push this out to somewhere that can do the work asynchronously; you're blocking the loop right now.
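
A minimal sketch of the reviewer's suggestion, with assumed simplified types (the real `on_platform_runtime_done` also takes the config, tags provider, trace processor, and sender): spawn the runtime-done handling onto its own task so the event loop can pick up the next message while spans are built and sent.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct Processor;

impl Processor {
    async fn on_platform_runtime_done(&mut self, request_id: &str, duration_ms: f64) {
        // Stand-in for building the invocation span and sending it to the trace agent.
        println!("runtime done: {request_id} ({duration_ms} ms)");
    }
}

// The lock is acquired inside the spawned task, not in the event loop.
async fn handle_runtime_done(
    processor: Arc<Mutex<Processor>>,
    request_id: String,
    duration_ms: f64,
) {
    let mut p = processor.lock().await;
    p.on_platform_runtime_done(&request_id, duration_ms).await;
}

#[tokio::main]
async fn main() {
    let processor = Arc::new(Mutex::new(Processor));

    // In the event loop this would be fire-and-forget:
    let task = tokio::spawn(handle_runtime_done(
        Arc::clone(&processor),
        "abc-123".to_string(),
        42.0,
    ));

    task.await.expect("task panicked");
}
```

The trade-off is ordering: work spawned this way can finish after later events are handled, so anything order-sensitive would still need a serialized path such as the channel sketch above.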

drop(p);

if status != Status::Success {
lambda_enhanced_metrics.increment_errors_metric();
if status == Status::Timeout {
lambda_enhanced_metrics.increment_timeout_metric();
if status != Status::Success {
lambda_enhanced_metrics.increment_errors_metric();
if status == Status::Timeout {
lambda_enhanced_metrics.increment_timeout_metric();
}
}
}
debug!(
"Runtime done for request_id: {:?} with status: {:?}",
request_id, status
);
// TODO(astuyve) it'll be easy to
// pass the invocation deadline to
// flush tasks here, so they can
// retry if we have more time
if flush_control.should_flush_end() {
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
debug!(
"Runtime done for request_id: {:?} with status: {:?}",
request_id, status
);

// TODO(astuyve) it'll be easy to
// pass the invocation deadline to
// flush tasks here, so they can
// retry if we have more time
if flush_control.should_flush_end() {
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
}
break;
}
break;
}
TelemetryRecord::PlatformReport {
request_id,
status,
metrics,
..
} => {
debug!(
"Platform report for request_id: {:?} with status: {:?}",
request_id, status
);
lambda_enhanced_metrics.set_report_log_metrics(&metrics);
if let Some(invocation_context) =
invocation_context_buffer.remove(&request_id)
{
if invocation_context.runtime_duration_ms > 0.0 {
let post_runtime_duration_ms = metrics.duration_ms
- invocation_context.runtime_duration_ms;
TelemetryRecord::PlatformReport {
request_id,
status,
metrics,
..
} => {
debug!(
"Platform report for request_id: {:?} with status: {:?}",
request_id, status
);
lambda_enhanced_metrics.set_report_log_metrics(&metrics);
let mut p = invocation_processor.lock().await;
if let Some(post_runtime_duration_ms) = p.on_platform_report(&request_id, metrics.duration_ms) {
lambda_enhanced_metrics.set_post_runtime_duration_metric(
post_runtime_duration_ms,
);
} else {
debug!("Impossible to compute post runtime duration for request_id: {:?}", request_id);
}
}
drop(p);

if shutdown {
break;
if shutdown {
break;
}
}
_ => {
debug!("Unforwarded Telemetry event: {:?}", event);
}
}
_ => {
debug!("Unforwarded Telemetry event: {:?}", event);
}
},
}
}
_ = flush_interval.tick() => {