Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ define ENVIRONMENT_EXEC
$(if $(findstring true,$(ENVIRONMENT_TTY)),--tty,) \
--init \
--interactive \
--privileged \
--env INSIDE_ENVIRONMENT=true \
$(if $(ENVIRONMENT_NETWORK),--network $(ENVIRONMENT_NETWORK),) \
--mount type=bind,source=${CURRENT_DIR},target=/git/vectordotdev/vector \
Expand Down
12 changes: 8 additions & 4 deletions src/transforms/detect_exceptions/exception_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::usize;
use chrono::{DateTime, Utc};
use regex::Regex;
use crate::{
config::log_schema, event::LogEvent, event::Value,
event::LogEvent, event::Value,
internal_events::detect_exceptions::DetectExceptionsStaleEventFlushed,
transforms::detect_exceptions::*,
};
use vector_lib::lookup::path::OwnedTargetPath;

#[derive(Debug, Clone)]
pub struct RuleTarget {
Expand Down Expand Up @@ -52,6 +53,7 @@ pub enum DetectionStatus {
pub struct TraceAccumulator {
max_bytes: usize,
max_lines: usize,
message_key: OwnedTargetPath,
multiline_flush_interval: Duration,
first_event: LogEvent,
buffer_size: usize,
Expand All @@ -66,11 +68,13 @@ impl TraceAccumulator {
multiline_flush_interval: Duration,
max_bytes: usize,
max_lines: usize,
message_key: OwnedTargetPath,
) -> TraceAccumulator {
TraceAccumulator {
buffer_size: 0,
max_bytes,
max_lines,
message_key,
multiline_flush_interval,
first_event: LogEvent::default(),
buffer_start_time: Utc::now(),
Expand All @@ -84,7 +88,7 @@ impl TraceAccumulator {

pub fn push(&mut self, le: &LogEvent, output: &mut Vec<Event>) {
let mut detection_status = DetectionStatus::NoTrace;
let message = le.get(log_schema().message_key_target_path().unwrap());
let message = le.get(&self.message_key);
let message_copy = message.clone();

match message {
Expand Down Expand Up @@ -161,7 +165,7 @@ impl TraceAccumulator {
}
_ => {
self.first_event.insert(
log_schema().message_key_target_path().unwrap(),
&self.message_key,
self.accumulated_messages.join("\n"),
);
output.push(Event::Log(self.first_event.clone()));
Expand Down Expand Up @@ -241,7 +245,7 @@ mod exception_detector_tests {
detector: &mut ExceptionDetector,
expected_first: DetectionStatus,
expected_last: DetectionStatus,
multiline: Vec<&str>,
multiline: Vec<&str>
) {
let last_index = multiline.len() - 1;
let mut index = 0;
Expand Down
24 changes: 21 additions & 3 deletions src/transforms/detect_exceptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ use futures::{stream, Stream, StreamExt};
use std::{collections::HashMap, pin::Pin, time::Duration};
use vector_lib::{
configurable::configurable_component,
config::{LogNamespace, clone_input_definitions},
enrichment
config::{log_schema,LogNamespace, clone_input_definitions},
enrichment,
};

use vector_lib::lookup::path::OwnedTargetPath;
use vector_lib::lookup::path::parse_target_path;

/// ProgrammingLanguages
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
Expand Down Expand Up @@ -63,7 +66,7 @@ pub enum ProgrammingLanguages {
#[configurable_component(transform("detect_exceptions"))]
#[derive(Debug, Clone)]
#[serde(deny_unknown_fields, default)]
pub struct DetectExceptionsConfig {
pub struct DetectExceptionsConfig{
/// Programming Languages for which to detect Exceptions
///
/// Supported languages are
Expand Down Expand Up @@ -99,6 +102,9 @@ pub struct DetectExceptionsConfig {
#[serde(default)]
pub group_by: Vec<String>,

/// The key path to use to find the message of a log event
pub message_key: String,

/// The interval of flushing the buffer for multiline exceptions.
#[serde(default = "default_multiline_flush_interval_ms")]
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
Expand All @@ -122,6 +128,7 @@ impl Default for DetectExceptionsConfig {
multiline_flush_interval_ms: default_multiline_flush_interval_ms(),
max_bytes: default_max_bytes_size(),
max_lines: default_max_lines_num(),
message_key: log_schema().owned_message_path().to_string(),
group_by: vec![],
}
}
Expand Down Expand Up @@ -188,6 +195,7 @@ pub struct DetectExceptions {
multiline_flush_interval: Duration,
max_bytes: usize,
max_lines: usize,
message_key: OwnedTargetPath,
group_by: Vec<String>,
}

Expand All @@ -196,6 +204,14 @@ impl DetectExceptions {
if config.languages.is_empty() {
return Err("languages cannot be empty".into());
}
let owned_target_path: OwnedTargetPath;
match parse_target_path(config.message_key.as_str()){
Err(e) => return Err(e.into()),
Ok(value) =>{
owned_target_path = value
},
};

Ok(DetectExceptions {
accumulators: HashMap::new(),
languages: config.languages.clone(),
Expand All @@ -204,6 +220,7 @@ impl DetectExceptions {
multiline_flush_interval: config.multiline_flush_interval_ms,
max_bytes: config.max_bytes,
max_lines: config.max_lines,
message_key: owned_target_path,
flush_period: config.flush_period_ms,
})
}
Expand All @@ -220,6 +237,7 @@ impl DetectExceptions {
self.multiline_flush_interval,
self.max_bytes,
self.max_lines,
self.message_key.clone(),
),
);
}
Expand Down