diff --git a/Cargo.toml b/Cargo.toml index a0354fcaa4ca3..1e0cca8d2037d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -258,6 +258,7 @@ transforms = [ "transforms-split", "transforms-swimlanes", "transforms-tokenizer", + "transforms-type_filter", ] transforms-add_fields = [] transforms-add_tags = [] @@ -282,6 +283,7 @@ transforms-sampler = ["seahash"] transforms-split = [] transforms-swimlanes = [] transforms-tokenizer = ["nom"] +transforms-type_filter = [] # Sinks sinks = [ diff --git a/src/event/mod.rs b/src/event/mod.rs index 6f86c9fcffd24..eddda6620a796 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -51,6 +51,16 @@ impl Event { }) } + pub fn new_empty_counter() -> Self { + Event::Metric(Metric { + name: "counter".into(), + timestamp: None, + tags: None, + kind: MetricKind::Incremental, + value: MetricValue::Counter { value: 1.0 }, + }) + } + pub fn as_log(&self) -> &LogEvent { match self { Event::Log(log) => log, diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index f43b610f3c471..64b0eb4ed88f9 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -47,6 +47,8 @@ pub mod split; pub mod swimlanes; #[cfg(feature = "transforms-tokenizer")] pub mod tokenizer; +#[cfg(feature = "transforms-type_filter")] +pub mod type_filter; use futures01::{sync::mpsc::Receiver, Stream}; diff --git a/src/transforms/type_filter.rs b/src/transforms/type_filter.rs new file mode 100644 index 0000000000000..7f6add4cf6071 --- /dev/null +++ b/src/transforms/type_filter.rs @@ -0,0 +1,88 @@ +use super::Transform; +use crate::{ + event::Event, + topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct TypeFilterConfig { + pub filter_type: FilterType, +} + +#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FilterType { + Log, + Metric, +} + +inventory::submit! { + TransformDescription::new_without_default::("type_filter") +} + +#[typetag::serde(name = "type_filter")] +impl TransformConfig for TypeFilterConfig { + fn build(&self, _cx: TransformContext) -> crate::Result> { + Ok(Box::new(TypeFilter::new(self.filter_type))) + } + + fn input_type(&self) -> DataType { + DataType::Any + } + + fn output_type(&self) -> DataType { + match self.filter_type { + FilterType::Log => DataType::Log, + FilterType::Metric => DataType::Metric, + } + } + + fn transform_type(&self) -> &'static str { + "type_filter" + } +} + +pub struct TypeFilter { + filter_type: FilterType, +} + +impl TypeFilter { + pub fn new(filter_type: FilterType) -> Self { + Self { filter_type } + } +} + +impl Transform for TypeFilter { + fn transform(&mut self, event: Event) -> Option { + match (&event, self.filter_type) { + (Event::Log(_), FilterType::Log) => Some(event), + (Event::Log(_), FilterType::Metric) => None, + (Event::Metric(_), FilterType::Log) => None, + (Event::Metric(_), FilterType::Metric) => Some(event), + } + } +} + +#[cfg(test)] +mod test { + use super::{FilterType, TypeFilter}; + use crate::{transforms::Transform, Event}; + + #[test] + fn filters_based_on_type() { + let log = Event::new_empty_log(); + let metric = Event::new_empty_counter(); + let mut log_filter = TypeFilter::new(FilterType::Log); + let mut metric_filter = TypeFilter::new(FilterType::Metric); + + assert_eq!(Some(log.clone()), log_filter.transform(log.clone())); + assert_eq!( + Some(metric.clone()), + metric_filter.transform(metric.clone()) + ); + assert_eq!(None, log_filter.transform(metric)); + assert_eq!(None, metric_filter.transform(log)); + } +}