diff --git a/aw-transform/src/chunk.rs b/aw-transform/src/chunk.rs index a8c134a1..341aa785 100644 --- a/aw-transform/src/chunk.rs +++ b/aw-transform/src/chunk.rs @@ -5,7 +5,7 @@ pub fn chunk_events_by_key(events: Vec, key: &str) -> Vec { for event in events { if chunked_events.is_empty() && event.data.get(key).is_some() { // TODO: Add sub-chunks - chunked_events.push(event.clone()); + chunked_events.push(event); } else { let val = match event.data.get(key) { None => continue, @@ -20,7 +20,7 @@ pub fn chunk_events_by_key(events: Vec, key: &str) -> Vec { chunked_events.push(last_event); if &last_val != val { // TODO: Add sub-chunks - chunked_events.push(event.clone()); + chunked_events.push(event); } } } diff --git a/aw-transform/src/filter_keyvals.rs b/aw-transform/src/filter_keyvals.rs index bcb3528c..d24d40fe 100644 --- a/aw-transform/src/filter_keyvals.rs +++ b/aw-transform/src/filter_keyvals.rs @@ -9,7 +9,7 @@ pub fn filter_keyvals(mut events: Vec, key: &str, vals: &[Value]) -> Vec< if let Some(v) = event.data.get(key) { for val in vals { if val == v { - filtered_events.push(event.clone()); + filtered_events.push(event); break; } } @@ -24,7 +24,7 @@ pub fn filter_keyvals_regex(mut events: Vec, key: &str, regex: &Regex) -> for event in events.drain(..) 
{ if let Some(v) = event.data.get(key) { if regex.is_match(v.as_str().unwrap()) { - filtered_events.push(event.clone()); + filtered_events.push(event); } } } diff --git a/aw-transform/src/flood.rs b/aw-transform/src/flood.rs index c99521b7..b2b656b8 100644 --- a/aw-transform/src/flood.rs +++ b/aw-transform/src/flood.rs @@ -8,13 +8,15 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { let mut events_sorted = sort_by_timestamp(events); let mut e1_iter = events_sorted.drain(..).peekable(); let mut new_events = Vec::new(); - let mut drop_next = false; let mut gap_prev: Option = None; - while let Some(mut e1) = e1_iter.next() { - if drop_next { - drop_next = false; - continue; + let mut retry_e: Option = None; + while let Some(mut e1) = match retry_e { + Some(e) => { + retry_e = None; + Some(e) } + None => e1_iter.next(), + } { if let Some(gap) = gap_prev { e1.timestamp = e1.timestamp - (gap / 2); e1.duration = e1.duration + (gap / 2); @@ -36,17 +38,28 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { warn!("Gap was of negative duration ({}s), but could be safely merged. 
This error will only show once per batch.", gap); warned_negative_gap_safe = true; } - // Extend e1 to the middle between e1 and e2 - e1.duration = e2.calculate_endtime() - e1.timestamp; + // Choose the longest event and set the endtime to it + let e1_endtime = e1.calculate_endtime(); + let e2_endtime = e2.calculate_endtime(); + if e2_endtime > e1_endtime { + e1.duration = e2_endtime - e1.timestamp; + } else { + e1.duration = e1_endtime - e1.timestamp; + } // Drop next event since they are merged and flooded into e1 - drop_next = true; + e1_iter.next(); + // Retry this event again to give it a chance to merge e1 + // with 'e3' + retry_e = Some(e1); + // Since we are retrying this event we don't want to push it + // to the new_events vec + continue; } else { if chrono::Duration::seconds(0) > gap { if !warned_negative_gap_unsafe { warn!("Gap was of negative duration ({}s) and could NOT be safely merged. This error will only show once per batch.", gap); warned_negative_gap_unsafe = true; } - continue; } // Extend e1 to the middle between e1 and e2 e1.duration = e1.duration + (gap / 2); @@ -54,7 +67,7 @@ pub fn flood(events: Vec, pulsetime: chrono::Duration) -> Vec { gap_prev = Some(gap); } } - new_events.push(e1.clone()); + new_events.push(e1); } new_events } @@ -126,4 +139,88 @@ mod tests { assert_eq!(&res[0], &e1_expected); assert_eq!(&res[1], &e2_expected); } + + #[test] + fn test_flood_same_timestamp() { + // e1, stay same + // e2, base merge (longest duration, this should be the duration selected) + // e3, merge with e2 + // e4, stay same + let e1 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "afk"}, + }; + let e2 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(), + duration: Duration::seconds(5), + data: json_map! 
{"status": "not-afk"}, + }; + let e3 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "not-afk"}, + }; + let e4 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:06Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "afk"}, + }; + let res = flood( + vec![e1.clone(), e2.clone(), e3.clone(), e4.clone()], + Duration::seconds(5), + ); + assert_eq!(3, res.len()); + assert_eq!(&res[0], &e1); + assert_eq!(&res[1], &e2); + assert_eq!(&res[2], &e4); + + // e1, stay same + // e2, base merge + // e3, merge with e2 + // e4, merge with e2 (longest duration, this should be the duration selected) + // e5, stay same + let e1 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "afk"}, + }; + let e2 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(), + duration: Duration::seconds(5), + data: json_map! {"status": "not-afk"}, + }; + let e3 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "not-afk"}, + }; + let e4 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(), + duration: Duration::seconds(10), + data: json_map! {"status": "not-afk"}, + }; + let e5 = Event { + id: None, + timestamp: DateTime::from_str("2000-01-01T00:00:11Z").unwrap(), + duration: Duration::seconds(1), + data: json_map! {"status": "afk"}, + }; + let res = flood( + vec![e1.clone(), e2.clone(), e3.clone(), e4.clone(), e5.clone()], + Duration::seconds(5), + ); + assert_eq!(3, res.len()); + assert_eq!(&res[0], &e1); + assert_eq!(&res[1], &e4); + assert_eq!(&res[2], &e5); + } }