Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aw-transform/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub fn chunk_events_by_key(events: Vec<Event>, key: &str) -> Vec<Event> {
for event in events {
if chunked_events.is_empty() && event.data.get(key).is_some() {
// TODO: Add sub-chunks
chunked_events.push(event.clone());
chunked_events.push(event);
} else {
let val = match event.data.get(key) {
None => continue,
Expand All @@ -20,7 +20,7 @@ pub fn chunk_events_by_key(events: Vec<Event>, key: &str) -> Vec<Event> {
chunked_events.push(last_event);
if &last_val != val {
// TODO: Add sub-chunks
chunked_events.push(event.clone());
chunked_events.push(event);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions aw-transform/src/filter_keyvals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub fn filter_keyvals(mut events: Vec<Event>, key: &str, vals: &[Value]) -> Vec<
if let Some(v) = event.data.get(key) {
for val in vals {
if val == v {
filtered_events.push(event.clone());
filtered_events.push(event);
break;
}
}
Expand All @@ -24,7 +24,7 @@ pub fn filter_keyvals_regex(mut events: Vec<Event>, key: &str, regex: &Regex) ->
for event in events.drain(..) {
if let Some(v) = event.data.get(key) {
if regex.is_match(v.as_str().unwrap()) {
filtered_events.push(event.clone());
filtered_events.push(event);
}
}
}
Expand Down
117 changes: 107 additions & 10 deletions aw-transform/src/flood.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
let mut events_sorted = sort_by_timestamp(events);
let mut e1_iter = events_sorted.drain(..).peekable();
let mut new_events = Vec::new();
let mut drop_next = false;
let mut gap_prev: Option<chrono::Duration> = None;
while let Some(mut e1) = e1_iter.next() {
if drop_next {
drop_next = false;
continue;
let mut retry_e: Option<Event> = None;
while let Some(mut e1) = match retry_e {
Some(e) => {
retry_e = None;
Some(e)
}
None => e1_iter.next(),
} {
if let Some(gap) = gap_prev {
e1.timestamp = e1.timestamp - (gap / 2);
e1.duration = e1.duration + (gap / 2);
Expand All @@ -36,25 +38,36 @@ pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
warn!("Gap was of negative duration ({}s), but could be safely merged. This error will only show once per batch.", gap);
warned_negative_gap_safe = true;
}
// Extend e1 to the middle between e1 and e2
e1.duration = e2.calculate_endtime() - e1.timestamp;
// Choose the longest event and set the endtime to it
let e1_endtime = e1.calculate_endtime();
let e2_endtime = e2.calculate_endtime();
if e2_endtime > e1_endtime {
e1.duration = e2_endtime - e1.timestamp;
} else {
e1.duration = e1_endtime - e1.timestamp;
}
// Drop next event since they are merged and flooded into e1
drop_next = true;
e1_iter.next();
// Retry this event again to give it a chance to merge e1
// with 'e3'
retry_e = Some(e1);
// Since we are retrying this event we don't want to push it
// to the new_events vec
continue;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! Took me a while to understand what's going on with retry_e, but looks performant 😃

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly to make the borrow checker happy, there are more readable ways to implement it if you write "unsafe" code.

} else {
if chrono::Duration::seconds(0) > gap {
if !warned_negative_gap_unsafe {
warn!("Gap was of negative duration ({}s) and could NOT be safely merged. This error will only show once per batch.", gap);
warned_negative_gap_unsafe = true;
}
continue;
}
// Extend e1 to the middle between e1 and e2
e1.duration = e1.duration + (gap / 2);
// Make sure next event is pre-extended
gap_prev = Some(gap);
}
}
new_events.push(e1.clone());
new_events.push(e1);
}
new_events
}
Expand Down Expand Up @@ -126,4 +139,88 @@ mod tests {
assert_eq!(&res[0], &e1_expected);
assert_eq!(&res[1], &e2_expected);
}

// Regression test for flood(): events that share the same timestamp and
// identical data (a zero-length gap) must be merged into a single event
// whose endtime comes from the longest of the merged events, rather than
// blindly extending to the endtime of the later event.
#[test]
fn test_flood_same_timestamp() {
// Scenario 1: the longest event (e2) comes first among the duplicates.
// e1, stay same
// e2, base merge (longest duration, this should be the duration selected)
// e3, merge with e2
// e4, stay same
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "afk"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
data: json_map! {"status": "not-afk"},
};
let e3 = Event {
id: None,
// Same timestamp as e2, shorter duration: should be absorbed by e2.
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "not-afk"},
};
let e4 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:06Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "afk"},
};
let res = flood(
vec![e1.clone(), e2.clone(), e3.clone(), e4.clone()],
Duration::seconds(5),
);
// e3 was merged away; e2 survives unchanged because it already had the
// longest endtime among the duplicates.
assert_eq!(3, res.len());
assert_eq!(&res[0], &e1);
assert_eq!(&res[1], &e2);
assert_eq!(&res[2], &e4);

// Scenario 2: the longest event (e4) comes last among the duplicates,
// exercising the retry path that re-merges the already-merged event.
// e1, stay same
// e2, base merge
// e3, merge with e2
// e4, merge with e2 (longest duration, this should be the duration selected)
// e5, stay same
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "afk"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
data: json_map! {"status": "not-afk"},
};
let e3 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "not-afk"},
};
let e4 = Event {
id: None,
// Same timestamp as e2/e3 but the longest duration: its endtime wins.
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(10),
data: json_map! {"status": "not-afk"},
};
let e5 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:11Z").unwrap(),
duration: Duration::seconds(1),
data: json_map! {"status": "afk"},
};
let res = flood(
vec![e1.clone(), e2.clone(), e3.clone(), e4.clone(), e5.clone()],
Duration::seconds(5),
);
// The three duplicate-timestamp events collapse to one that equals e4,
// since e4 had the longest duration (and thus the latest endtime).
assert_eq!(3, res.len());
assert_eq!(&res[0], &e1);
assert_eq!(&res[1], &e4);
assert_eq!(&res[2], &e5);
}
}