Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/sinks/aws_cloudwatch_logs/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::{
template::Template,
};

// Estimated maximum size of InputLogEvent with an empty message
// Estimated maximum size of InputLogEvent is 50 bytes with an empty message
const EVENT_SIZE_OVERHEAD: usize = 50;
const MAX_EVENT_SIZE: usize = 256 * 1024;
const MAX_MESSAGE_SIZE: usize = MAX_EVENT_SIZE - EVENT_SIZE_OVERHEAD;
// Batch request overhead is added now that a single event can reach max batch size
const BATCH_SIZE_OVERHEAD: usize = 26;
const MAX_EVENT_SIZE: usize = 1024 * 1024;
const MAX_MESSAGE_SIZE: usize = MAX_EVENT_SIZE - EVENT_SIZE_OVERHEAD - BATCH_SIZE_OVERHEAD;

#[derive(Clone)]
pub struct CloudwatchRequest {
Expand Down Expand Up @@ -99,7 +101,7 @@ impl CloudwatchRequestBuilder {
}
let message = String::from_utf8_lossy(&message_bytes).to_string();

if message.len() > MAX_MESSAGE_SIZE {
if message.len() >= MAX_MESSAGE_SIZE {
emit!(AwsCloudwatchLogsMessageSizeError {
size: message.len(),
max_size: MAX_MESSAGE_SIZE,
Expand Down Expand Up @@ -127,7 +129,7 @@ impl CloudwatchRequestBuilder {
/// source: <https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html>
impl ByteSizeOf for CloudwatchRequest {
fn size_of(&self) -> usize {
self.message.len() + 26
self.message.len() + BATCH_SIZE_OVERHEAD
}

fn allocated_bytes(&self) -> usize {
Expand All @@ -141,7 +143,7 @@ mod tests {
use vector_lib::config::log_schema;
use vector_lib::event::LogEvent;

use super::CloudwatchRequestBuilder;
use super::{CloudwatchRequestBuilder, MAX_MESSAGE_SIZE};

#[test]
fn test() {
Expand All @@ -160,4 +162,22 @@ mod tests {
assert_eq!(request.timestamp, timestamp.timestamp_millis());
assert_eq!(&request.message, message);
}

#[test]
fn test_rejects_oversized_log_event() {
let mut request_builder = CloudwatchRequestBuilder {
group_template: "group".try_into().unwrap(),
stream_template: "stream".try_into().unwrap(),
transformer: Default::default(),
encoder: Default::default(),
};

let timestamp = Utc::now();
let oversized = "X".repeat(MAX_MESSAGE_SIZE + 1);
let mut event = LogEvent::from(oversized);
event.insert(log_schema().timestamp_key_target_path().unwrap(), timestamp);

let request = request_builder.build(event.into());
assert!(request.is_none(), "Expected None for oversized log event");
}
}