Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ use bottlecap::{
listener::Listener as LifecycleListener,
},
logger,
logs::{agent::LogsAgent, flusher::LogsFlusher},
logs::{
agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService,
flusher::LogsFlusher,
},
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
proxy::{interceptor, should_start_proxy},
secrets::decrypt,
Expand Down Expand Up @@ -904,13 +907,17 @@ fn start_logs_agent(
tags_provider: &Arc<TagProvider>,
event_bus: Sender<Event>,
) -> (Sender<TelemetryEvent>, LogsFlusher) {
let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus);
let logs_agent_channel = logs_agent.get_sender_copy();
let logs_flusher = LogsFlusher::new(
api_key_factory,
Arc::clone(&logs_agent.aggregator),
config.clone(),
let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::default();
start_logs_aggregator(logs_aggr_service);

let mut logs_agent = LogsAgent::new(
Arc::clone(tags_provider),
Arc::clone(config),
event_bus,
logs_aggr_handle.clone(),
);
let logs_agent_channel = logs_agent.get_sender_copy();
let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, config.clone());
tokio::spawn(async move {
logs_agent.spin().await;
});
Expand Down Expand Up @@ -1084,6 +1091,12 @@ fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) {
});
}

/// Drive the logs aggregator service's command loop on a background tokio task.
/// The task ends when the service receives `Shutdown` or all handles are dropped.
fn start_logs_aggregator(aggr_service: LogsAggregatorService) {
    // `run` consumes the service and returns a future; spawn it directly.
    tokio::spawn(aggr_service.run());
}

