diff --git a/Makefile b/Makefile index 4a376dfab9efc..17f7854b41eb9 100644 --- a/Makefile +++ b/Makefile @@ -130,6 +130,7 @@ define ENVIRONMENT_EXEC $(if $(findstring true,$(ENVIRONMENT_TTY)),--tty,) \ --init \ --interactive \ + --privileged \ --env INSIDE_ENVIRONMENT=true \ $(if $(ENVIRONMENT_NETWORK),--network $(ENVIRONMENT_NETWORK),) \ --mount type=bind,source=${CURRENT_DIR},target=/git/vectordotdev/vector \ diff --git a/src/transforms/detect_exceptions/exception_detector.rs b/src/transforms/detect_exceptions/exception_detector.rs index 1d77e44e0da15..2629012d580e7 100644 --- a/src/transforms/detect_exceptions/exception_detector.rs +++ b/src/transforms/detect_exceptions/exception_detector.rs @@ -3,10 +3,11 @@ use std::usize; use chrono::{DateTime, Utc}; use regex::Regex; use crate::{ - config::log_schema, event::LogEvent, event::Value, + event::LogEvent, event::Value, internal_events::detect_exceptions::DetectExceptionsStaleEventFlushed, transforms::detect_exceptions::*, }; +use vector_lib::lookup::path::OwnedTargetPath; #[derive(Debug, Clone)] pub struct RuleTarget { @@ -52,6 +53,7 @@ pub enum DetectionStatus { pub struct TraceAccumulator { max_bytes: usize, max_lines: usize, + message_key: OwnedTargetPath, multiline_flush_interval: Duration, first_event: LogEvent, buffer_size: usize, @@ -66,11 +68,13 @@ impl TraceAccumulator { multiline_flush_interval: Duration, max_bytes: usize, max_lines: usize, + message_key: OwnedTargetPath, ) -> TraceAccumulator { TraceAccumulator { buffer_size: 0, max_bytes, max_lines, + message_key, multiline_flush_interval, first_event: LogEvent::default(), buffer_start_time: Utc::now(), @@ -84,7 +88,7 @@ impl TraceAccumulator { pub fn push(&mut self, le: &LogEvent, output: &mut Vec) { let mut detection_status = DetectionStatus::NoTrace; - let message = le.get(log_schema().message_key_target_path().unwrap()); + let message = le.get(&self.message_key); let message_copy = message.clone(); match message { @@ -161,7 +165,7 @@ impl TraceAccumulator { } _ => { self.first_event.insert( - log_schema().message_key_target_path().unwrap(), + &self.message_key, self.accumulated_messages.join("\n"), ); output.push(Event::Log(self.first_event.clone())); @@ -241,7 +245,7 @@ mod exception_detector_tests { detector: &mut ExceptionDetector, expected_first: DetectionStatus, expected_last: DetectionStatus, - multiline: Vec<&str>, + multiline: Vec<&str> ) { let last_index = multiline.len() - 1; let mut index = 0; diff --git a/src/transforms/detect_exceptions/mod.rs b/src/transforms/detect_exceptions/mod.rs index 9ae4f24f0b670..ad3ded62b0875 100644 --- a/src/transforms/detect_exceptions/mod.rs +++ b/src/transforms/detect_exceptions/mod.rs @@ -16,10 +16,13 @@ use futures::{stream, Stream, StreamExt}; use std::{collections::HashMap, pin::Pin, time::Duration}; use vector_lib::{ configurable::configurable_component, - config::{LogNamespace, clone_input_definitions}, - enrichment + config::{log_schema,LogNamespace, clone_input_definitions}, + enrichment, }; +use vector_lib::lookup::path::OwnedTargetPath; +use vector_lib::lookup::path::parse_target_path; + /// ProgrammingLanguages #[configurable_component] #[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] @@ -63,7 +66,7 @@ pub enum ProgrammingLanguages { #[configurable_component(transform("detect_exceptions"))] #[derive(Debug, Clone)] #[serde(deny_unknown_fields, default)] -pub struct DetectExceptionsConfig { +pub struct DetectExceptionsConfig{ /// Programming Languages for which to detect Exceptions /// /// Supported languages are @@ -99,6 +102,9 @@ pub struct DetectExceptionsConfig { #[serde(default)] pub group_by: Vec, + /// The key path to use to find the message of a log event + pub message_key: String, + /// The interval of flushing the buffer for multiline exceptions. #[serde(default = "default_multiline_flush_interval_ms")] #[serde_as(as = "serde_with::DurationMilliSeconds")] @@ -122,6 +128,7 @@ impl Default for DetectExceptionsConfig { multiline_flush_interval_ms: default_multiline_flush_interval_ms(), max_bytes: default_max_bytes_size(), max_lines: default_max_lines_num(), + message_key: log_schema().owned_message_path().to_string(), group_by: vec![], } } @@ -188,6 +195,7 @@ pub struct DetectExceptions { multiline_flush_interval: Duration, max_bytes: usize, max_lines: usize, + message_key: OwnedTargetPath, group_by: Vec, } @@ -196,6 +204,14 @@ impl DetectExceptions { if config.languages.is_empty() { return Err("languages cannot be empty".into()); } + let owned_target_path: OwnedTargetPath; + match parse_target_path(config.message_key.as_str()){ + Err(e) => return Err(e.into()), + Ok(value) =>{ + owned_target_path = value + }, + }; + Ok(DetectExceptions { accumulators: HashMap::new(), languages: config.languages.clone(), @@ -204,6 +220,7 @@ impl DetectExceptions { multiline_flush_interval: config.multiline_flush_interval_ms, max_bytes: config.max_bytes, max_lines: config.max_lines, + message_key: owned_target_path, flush_period: config.flush_period_ms, }) } @@ -220,6 +237,7 @@ impl DetectExceptions { self.multiline_flush_interval, self.max_bytes, self.max_lines, + self.message_key.clone(), ), ); }