Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b2ba2d0
add some helper functions to `invocation::lifecycle` mod
duncanista Nov 12, 2024
5a11b09
create cold start span on processor
duncanista Nov 12, 2024
15237bc
move `generate_span_id` to father module
duncanista Nov 12, 2024
c071d45
send `platform_init_start` data to processor
duncanista Nov 12, 2024
9896c5f
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 12, 2024
a3512b0
send `PlatformInitStart` to main bus
duncanista Nov 12, 2024
1aca553
update cold start `parent_id`
duncanista Nov 12, 2024
3eda969
fix start time of cold start span
duncanista Nov 12, 2024
29fb2da
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 13, 2024
6bf8c86
enhanced metrics now have a `dynamic_value_tags` for tags which we ha…
duncanista Nov 13, 2024
d1dec53
`AwsConfig` now has a `sandbox_init_time` value
duncanista Nov 13, 2024
5d8a561
add `is_empty` to `ContextBuffer`
duncanista Nov 13, 2024
c45abdf
calculate init tags on invoke
duncanista Nov 13, 2024
2277f8a
restart init tags on set
duncanista Nov 13, 2024
a74fb30
set tags properly for proactive init
duncanista Nov 13, 2024
583a79c
fix unit test
duncanista Nov 13, 2024
816e966
remove debug line
duncanista Nov 13, 2024
128802e
make sure `cold_start` tag is only set in one place
duncanista Nov 14, 2024
73596e8
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 15, 2024
3ac5c74
add service mapping config serializer
duncanista Nov 15, 2024
86d6fb8
add `service_mapping.rs`
duncanista Nov 15, 2024
14cca31
add `ServiceNameResolver` interface
duncanista Nov 15, 2024
7c6c060
implement interface in every trigger
duncanista Nov 15, 2024
b8d38bd
send `service_mapping` lookup table to span enricher
duncanista Nov 15, 2024
af6508c
create `SpanInferrer` with `service_mapping` config
duncanista Nov 15, 2024
c32678e
fmt
duncanista Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod flush_strategy;
pub mod log_level;
pub mod processing_rule;
pub mod service_mapping;
pub mod trace_propagation_style;