async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken {
let dogstatsd_config = DogStatsDConfig {
host: EXTENSION_HOST.to_string(),
Expand Down
23 changes: 11 additions & 12 deletions bottlecap/src/logs/agent.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use std::sync::Arc;
use tokio::sync::{
Mutex,
mpsc::{self, Sender},
};
use tokio::sync::mpsc::{self, Sender};

use crate::event_bus::Event;
use crate::extension::telemetry::events::TelemetryEvent;
use crate::logs::{aggregator::Aggregator, processor::LogsProcessor};
use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor};
use crate::tags;
use crate::{LAMBDA_RUNTIME_SLUG, config};

#[allow(clippy::module_name_repetitions)]
pub struct LogsAgent {
pub aggregator: Arc<Mutex<Aggregator>>,
tx: Sender<TelemetryEvent>,
rx: mpsc::Receiver<TelemetryEvent>,
processor: LogsProcessor,
aggregator_handle: AggregatorHandle,
}

impl LogsAgent {
Expand All @@ -24,8 +21,8 @@ impl LogsAgent {
tags_provider: Arc<tags::provider::Provider>,
datadog_config: Arc<config::Config>,
event_bus: Sender<Event>,
) -> LogsAgent {
let aggregator: Arc<Mutex<Aggregator>> = Arc::new(Mutex::new(Aggregator::default()));
aggregator_handle: AggregatorHandle,
) -> Self {
let processor = LogsProcessor::new(
Arc::clone(&datadog_config),
tags_provider,
Expand All @@ -35,23 +32,25 @@ impl LogsAgent {

let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);

LogsAgent {
aggregator,
Self {
tx,
rx,
processor,
aggregator_handle,
}
}

pub async fn spin(&mut self) {
while let Some(event) = self.rx.recv().await {
self.processor.process(event, &self.aggregator).await;
self.processor.process(event, &self.aggregator_handle).await;
}
}

pub async fn sync_consume(&mut self) {
if let Some(events) = self.rx.recv().await {
self.processor.process(events, &self.aggregator).await;
self.processor
.process(events, &self.aggregator_handle)
.await;
}
}

Expand Down
150 changes: 150 additions & 0 deletions bottlecap/src/logs/aggregator_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error};

use crate::logs::{aggregator::Aggregator, constants};

/// Commands accepted by the logs `AggregatorService` over its command channel.
#[derive(Debug)]
pub enum AggregatorCommand {
    /// Add a batch of serialized log entries to the aggregator.
    InsertBatch(Vec<String>),
    /// Drain every aggregated batch and send them back on the provided channel.
    GetBatches(oneshot::Sender<Vec<Vec<u8>>>),
    /// Stop the service's run loop.
    Shutdown,
}

/// Cloneable handle used to send `AggregatorCommand`s to a running
/// `AggregatorService` from multiple owners (agent, flusher).
#[derive(Clone, Debug)]
pub struct AggregatorHandle {
    // Unbounded sender: command submission never blocks or awaits.
    tx: mpsc::UnboundedSender<AggregatorCommand>,
}

impl AggregatorHandle {
    /// Queue a batch of serialized logs for aggregation.
    ///
    /// # Errors
    /// Returns a `SendError` if the service's receiver has been dropped.
    pub fn insert_batch(
        &self,
        logs: Vec<String>,
    ) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        let command = AggregatorCommand::InsertBatch(logs);
        self.tx.send(command)
    }

    /// Drain all aggregated batches from the service and return them.
    ///
    /// # Errors
    /// Returns a descriptive `String` if the command cannot be sent or the
    /// service drops the response channel before replying.
    pub async fn get_batches(&self) -> Result<Vec<Vec<u8>>, String> {
        let (reply_tx, reply_rx) = oneshot::channel();

        if let Err(e) = self.tx.send(AggregatorCommand::GetBatches(reply_tx)) {
            return Err(format!("Failed to send flush command: {e}"));
        }

        match reply_rx.await {
            Ok(batches) => Ok(batches),
            Err(e) => Err(format!("Failed to receive flush response: {e}")),
        }
    }

    /// Ask the aggregator service to stop processing commands.
    ///
    /// # Errors
    /// Returns a `SendError` if the service's receiver has been dropped.
    pub fn shutdown(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        self.tx.send(AggregatorCommand::Shutdown)
    }
}

/// Owns the logs `Aggregator` and serializes all access to it by processing
/// `AggregatorCommand`s received from `AggregatorHandle` clones, replacing
/// direct shared-lock access to the aggregator.
pub struct AggregatorService {
    // Exclusively owned here; mutated only inside `run`.
    aggregator: Aggregator,
    rx: mpsc::UnboundedReceiver<AggregatorCommand>,
}

impl AggregatorService {
    /// Build a service/handle pair using the default batching limits from
    /// `constants`.
    ///
    /// Named `default` for call-site familiarity even though the `Default`
    /// trait cannot apply (this returns a tuple, not `Self`).
    #[must_use]
    #[allow(clippy::should_implement_trait)]
    pub fn default() -> (Self, AggregatorHandle) {
        Self::new(
            constants::MAX_BATCH_ENTRIES_SIZE,
            constants::MAX_CONTENT_SIZE_BYTES,
            constants::MAX_LOG_SIZE_BYTES,
        )
    }

    /// Build a service/handle pair with explicit batching limits.
    #[must_use]
    pub fn new(
        max_batch_entries_size: usize,
        max_content_size_bytes: usize,
        max_log_size_bytes: usize,
    ) -> (Self, AggregatorHandle) {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        (
            Self {
                aggregator: Aggregator::new(
                    max_batch_entries_size,
                    max_content_size_bytes,
                    max_log_size_bytes,
                ),
                rx: command_rx,
            },
            AggregatorHandle { tx: command_tx },
        )
    }

    /// Process commands until `Shutdown` arrives or every sender is dropped.
    /// Consumes the service; intended to be spawned as its own task.
    pub async fn run(mut self) {
        debug!("Logs aggregator service started");

        while let Some(command) = self.rx.recv().await {
            match command {
                AggregatorCommand::InsertBatch(logs) => self.aggregator.add_batch(logs),
                AggregatorCommand::GetBatches(response_tx) => {
                    // Drain the aggregator completely; an empty batch marks the end.
                    let mut batches = Vec::new();
                    loop {
                        let batch = self.aggregator.get_batch();
                        if batch.is_empty() {
                            break;
                        }
                        batches.push(batch);
                    }
                    if response_tx.send(batches).is_err() {
                        error!("Failed to send logs flush response - receiver dropped");
                    }
                }
                AggregatorCommand::Shutdown => {
                    debug!("Logs aggregator service shutting down");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::logs::lambda::{IntakeLog, Lambda, Message};

    // End-to-end check of the service: insert one serialized log through the
    // handle, flush it back out, and verify the batch payload and shutdown path.
    #[tokio::test]
    async fn test_aggregator_service_insert_and_flush() {
        let (service, handle) = AggregatorService::default();

        // Run the service loop on its own task, as production code does.
        let service_handle = tokio::spawn(async move {
            service.run().await;
        });

        let log = IntakeLog {
            message: Message {
                message: "test".to_string(),
                lambda: Lambda {
                    arn: "arn".to_string(),
                    request_id: Some("request_id".to_string()),
                },
                timestamp: 0,
                status: "status".to_string(),
            },
            hostname: "hostname".to_string(),
            service: "service".to_string(),
            tags: "tags".to_string(),
            source: "source".to_string(),
        };
        let serialized_log = serde_json::to_string(&log).unwrap();

        handle.insert_batch(vec![serialized_log.clone()]).unwrap();

        // A single small log yields exactly one batch, serialized as a JSON array.
        let batches = handle.get_batches().await.unwrap();
        assert_eq!(batches.len(), 1);
        let serialized_batch = format!("[{serialized_log}]");
        assert_eq!(batches[0], serialized_batch.as_bytes());

        // Shutdown must terminate the run loop so the spawned task completes.
        handle
            .shutdown()
            .expect("Failed to shutdown aggregator service");
        service_handle
            .await
            .expect("Aggregator service task failed");
    }
}
43 changes: 22 additions & 21 deletions bottlecap/src/logs/flusher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::FLUSH_RETRY_COUNT;
use crate::config;
use crate::http::get_client;
use crate::logs::aggregator::Aggregator;
use crate::logs::aggregator_service::AggregatorHandle;
use dogstatsd::api_key::ApiKeyFactory;
use futures::future::join_all;
use hyper::StatusCode;
Expand All @@ -10,10 +10,7 @@ use std::error::Error;
use std::time::Instant;
use std::{io::Write, sync::Arc};
use thiserror::Error as ThisError;
use tokio::{
sync::{Mutex, OnceCell},
task::JoinSet,
};
use tokio::{sync::OnceCell, task::JoinSet};
use tracing::{debug, error};
use zstd::stream::write::Encoder;

Expand All @@ -28,24 +25,22 @@ pub struct FailedRequestError {
pub struct Flusher {
client: reqwest::Client,
endpoint: String,
aggregator: Arc<Mutex<Aggregator>>,
config: Arc<config::Config>,
api_key_factory: Arc<ApiKeyFactory>,
headers: OnceCell<HeaderMap>,
}

impl Flusher {
#[must_use]
pub fn new(
api_key_factory: Arc<ApiKeyFactory>,
endpoint: String,
aggregator: Arc<Mutex<Aggregator>>,
config: Arc<config::Config>,
) -> Self {
let client = get_client(&config);
Flusher {
client,
endpoint,
aggregator,
config,
api_key_factory,
headers: OnceCell::new(),
Expand Down Expand Up @@ -199,12 +194,13 @@ impl Flusher {
pub struct LogsFlusher {
config: Arc<config::Config>,
pub flushers: Vec<Flusher>,
aggregator_handle: AggregatorHandle,
}

impl LogsFlusher {
pub fn new(
api_key_factory: Arc<ApiKeyFactory>,
aggregator: Arc<Mutex<Aggregator>>,
aggregator_handle: AggregatorHandle,
config: Arc<config::Config>,
) -> Self {
let mut flushers = Vec::new();
Expand All @@ -222,22 +218,26 @@ impl LogsFlusher {
flushers.push(Flusher::new(
Arc::clone(&api_key_factory),
endpoint,
aggregator.clone(),
config.clone(),
));

// Create flushers for additional endpoints
for endpoint in &config.logs_config_additional_endpoints {
let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port);
let additional_api_key_factory =
Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str()));
flushers.push(Flusher::new(
Arc::clone(&api_key_factory),
additional_api_key_factory,
endpoint_url,
aggregator.clone(),
config.clone(),
));
}

LogsFlusher { config, flushers }
LogsFlusher {
config,
flushers,
aggregator_handle,
}
}

pub async fn flush(
Expand All @@ -261,16 +261,17 @@ impl LogsFlusher {
}
}
} else {
// Get batches from primary flusher's aggregator
let logs_batches = Arc::new({
let mut guard = self.flushers[0].aggregator.lock().await;
let mut batches = Vec::new();
let mut current_batch = guard.get_batch();
while !current_batch.is_empty() {
batches.push(self.compress(current_batch));
current_batch = guard.get_batch();
match self.aggregator_handle.get_batches().await {
Ok(batches) => batches
.into_iter()
.map(|batch| self.compress(batch))
.collect(),
Err(e) => {
debug!("Failed to flush from aggregator: {}", e);
Vec::new()
}
}
batches
});

// Send batches to each flusher
Expand Down
Loading
Loading