@@ -15,8 +15,7 @@ use bottlecap::{
1515 event_bus:: bus:: EventBus ,
1616 events:: Event ,
1717 lifecycle:: {
18- flush_control:: FlushControl ,
19- invocation_context:: { InvocationContext , InvocationContextBuffer } ,
18+ flush_control:: FlushControl , invocation:: processor:: Processor as InvocationProcessor ,
2019 listener:: Listener as LifecycleListener ,
2120 } ,
2221 logger,
@@ -294,6 +293,11 @@ async fn extension_loop_active(
294293 let trace_flusher = Arc :: new ( trace_flusher:: ServerlessTraceFlusher {
295294 buffer : Arc :: new ( TokioMutex :: new ( Vec :: new ( ) ) ) ,
296295 } ) ;
296+
297+ let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
298+ Arc :: clone ( & tags_provider) ,
299+ Arc :: clone ( config) ,
300+ ) ) ) ;
297301 let trace_processor = Arc :: new ( trace_processor:: ServerlessTraceProcessor {
298302 obfuscation_config : Arc :: new (
299303 obfuscation_config:: ObfuscationConfig :: new ( )
@@ -312,23 +316,28 @@ async fn extension_loop_active(
312316 let trace_flusher_clone = trace_flusher. clone ( ) ;
313317 let stats_flusher_clone = stats_flusher. clone ( ) ;
314318
315- let trace_agent = Box :: new ( trace_agent:: TraceAgent {
316- config : Arc :: clone ( config) ,
317- trace_processor,
318- trace_flusher : trace_flusher_clone,
319- stats_processor,
320- stats_flusher : stats_flusher_clone,
321- tags_provider : Arc :: clone ( & tags_provider) ,
322- } ) ;
319+ let trace_agent = Box :: new (
320+ trace_agent:: TraceAgent :: new (
321+ Arc :: clone ( config) ,
322+ trace_processor. clone ( ) ,
323+ trace_flusher_clone,
324+ stats_processor,
325+ stats_flusher_clone,
326+ Arc :: clone ( & tags_provider) ,
327+ )
328+ . await ,
329+ ) ;
330+ let trace_agent_tx = trace_agent. get_sender_copy ( ) ;
331+
323332 tokio:: spawn ( async move {
324- let res = trace_agent. start_trace_agent ( ) . await ;
333+ let res = trace_agent. start ( ) . await ;
325334 if let Err ( e) = res {
326335 error ! ( "Error starting trace agent: {e:?}" ) ;
327336 }
328337 } ) ;
329338
330339 let lifecycle_listener = LifecycleListener {
331- tags_provider : Arc :: clone ( & tags_provider ) ,
340+ invocation_processor : Arc :: clone ( & invocation_processor ) ,
332341 } ;
333342 // TODO(astuyve): deprioritize this task after the first request
334343 tokio:: spawn ( async move {
@@ -346,7 +355,6 @@ async fn extension_loop_active(
346355 setup_telemetry_client ( & r. extension_id , logs_agent_channel) . await ?;
347356
348357 let flush_control = FlushControl :: new ( config. serverless_flush_strategy ) ;
349- let mut invocation_context_buffer = InvocationContextBuffer :: default ( ) ;
350358 let mut shutdown = false ;
351359
352360 let mut flush_interval = flush_control. get_flush_interval ( ) ;
@@ -389,92 +397,95 @@ async fn extension_loop_active(
389397 Event :: Metric ( event) => {
390398 debug!( "Metric event: {:?}" , event) ;
391399 }
392- Event :: Telemetry ( event) => match event. record {
393- TelemetryRecord :: PlatformStart { request_id, .. } => {
394- invocation_context_buffer. insert( InvocationContext {
395- request_id,
396- runtime_duration_ms: 0.0 ,
397- } ) ;
398- }
399- TelemetryRecord :: PlatformInitReport {
400- initialization_type,
401- phase,
402- metrics,
403- } => {
404- debug!( "Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}" , initialization_type, phase, metrics) ;
405- lambda_enhanced_metrics
406- . set_init_duration_metric( metrics. duration_ms) ;
407- }
408- TelemetryRecord :: PlatformRuntimeDone {
409- request_id,
410- status,
411- metrics,
412- ..
413- } => {
414- if let Some ( metrics) = metrics {
415- invocation_context_buffer
416- . add_runtime_duration( & request_id, metrics. duration_ms) ;
400+ Event :: Telemetry ( event) =>
401+ match event. record {
402+ TelemetryRecord :: PlatformStart { request_id, .. } => {
403+ let mut p = invocation_processor. lock( ) . await ;
404+ p. on_platform_start( request_id, event. time) ;
405+ drop( p) ;
406+ }
407+ TelemetryRecord :: PlatformInitReport {
408+ initialization_type,
409+ phase,
410+ metrics,
411+ } => {
412+ debug!( "Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}" , initialization_type, phase, metrics) ;
417413 lambda_enhanced_metrics
418- . set_runtime_duration_metric ( metrics. duration_ms) ;
414+ . set_init_duration_metric ( metrics. duration_ms) ;
419415 }
416+ TelemetryRecord :: PlatformRuntimeDone {
417+ request_id,
418+ status,
419+ metrics,
420+ ..
421+ } => {
422+ let mut p = invocation_processor. lock( ) . await ;
423+ if let Some ( metrics) = metrics {
424+ p. on_platform_runtime_done(
425+ & request_id,
426+ metrics. duration_ms,
427+ config. clone( ) ,
428+ tags_provider. clone( ) ,
429+ trace_processor. clone( ) ,
430+ trace_agent_tx. clone( )
431+ ) . await ;
432+ lambda_enhanced_metrics
433+ . set_runtime_duration_metric( metrics. duration_ms) ;
434+ }
435+ drop( p) ;
420436
421- if status != Status :: Success {
422- lambda_enhanced_metrics. increment_errors_metric( ) ;
423- if status == Status :: Timeout {
424- lambda_enhanced_metrics. increment_timeout_metric( ) ;
437+ if status != Status :: Success {
438+ lambda_enhanced_metrics. increment_errors_metric( ) ;
439+ if status == Status :: Timeout {
440+ lambda_enhanced_metrics. increment_timeout_metric( ) ;
441+ }
425442 }
426- }
427- debug!(
428- "Runtime done for request_id: {:?} with status: {:?}" ,
429- request_id, status
430- ) ;
431- // TODO(astuyve) it'll be easy to
432- // pass the invocation deadline to
433- // flush tasks here, so they can
434- // retry if we have more time
435- if flush_control. should_flush_end( ) {
436- tokio:: join!(
437- logs_flusher. flush( ) ,
438- metrics_flusher. flush( ) ,
439- trace_flusher. manual_flush( ) ,
440- stats_flusher. manual_flush( )
443+ debug!(
444+ "Runtime done for request_id: {:?} with status: {:?}" ,
445+ request_id, status
441446 ) ;
447+
448+ // TODO(astuyve) it'll be easy to
449+ // pass the invocation deadline to
450+ // flush tasks here, so they can
451+ // retry if we have more time
452+ if flush_control. should_flush_end( ) {
453+ tokio:: join!(
454+ logs_flusher. flush( ) ,
455+ metrics_flusher. flush( ) ,
456+ trace_flusher. manual_flush( ) ,
457+ stats_flusher. manual_flush( )
458+ ) ;
459+ }
460+ break ;
442461 }
443- break ;
444- }
445- TelemetryRecord :: PlatformReport {
446- request_id,
447- status,
448- metrics,
449- ..
450- } => {
451- debug!(
452- "Platform report for request_id: {:?} with status: {:?}" ,
453- request_id, status
454- ) ;
455- lambda_enhanced_metrics. set_report_log_metrics( & metrics) ;
456- if let Some ( invocation_context) =
457- invocation_context_buffer. remove( & request_id)
458- {
459- if invocation_context. runtime_duration_ms > 0.0 {
460- let post_runtime_duration_ms = metrics. duration_ms
461- - invocation_context. runtime_duration_ms;
462+ TelemetryRecord :: PlatformReport {
463+ request_id,
464+ status,
465+ metrics,
466+ ..
467+ } => {
468+ debug!(
469+ "Platform report for request_id: {:?} with status: {:?}" ,
470+ request_id, status
471+ ) ;
472+ lambda_enhanced_metrics. set_report_log_metrics( & metrics) ;
473+ let mut p = invocation_processor. lock( ) . await ;
474+ if let Some ( post_runtime_duration_ms) = p. on_platform_report( & request_id, metrics. duration_ms) {
462475 lambda_enhanced_metrics. set_post_runtime_duration_metric(
463476 post_runtime_duration_ms,
464477 ) ;
465- } else {
466- debug!( "Impossible to compute post runtime duration for request_id: {:?}" , request_id) ;
467478 }
468- }
479+ drop ( p ) ;
469480
470- if shutdown {
471- break ;
481+ if shutdown {
482+ break ;
483+ }
484+ }
485+ _ => {
486+ debug!( "Unforwarded Telemetry event: {:?}" , event) ;
472487 }
473488 }
474- _ => {
475- debug!( "Unforwarded Telemetry event: {:?}" , event) ;
476- }
477- } ,
478489 }
479490 }
480491 _ = flush_interval. tick( ) => {
0 commit comments