use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;
use std::vec;
Expand All @@ -15,6 +17,7 @@ use trace_propagation_style::{deserialize_trace_propagation_style, TracePropagat
use crate::config::flush_strategy::FlushStrategy;
use crate::config::log_level::{deserialize_log_level, LogLevel};
use crate::config::processing_rule::{deserialize_processing_rules, ProcessingRule};
use crate::config::service_mapping::deserialize_service_mapping;

/// `FailoverConfig` is a struct that represents fields that are not supported in the extension yet.
///
Expand Down Expand Up @@ -68,6 +71,8 @@ pub struct Config {
pub https_proxy: Option<String>,
pub capture_lambda_payload: bool,
pub capture_lambda_payload_max_depth: u32,
#[serde(deserialize_with = "deserialize_service_mapping")]
pub service_mapping: HashMap<String, String>,
// Trace Propagation
#[serde(deserialize_with = "deserialize_trace_propagation_style")]
pub trace_propagation_style: Vec<TracePropagationStyle>,
Expand Down Expand Up @@ -99,6 +104,7 @@ impl Default for Config {
https_proxy: None,
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
service_mapping: HashMap::new(),
// Trace Propagation
trace_propagation_style: vec![
TracePropagationStyle::Datadog,
Expand Down
35 changes: 35 additions & 0 deletions bottlecap/src/config/service_mapping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::collections::HashMap;

use serde::{Deserialize, Deserializer};
use tracing::debug;

#[allow(clippy::module_name_repetitions)]
pub fn deserialize_service_mapping<'de, D>(
deserializer: D,
) -> Result<HashMap<String, String>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = String::deserialize(deserializer)?;

let map = s
.split(',')
.map(|pair| {
let mut split = pair.split(':');

let service = split.next();
let to_map = split.next();

if let (Some(service), Some(to_map)) = (service, to_map) {
Ok((service.trim().to_string(), to_map.trim().to_string()))
} else {
debug!("Ignoring invalid service mapping pair: {pair}");
Err(serde::de::Error::custom(format!(
"Failed to deserialize service mapping for pair: {pair}"
)))
}
})
.collect();

map
}
5 changes: 1 addition & 4 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Processor {

Processor {
context_buffer: ContextBuffer::default(),
inferrer: SpanInferrer::default(),
inferrer: SpanInferrer::new(config.service_mapping.clone()),
span: create_empty_span(String::from("aws.lambda"), resource, service),
cold_start_span: None,
extracted_span_context: None,
Expand Down Expand Up @@ -266,10 +266,7 @@ impl Processor {
.meta
.insert("request_id".to_string(), request_id.clone());
// todo(duncanista): add missing tags
// - cold start, proactive init
// - language
// - function.request - capture lambda payload
// - function.response
// - metrics tags (for asm)

if let Some(offsets) = &context.enhanced_metric_data {
Expand Down
44 changes: 24 additions & 20 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::lifecycle::invocation::{
};
use crate::traces::{context::SpanContext, propagation::Propagator};

#[derive(Default)]
pub struct SpanInferrer {
service_mapping: HashMap<String, String>,
// Span inferred from the Lambda incoming request payload
pub inferred_span: Option<Span>,
// Nested span inferred from the Lambda incoming request payload
Expand All @@ -39,16 +41,11 @@ pub struct SpanInferrer {
trigger_tags: Option<HashMap<String, String>>,
}

impl Default for SpanInferrer {
fn default() -> Self {
Self::new()
}
}

impl SpanInferrer {
#[must_use]
pub fn new() -> Self {
pub fn new(service_mapping: HashMap<String, String>) -> Self {
Self {
service_mapping,
inferred_span: None,
wrapped_inferred_span: None,
is_async_span: false,
Expand Down Expand Up @@ -79,25 +76,25 @@ impl SpanInferrer {

if APIGatewayHttpEvent::is_match(payload_value) {
if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if APIGatewayRestEvent::is_match(payload_value) {
if let Some(t) = APIGatewayRestEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if LambdaFunctionUrlEvent::is_match(payload_value) {
if let Some(t) = LambdaFunctionUrlEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if SqsRecord::is_match(payload_value) {
if let Some(t) = SqsRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

// Check for SNS event wrapped in the SQS body
if let Ok(sns_entity) = serde_json::from_str::<SnsEntity>(&t.body) {
Expand All @@ -111,7 +108,7 @@ impl SpanInferrer {
sns: sns_entity,
event_subscription_arn: None,
};
wt.enrich_span(&mut wrapped_inferred_span);
wt.enrich_span(&mut wrapped_inferred_span, &self.service_mapping);
inferred_span.meta.extend(wt.get_tags());

wrapped_inferred_span.duration =
Expand All @@ -126,7 +123,8 @@ impl SpanInferrer {
..Default::default()
};

event_bridge_entity.enrich_span(&mut wrapped_inferred_span);
event_bridge_entity
.enrich_span(&mut wrapped_inferred_span, &self.service_mapping);
inferred_span.meta.extend(event_bridge_entity.get_tags());

wrapped_inferred_span.duration =
Expand All @@ -139,7 +137,7 @@ impl SpanInferrer {
}
} else if SnsRecord::is_match(payload_value) {
if let Some(t) = SnsRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

if let Some(message) = &t.sns.message {
if let Ok(event_bridge_wrapper_message) =
Expand All @@ -150,7 +148,8 @@ impl SpanInferrer {
..Default::default()
};

event_bridge_wrapper_message.enrich_span(&mut wrapped_inferred_span);
event_bridge_wrapper_message
.enrich_span(&mut wrapped_inferred_span, &self.service_mapping);
inferred_span
.meta
.extend(event_bridge_wrapper_message.get_tags());
Expand All @@ -166,25 +165,25 @@ impl SpanInferrer {
}
} else if DynamoDbRecord::is_match(payload_value) {
if let Some(t) = DynamoDbRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if S3Record::is_match(payload_value) {
if let Some(t) = S3Record::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if EventBridgeEvent::is_match(payload_value) {
if let Some(t) = EventBridgeEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
} else if KinesisRecord::is_match(payload_value) {
if let Some(t) = KinesisRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);
t.enrich_span(&mut inferred_span, &self.service_mapping);

trigger = Some(Box::new(t));
}
Expand Down Expand Up @@ -240,7 +239,6 @@ impl SpanInferrer {
}

// TODO: add status tag and other info from response
// TODO: add peer.service
pub fn complete_inferred_spans(&mut self, invocation_span: &Span) {
if let Some(s) = &mut self.inferred_span {
if let Some(ws) = &mut self.wrapped_inferred_span {
Expand All @@ -262,6 +260,8 @@ impl SpanInferrer {

// Set error
ws.error = invocation_span.error;
ws.meta
.insert(String::from("peer.service"), s.service.clone());

ws.trace_id = invocation_span.trace_id;
}
Expand All @@ -279,6 +279,10 @@ impl SpanInferrer {

// Set error
s.error = invocation_span.error;
s.meta.insert(
String::from("peer.service"),
invocation_span.service.clone(),
);

s.trace_id = invocation_span.trace_id;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use tracing::debug;
use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{
get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
get_aws_partition_by_region, lowercase_key, ServiceNameResolver, Trigger,
FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
},
};

Expand Down Expand Up @@ -63,7 +64,7 @@ impl Trigger for APIGatewayHttpEvent {
}

#[allow(clippy::cast_possible_truncation)]
fn enrich_span(&self, span: &mut Span) {
fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap<String, String>) {
debug!("Enriching an Inferred Span for an API Gateway HTTP Event");
let resource = if self.route_key.is_empty() {
format!(
Expand All @@ -81,8 +82,9 @@ impl Trigger for APIGatewayHttpEvent {
path = self.request_context.http.path
);
let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64;
// todo: service mapping
let service_name = self.request_context.domain_name.clone();

let service_name =
self.resolve_service_name(service_mapping, &self.request_context.domain_name);

span.name = "aws.httpapi".to_string();
span.service = service_name;
Expand Down Expand Up @@ -191,6 +193,15 @@ impl Trigger for APIGatewayHttpEvent {
}
}

impl ServiceNameResolver for APIGatewayHttpEvent {
fn get_specific_identifier(&self) -> String {
self.request_context.api_id.clone()
}

fn get_generic_identifier(&self) -> &'static str {
"lambda_api_gateway"
}
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -268,7 +279,8 @@ mod tests {
let event =
APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent");
let mut span = Span::default();
event.enrich_span(&mut span);
let service_mapping = HashMap::new();
event.enrich_span(&mut span, &service_mapping);
assert_eq!(span.name, "aws.httpapi");
assert_eq!(
span.service,
Expand Down Expand Up @@ -331,7 +343,8 @@ mod tests {
let event =
APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent");
let mut span = Span::default();
event.enrich_span(&mut span);
let service_mapping = HashMap::new();
event.enrich_span(&mut span, &service_mapping);
assert_eq!(span.name, "aws.httpapi");
assert_eq!(
span.service,
Expand Down Expand Up @@ -393,4 +406,39 @@ mod tests {
"arn:aws:apigateway:sa-east-1::/restapis/x02yirxc7a/stages/$default"
);
}

#[test]
fn test_resolve_service_name() {
let json = read_json_file("api_gateway_http_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event =
APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent");

// Priority is given to the specific key
let specific_service_mapping = HashMap::from([
("x02yirxc7a".to_string(), "specific-service".to_string()),
(
"lambda_api_gateway".to_string(),
"generic-service".to_string(),
),
]);

assert_eq!(
event.resolve_service_name(
&specific_service_mapping,
&event.request_context.domain_name
),
"specific-service"
);

let generic_service_mapping = HashMap::from([(
"lambda_api_gateway".to_string(),
"generic-service".to_string(),
)]);
assert_eq!(
event
.resolve_service_name(&generic_service_mapping, &event.request_context.domain_name),
"generic-service"
);
}
}
Loading