From 89ba4f303aac800e6051f8096d5bf30c681b1b84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Tue, 26 Jun 2018 11:54:49 +0200 Subject: [PATCH 1/9] added methods (adjacent, gap, union) to TimePeriod and transform period_union --- aw_core/timeperiod.py | 33 ++++++++- aw_transform/__init__.py | 2 +- aw_transform/filter_period_intersect.py | 91 +++++++++++++++++-------- tests/test_timeperiod.py | 38 +++++++++++ tests/test_transforms.py | 23 +++++++ 5 files changed, 156 insertions(+), 31 deletions(-) diff --git a/aw_core/timeperiod.py b/aw_core/timeperiod.py index a42b4020..42bdb5a4 100644 --- a/aw_core/timeperiod.py +++ b/aw_core/timeperiod.py @@ -6,6 +6,7 @@ class TimePeriod: # Inspired by: http://www.codeproject.com/Articles/168662/Time-Period-Library-for-NET # TODO: Move to its own package def __init__(self, start: datetime, end: datetime) -> None: + # assert start <= end self.start = start self.end = end @@ -14,13 +15,14 @@ def duration(self) -> timedelta: return self.end - self.start def overlaps(self, other: "TimePeriod") -> bool: - # Checks if this event is overlapping partially with another event + """Checks if this event is overlapping partially or entirely with another event""" return self.start < other.start < self.end \ or self.start < other.end < self.end \ - or other.start < self.start and self.end < other.end + or other.start < self.start and self.end < other.end \ + or self == other def contains(self, other: Union[datetime, "TimePeriod"]) -> bool: - # Checks if this event contains the entirety of another event + """Checks if this event contains the entirety of another event""" if isinstance(other, TimePeriod): return self.start <= other.start and other.end <= self.end elif isinstance(other, datetime): @@ -31,7 +33,14 @@ def contains(self, other: Union[datetime, "TimePeriod"]) -> bool: def __contains__(self, other: Union[datetime, "TimePeriod"]) -> bool: return self.contains(other) + def __eq__(self, other: object) -> bool: + if isinstance(other, TimePeriod): + return self.start == other.start and self.end == other.end + else: + return False + def intersection(self, other: "TimePeriod") -> Optional["TimePeriod"]: + """Returns the timeperiod contained in both periods""" # https://stackoverflow.com/posts/3721426/revisions if self.contains(other): # Entirety of other is within self @@ -46,3 +55,21 @@ def intersection(self, other: "TimePeriod") -> Optional["TimePeriod"]: # Entirety of self is within other return self return None + + def adjacent(self, other: "TimePeriod") -> bool: + """Iff timeperiods are exactly next to each other, return True.""" + return self.start == other.end or self.end == other.start + + def gap(self, other: "TimePeriod") -> Optional["TimePeriod"]: + """If periods are separated by a non-zero gap, return the gap as a new timeperiod, else None""" + if not (self.overlaps(other) or self.adjacent(other)): + gap_start = min(self.end, other.end) + gap_end = max(self.start, other.start) + return TimePeriod(gap_start, gap_end) + return None + + def union(self, other: "TimePeriod") -> "TimePeriod": + if not self.gap(other): + return TimePeriod(min(self.start, other.start), max(self.end, other.end)) + else: + raise Exception("TimePeriods must be overlapping or adjacent to be unioned") diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index e0ead6fc..17782177 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -1,5 +1,5 @@ from .filter_keyvals import filter_keyvals, filter_keyvals_regex -from .filter_period_intersect import filter_period_intersect +from .filter_period_intersect import filter_period_intersect, period_union from .heartbeats import heartbeat_merge, heartbeat_reduce from .merge_events_by_keys import merge_events_by_keys from .chunk_events_by_key import chunk_events_by_key diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index e3dcbd56..269ab85e 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import List, Iterable, Tuple from copy import deepcopy from aw_core.models import Event @@ -22,6 +22,44 @@ def _replace_event_period(event: Event, period: TimePeriod) -> Event: return e +def _earliest_event(e1, e2): + if e1.timestamp <= e2.timestamp: + return e1 + else: + return e2 + + +def _concurrent_eventpairs(events1, events2) -> Iterable[Tuple[Event, Event]]: + e1_i = 0 + e2_i = 0 + while e1_i < len(events1) and e2_i < len(events2): + e1 = events1[e1_i] + e2 = events2[e2_i] + e1_p = _get_event_period(e1) + e2_p = _get_event_period(e2) + + ip = e1_p.intersection(e2_p) + if ip: + # If events intersected, add event with intersected duration and try next event + yield (e1, e2) + if e1_p.end <= e2_p.end: + e1_i += 1 + else: + e2_i += 1 + else: + # No intersection, check if event is before/after filterevent + if e1_p.end <= e2_p.start: + # Event ended before filter event started + e1_i += 1 + elif e2_p.end <= e1_p.start: + # Event started after filter event ended + e2_i += 1 + else: + logger.warning("Should be reachable, skipping period") + e1_i += 1 + e2_i += 1 + + def filter_period_intersect(events: List[Event], filterevents: List[Event]) -> List[Event]: """ Filters away all events or time periods of events in which a @@ -40,33 +78,32 @@ def filter_period_intersect(events: List[Event], filterevents: List[Event]) -> L filterevents = sorted(filterevents, key=lambda e: e.timestamp) filtered_events = [] - e_i = 0 - f_i = 0 - while e_i < len(events) and f_i < len(filterevents): - event = events[e_i] - filterevent = filterevents[f_i] - ep = _get_event_period(event) - fp = _get_event_period(filterevent) + for (e1, e2) in _concurrent_eventpairs(events, filterevents): + e1_p = _get_event_period(e1) + e2_p = _get_event_period(e2) - ip = ep.intersection(fp) + ip = e1_p.intersection(e2_p) if ip: - # If events itersected, add event with intersected duration and try next event - filtered_events.append(_replace_event_period(event, ip)) - if ep.end <= fp.end: - e_i += 1 - else: - f_i += 1 - else: - # No intersection, check if event is before/after filterevent - if ep.end <= fp.start: - # Event ended before filter event started - e_i += 1 - elif fp.end <= ep.start: - # Event started after filter event ended - f_i += 1 - else: - logger.warning("Unclear if/how this could be reachable, skipping period") - e_i += 1 - f_i += 1 + # If events intersected, add event with intersected duration + filtered_events.append(_replace_event_period(e1, ip)) return filtered_events + + +def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: + events = sorted(events1 + events2, key=lambda e: e.timestamp) + merged_events = [] + if events: + merged_events.append(events.pop(0)) + for e in events: + last_event = merged_events[-1] + + e_p = _get_event_period(e) + le_p = _get_event_period(last_event) + + if not e_p.gap(le_p): + new_period = e_p.union(le_p) + merged_events[-1] = _replace_event_period(last_event, new_period) + else: + merged_events.append(e) + return merged_events diff --git a/tests/test_timeperiod.py b/tests/test_timeperiod.py index 0967887b..61fc0f19 100644 --- a/tests/test_timeperiod.py +++ b/tests/test_timeperiod.py @@ -76,3 +76,41 @@ def test_overlaps(): tp2 = TimePeriod(now - timedelta(hours=1), now + timedelta(hours=1)) assert tp1.overlaps(tp2) assert tp2.overlaps(tp1) + + +def test_adjacent(): + now = datetime.now() + tp1 = TimePeriod(now - timedelta(hours=1), now) + tp2 = TimePeriod(now, now + timedelta(hours=1)) + assert tp1.adjacent(tp2) + assert tp2.adjacent(tp1) + + +def test_union(): + now = datetime.now() + + # adjacent but not overlapping + tp1 = TimePeriod(now - timedelta(hours=1), now) + tp2 = TimePeriod(now, now + timedelta(hours=1)) + tp_union = tp1.union(tp2) + assert tp1 in tp_union + assert tp2 in tp_union + + # union with self + tp_union = tp1.union(tp1) + assert tp1 == tp_union + + # overlapping + tp1 = TimePeriod(now - timedelta(hours=1), now + timedelta(minutes=30)) + tp2 = TimePeriod(now, now + timedelta(hours=1)) + tp_union = tp1.union(tp2) + assert tp1 in tp_union + assert tp2 in tp_union + + # tp2 contained in tp1 + tp1 = TimePeriod(now - timedelta(minutes=30), now + timedelta(minutes=30)) + tp2 = TimePeriod(now, now + timedelta(minutes=10)) + tp_union = tp1.union(tp2) + assert tp1 in tp_union + assert tp2 in tp_union + assert tp1 == tp_union diff --git a/tests/test_transforms.py b/tests/test_transforms.py index ede290ed..3837fcbc 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -5,6 +5,7 @@ filter_period_intersect, filter_keyvals_regex, filter_keyvals, + period_union, sort_by_timestamp, sort_by_duration, sum_durations, @@ -94,6 +95,28 @@ def test_filter_period_intersect(): assert filtered_events[1].duration == timedelta(minutes=15) +def test_period_union(): + now = datetime.now(timezone.utc) + + # Events overlapping + events1 = [Event(timestamp=now, duration=timedelta(seconds=10))] + events2 = [Event(timestamp=now + timedelta(seconds=9), duration=timedelta(seconds=10))] + unioned_events = period_union(events1, events2) + assert len(unioned_events) == 1 + + # Events adjacent but not overlapping + events1 = [Event(timestamp=now, duration=timedelta(seconds=10))] + events2 = [Event(timestamp=now + timedelta(seconds=10), duration=timedelta(seconds=10))] + unioned_events = period_union(events1, events2) + assert len(unioned_events) == 1 + + # Events not overlapping or adjacent + events1 = [Event(timestamp=now, duration=timedelta(seconds=10))] + events2 = [Event(timestamp=now + timedelta(seconds=11), duration=timedelta(seconds=10))] + unioned_events = period_union(events1, events2) + assert len(unioned_events) == 2 + + def test_sort_by_timestamp(): now = datetime.now(timezone.utc) events = [] From cede270f9be569e0b034edaa98e8469e8d0c2283 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Tue, 26 Jun 2018 12:02:16 +0200 Subject: [PATCH 2/9] added period_union to query functions --- aw_analysis/query2_functions.py | 8 ++++++++ aw_core/timeperiod.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/aw_analysis/query2_functions.py b/aw_analysis/query2_functions.py index 8c6ec76a..eec2120b 100644 --- a/aw_analysis/query2_functions.py +++ b/aw_analysis/query2_functions.py @@ -11,6 +11,7 @@ filter_period_intersect, filter_keyvals, filter_keyvals_regex, + period_union, merge_events_by_keys, chunk_events_by_key, sort_by_timestamp, @@ -153,6 +154,13 @@ def q2_filter_period_intersect(events: list, filterevents: list) -> List[Event]: return filter_period_intersect(events, filterevents) +@q2_function +def q2_period_union(events1: list, events2: list) -> List[Event]: + _verify_variable_is_type(events1, list) + _verify_variable_is_type(events2, list) + return period_union(events1, events2) + + @q2_function def q2_limit_events(events: list, count: int) -> List[Event]: _verify_variable_is_type(events, list) diff --git a/aw_core/timeperiod.py b/aw_core/timeperiod.py index 42bdb5a4..14b97383 100644 --- a/aw_core/timeperiod.py +++ b/aw_core/timeperiod.py @@ -72,4 +72,4 @@ def union(self, other: "TimePeriod") -> "TimePeriod": if not self.gap(other): return TimePeriod(min(self.start, other.start), max(self.end, other.end)) else: - raise Exception("TimePeriods must be overlapping or adjacent to be unioned") + raise Exception("TimePeriods must not have a gap if they are to be unioned") From 6baff8de212c21ffa32e0551207aac3f53f1b841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Tue, 26 Jun 2018 12:05:24 +0200 Subject: [PATCH 3/9] removed unused function --- aw_transform/filter_period_intersect.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index 269ab85e..7b656f4c 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -22,13 +22,6 @@ def _replace_event_period(event: Event, period: TimePeriod) -> Event: return e -def _earliest_event(e1, e2): - if e1.timestamp <= e2.timestamp: - return e1 - else: - return e2 - - def _concurrent_eventpairs(events1, events2) -> Iterable[Tuple[Event, Event]]: e1_i = 0 e2_i = 0 From 7638841c29040dd00a31daf3df54c711351d54df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Tue, 26 Jun 2018 12:09:34 +0200 Subject: [PATCH 4/9] fixed logging message --- aw_transform/filter_period_intersect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index 7b656f4c..d64c4b96 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -48,7 +48,7 @@ def _concurrent_eventpairs(events1, events2) -> Iterable[Tuple[Event, Event]]: # Event started after filter event ended e2_i += 1 else: - logger.warning("Should be reachable, skipping period") + logger.error("Should be unreachable, skipping period") e1_i += 1 e2_i += 1 From ae4fcfbc79e8f3b7ccea07826ce73ef5f9d451f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 27 Jun 2018 11:01:08 +0200 Subject: [PATCH 5/9] added __lt__ to Event and TimePeriod, cleaned up tests, improved documentation, improved typing --- aw_core/models.py | 29 +++++++++------ aw_core/timeperiod.py | 16 +++++++-- aw_transform/filter_period_intersect.py | 47 +++++++++++++++---------- tests/test_event.py | 29 +++++++++------ tests/test_timeperiod.py | 2 +- 5 files changed, 80 insertions(+), 43 deletions(-) diff --git a/aw_core/models.py b/aw_core/models.py index 3ccb9bce..e38e648d 100644 --- a/aw_core/models.py +++ b/aw_core/models.py @@ -53,10 +53,19 @@ def __init__(self, id: Id = None, timestamp: ConvertableTimestamp = None, self.duration = duration self.data = data - def __eq__(self, other): - return self.timestamp == other.timestamp\ - and self.duration == other.duration\ - and self.data == other.data + def __eq__(self, other: object) -> bool: + if isinstance(other, Event): + return self.timestamp == other.timestamp \ + and self.duration == other.duration \ + and self.data == other.data + else: + raise TypeError("operator not supported between instances of '{}' and '{}'".format(type(self), type(other))) + + def __lt__(self, other: object) -> bool: + if isinstance(other, Event): + return self.timestamp < other.timestamp + else: + raise TypeError("operator not supported between instances of '{}' and '{}'".format(type(self), type(other))) def to_json_dict(self) -> dict: """Useful when sending data over the wire. @@ -70,17 +79,17 @@ def to_json_str(self) -> str: data = self.to_json_dict() return json.dumps(data) - def _hasprop(self, propname): + def _hasprop(self, propname: str) -> bool: """Badly named, but basically checks if the underlying dict has a prop, and if it is a non-empty list""" return propname in self and self[propname] is not None @property - def id(self) -> Any: + def id(self) -> Id: return self["id"] if self._hasprop("id") else None @id.setter - def id(self, id: Any): + def id(self, id: Id) -> None: self["id"] = id @property @@ -88,7 +97,7 @@ def data(self) -> dict: return self["data"] if self._hasprop("data") else {} @data.setter - def data(self, data: dict): + def data(self, data: dict) -> None: self["data"] = data @property @@ -105,9 +114,9 @@ def duration(self) -> timedelta: @duration.setter def duration(self, duration: Duration) -> None: - if type(duration) == timedelta: + if isinstance(duration, timedelta): self["duration"] = duration elif isinstance(duration, numbers.Real): self["duration"] = timedelta(seconds=duration) # type: ignore else: - logger.error("Couldn't parse duration of invalid type {}".format(type(duration))) + raise TypeError("Couldn't parse duration of invalid type {}".format(type(duration))) diff --git a/aw_core/timeperiod.py b/aw_core/timeperiod.py index 14b97383..9cc0e165 100644 --- a/aw_core/timeperiod.py +++ b/aw_core/timeperiod.py @@ -15,20 +15,23 @@ def duration(self) -> timedelta: return self.end - self.start def overlaps(self, other: "TimePeriod") -> bool: - """Checks if this event is overlapping partially or entirely with another event""" + """Checks if this timeperiod is overlapping partially or entirely with another timeperiod""" return self.start < other.start < self.end \ or self.start < other.end < self.end \ or other.start < self.start and self.end < other.end \ or self == other + def intersects(self, other: "TimePeriod") -> bool: + return self.overlaps(other) + def contains(self, other: Union[datetime, "TimePeriod"]) -> bool: - """Checks if this event contains the entirety of another event""" + """Checks if this timeperiod contains the entirety of another timeperiod or a datetime""" if isinstance(other, TimePeriod): return self.start <= other.start and other.end <= self.end elif isinstance(other, datetime): return self.start <= other <= self.end else: - raise ValueError("argument of invalid type '{}'".format(type(other))) + raise TypeError("argument of invalid type '{}'".format(type(other))) def __contains__(self, other: Union[datetime, "TimePeriod"]) -> bool: return self.contains(other) @@ -39,6 +42,13 @@ def __eq__(self, other: object) -> bool: else: return False + def __lt__(self, other: object) -> bool: + # implemented to easily allow sorting of a list of timeperiods + if isinstance(other, TimePeriod): + return self.start < other.start + else: + raise TypeError("operator not supported between instaces of '{}' and '{}'".format(type(self), type(other))) + def intersection(self, other: "TimePeriod") -> Optional["TimePeriod"]: """Returns the timeperiod contained in both periods""" # https://stackoverflow.com/posts/3721426/revisions diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index d64c4b96..949fec1b 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -9,7 +9,6 @@ def _get_event_period(event: Event) -> TimePeriod: - # TODO: Better parsing of event duration start = event.timestamp end = start + event.duration return TimePeriod(start, end) @@ -22,7 +21,8 @@ def _replace_event_period(event: Event, period: TimePeriod) -> Event: return e -def _concurrent_eventpairs(events1, events2) -> Iterable[Tuple[Event, Event]]: +def _intersecting_eventpairs(events1: List[Event], events2: List[Event]) -> Iterable[Tuple[Event, Event, TimePeriod]]: + """A generator that yields each overlapping pair of events from two eventlists along with a TimePeriod of the intersection""" e1_i = 0 e2_i = 0 while e1_i < len(events1) and e2_i < len(events2): @@ -33,8 +33,8 @@ def _concurrent_eventpairs(events1, events2) -> Iterable[Tuple[Event, Event]]: ip = e1_p.intersection(e2_p) if ip: - # If events intersected, add event with intersected duration and try next event - yield (e1, e2) + # If events intersected, yield events + yield (e1, e2, ip) if e1_p.end <= e2_p.end: e1_i += 1 else: @@ -61,30 +61,37 @@ def filter_period_intersect(events: List[Event], filterevents: List[Event]) -> L Useful for example when you want to filter away events or part of events during which a user was AFK. - Example: + Usage: windowevents_notafk = filter_period_intersect(windowevents, notafkevents) + Example: + events1 | ======= ======== | + events2 | ------ --- --- ---- | + result | ==== = ==== | + A JavaScript version used to exist in aw-webui but was removed in `this PR `_. """ - events = sorted(events, key=lambda e: e.timestamp) - filterevents = sorted(filterevents, key=lambda e: e.timestamp) - filtered_events = [] + events = sorted(events) + filterevents = sorted(filterevents) - for (e1, e2) in _concurrent_eventpairs(events, filterevents): - e1_p = _get_event_period(e1) - e2_p = _get_event_period(e2) + return [_replace_event_period(e1, ip) for (e1, _, ip) in _intersecting_eventpairs(events, filterevents)] - ip = e1_p.intersection(e2_p) - if ip: - # If events intersected, add event with intersected duration - filtered_events.append(_replace_event_period(e1, ip)) - return filtered_events +def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: + """ + Takes a list of two events and returns a new list of events covering the union + of the timeperiods contained in the eventlists with no overlapping events. + WARNING: This function gives no guarantees about what will end up in the data + attribute of returned events, only use it when the event data is irrelevant. -def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: - events = sorted(events1 + events2, key=lambda e: e.timestamp) + Example: + events1 | ======= ========= | + events2 | ------ --- -- ---- | + result | ----------- -- ========= | + """ + events = sorted(events1 + events2) merged_events = [] if events: merged_events.append(events.pop(0)) @@ -100,3 +107,7 @@ def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: else: merged_events.append(e) return merged_events + + +def union(events1: List[Event], events2: List[Event]) -> List[Event]: + return sorted(events1 + events2) diff --git a/tests/test_event.py b/tests/test_event.py index b61a9f0c..1b53991d 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -1,20 +1,27 @@ from datetime import datetime, timedelta, timezone import json -import logging from aw_core.models import Event -import unittest - valid_timestamp="1937-01-01T12:00:27.87+00:20" +now = datetime.now(timezone.utc) +td1s = timedelta(seconds=1) + + +def test_create(): + Event(timestamp=now, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) + Event(timestamp=valid_timestamp, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) + + +def test_json_serialization(): + e = Event(timestamp=now, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) + assert e == Event(**json.loads(e.to_json_str())) -class EventTest(unittest.TestCase): - def test_create(self): - Event(timestamp=datetime.now(timezone.utc), duration=timedelta(hours=13, minutes=37), data={"key": "val"}) - def test_json_serialization(self): - e = Event(timestamp=datetime.now(timezone.utc), duration=timedelta(hours=13, minutes=37), data={"key": "val"}) - json_str = e.to_json_str() - logging.error(json_str) - assert e == Event(**json.loads(json_str)) +def test_sort(): + e1 = Event(timestamp=now) + e2 = Event(timestamp=now + td1s) + e_sorted = sorted([e2, e1]) + assert e_sorted[0] == e1 + assert e_sorted[1] == e2 diff --git a/tests/test_timeperiod.py b/tests/test_timeperiod.py index 61fc0f19..b0eab2e9 100644 --- a/tests/test_timeperiod.py +++ b/tests/test_timeperiod.py @@ -53,7 +53,7 @@ def test_contains(): assert tp2 in tp1 assert tp1 not in tp2 - with pytest.raises(ValueError): + with pytest.raises(TypeError): assert 0 in tp1 From f2ffe81cf1e64e485c53577473608a8a8ec74a30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 27 Jun 2018 11:31:53 +0200 Subject: [PATCH 6/9] renamed @lundibundi's period_union to union, cleared inconsistent data in my period_union --- aw_transform/__init__.py | 2 +- aw_transform/filter_period_intersect.py | 49 ++++++++++++++++++++++-- aw_transform/period_union.py | 50 ------------------------- tests/test_transforms.py | 10 ++--- 4 files changed, 52 insertions(+), 59 deletions(-) delete mode 100644 aw_transform/period_union.py diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index 17782177..d7e96d90 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -1,5 +1,5 @@ from .filter_keyvals import filter_keyvals, filter_keyvals_regex -from .filter_period_intersect import filter_period_intersect, period_union +from .filter_period_intersect import filter_period_intersect, period_union, union from .heartbeats import heartbeat_merge, heartbeat_reduce from .merge_events_by_keys import merge_events_by_keys from .chunk_events_by_key import chunk_events_by_key diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index 949fec1b..82a51f0b 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -83,8 +83,7 @@ def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: Takes a list of two events and returns a new list of events covering the union of the timeperiods contained in the eventlists with no overlapping events. - WARNING: This function gives no guarantees about what will end up in the data - attribute of returned events, only use it when the event data is irrelevant. + WARNING: This function strips all data from events as it cannot keep it consistent. Example: events1 | ======= ========= | @@ -106,8 +105,52 @@ def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: merged_events[-1] = _replace_event_period(last_event, new_period) else: merged_events.append(e) + for event in merged_events: + # Clear data + event.data = {} return merged_events def union(events1: List[Event], events2: List[Event]) -> List[Event]: - return sorted(events1 + events2) + """ + Concatenates and sorts union of 2 event lists and removes duplicates. + + Example that merges events from a backup-bucket with events from a "living" bucket: + events = union(events_backup, events_living) + """ + + events1 = sorted(events1, key=lambda e: (e.timestamp, e.duration)) + events2 = sorted(events2, key=lambda e: (e.timestamp, e.duration)) + events_union = [] + + e1_i = 0 + e2_i = 0 + while e1_i < len(events1) and e2_i < len(events2): + e1 = events1[e1_i] + e2 = events2[e2_i] + + if e1 == e2: + events_union.append(e1) + e1_i += 1 + e2_i += 1 + else: + if e1.timestamp < e2.timestamp: + events_union.append(e1) + e1_i += 1 + elif e1.timestamp > e2.timestamp: + events_union.append(e2) + e2_i += 1 + elif e1.duration < e2.duration: + events_union.append(e1) + e1_i += 1 + else: + events_union.append(e2) + e2_i += 1 + + if e1_i < len(events1): + events_union.extend(events1[e1_i:]) + + if e2_i < len(events2): + events_union.extend(events2[e2_i:]) + + return events_union diff --git a/aw_transform/period_union.py b/aw_transform/period_union.py deleted file mode 100644 index 11b54dd3..00000000 --- a/aw_transform/period_union.py +++ /dev/null @@ -1,50 +0,0 @@ -import logging -from typing import List - -from aw_core.models import Event - -logger = logging.getLogger(__name__) - - -def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: - """ - Creates union of 2 event lists removing duplicates - Example: - windowevents_and_notafk = period_union(windowevents, notafkevents) - """ - - events1 = sorted(events1, key=lambda e: (e.timestamp, e.duration)) - events2 = sorted(events2, key=lambda e: (e.timestamp, e.duration)) - events_union = [] - - e1_i = 0 - e2_i = 0 - while e1_i < len(events1) and e2_i < len(events2): - e1 = events1[e1_i] - e2 = events2[e2_i] - - if e1 == e2: - events_union.append(e1) - e1_i += 1 - e2_i += 1 - else: - if e1.timestamp < e2.timestamp: - events_union.append(e1) - e1_i += 1 - elif e1.timestamp > e2.timestamp: - events_union.append(e2) - e2_i += 1 - elif e1.duration < e2.duration: - events_union.append(e1) - e1_i += 1 - else: - events_union.append(e2) - e2_i += 1 - - if e1_i < len(events1): - events_union.extend(events1[e1_i:]) - - if e2_i < len(events2): - events_union.extend(events2[e2_i:]) - - return events_union diff --git a/tests/test_transforms.py b/tests/test_transforms.py index c3d7a696..9b0c87d4 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -13,7 +13,7 @@ chunk_events_by_key, split_url_events, simplify_string, - period_union + union, ) @@ -253,7 +253,7 @@ def test_url_parse_event(): assert result[0].data["identifier"] == "" -def test_period_union(): +def test_union(): now = datetime.now(timezone.utc) e1 = Event(timestamp=now - timedelta(seconds=20), duration=timedelta(seconds=5)) @@ -262,7 +262,7 @@ def test_period_union(): e4 = Event(timestamp=now + timedelta(seconds=20), duration=timedelta(seconds=1)) # union separate event lists with duplicates - events_union = period_union([e1, e2, e4], [e2, e3]) + events_union = union([e1, e2, e4], [e2, e3]) assert events_union == [e1, e2, e3, e4] e1 = Event(timestamp=now - timedelta(seconds=20), duration=timedelta(seconds=5)) @@ -272,7 +272,7 @@ def test_period_union(): e5 = Event(timestamp=now, duration=timedelta(seconds=10)) # union event lists with intersecting duplicates - events_union = period_union([e3, e2, e5], [e1, e3, e4, e5]) + events_union = union([e3, e2, e5], [e1, e3, e4, e5]) assert events_union == [e1, e2, e3, e4, e5] e1 = Event(timestamp=now - timedelta(seconds=30), duration=timedelta(seconds=15)) @@ -281,5 +281,5 @@ def test_period_union(): e4 = Event(timestamp=now, duration=timedelta(seconds=10)) # union event lists with same timestamp but different duration duplicates - events_union = period_union([e1, e2, e4], [e3, e2, e1]) + events_union = union([e1, e2, e4], [e3, e2, e1]) assert events_union == [e1, e2, e3, e4] From 0893935b1e9aeb7a833ecddf9f7e8c32294e9d28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 27 Jun 2018 11:41:35 +0200 Subject: [PATCH 7/9] fixed TimePeriod.overlaps --- aw_core/timeperiod.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/aw_core/timeperiod.py b/aw_core/timeperiod.py index 9cc0e165..777e09b1 100644 --- a/aw_core/timeperiod.py +++ b/aw_core/timeperiod.py @@ -6,7 +6,9 @@ class TimePeriod: # Inspired by: http://www.codeproject.com/Articles/168662/Time-Period-Library-for-NET # TODO: Move to its own package def __init__(self, start: datetime, end: datetime) -> None: - # assert start <= end + # TODO: Introduce once tested in production (where negative duration events might occur) + # if start > end: + # raise ValueError("TimePeriod cannot have negative duration, start '{}' came after end '{}'".format(start, end)) self.start = start self.end = end @@ -16,12 +18,12 @@ def duration(self) -> timedelta: def overlaps(self, other: "TimePeriod") -> bool: """Checks if this timeperiod is overlapping partially or entirely with another timeperiod""" - return self.start < other.start < self.end \ - or self.start < other.end < self.end \ - or other.start < self.start and self.end < other.end \ - or self == other + return self.start <= other.start < self.end \ + or self.start < other.end <= self.end \ + or self in other def intersects(self, other: "TimePeriod") -> bool: + """Alias for overlaps""" return self.overlaps(other) def contains(self, other: Union[datetime, "TimePeriod"]) -> bool: From 54abea3ce949d3c4da7b4d789e56096af20dc68f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 2 Jul 2018 11:20:56 +0200 Subject: [PATCH 8/9] made docstring less confusing --- aw_transform/filter_period_intersect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aw_transform/filter_period_intersect.py b/aw_transform/filter_period_intersect.py index 82a51f0b..ded0f8b3 100644 --- a/aw_transform/filter_period_intersect.py +++ b/aw_transform/filter_period_intersect.py @@ -86,9 +86,9 @@ def period_union(events1: List[Event], events2: List[Event]) -> List[Event]: WARNING: This function strips all data from events as it cannot keep it consistent. Example: - events1 | ======= ========= | + events1 | ------- --------- | events2 | ------ --- -- ---- | - result | ----------- -- ========= | + result | ----------- -- --------- | """ events = sorted(events1 + events2) merged_events = [] From ffd5653b20b46a2179b5718e26d66a0b3c024abb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 2 Jul 2018 11:21:13 +0200 Subject: [PATCH 9/9] simplified TimePeriod.gap --- aw_core/timeperiod.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/aw_core/timeperiod.py b/aw_core/timeperiod.py index 777e09b1..7da9ac48 100644 --- a/aw_core/timeperiod.py +++ b/aw_core/timeperiod.py @@ -74,11 +74,12 @@ def adjacent(self, other: "TimePeriod") -> bool: def gap(self, other: "TimePeriod") -> Optional["TimePeriod"]: """If periods are separated by a non-zero gap, return the gap as a new timeperiod, else None""" - if not (self.overlaps(other) or self.adjacent(other)): - gap_start = min(self.end, other.end) - gap_end = max(self.start, other.start) - return TimePeriod(gap_start, gap_end) - return None + if self.end < other.start: + return TimePeriod(self.end, other.start) + elif other.end < self.start: + return TimePeriod(other.end, self.start) + else: + return None def union(self, other: "TimePeriod") -> "TimePeriod": if not self.gap(other):