Skip to content
8 changes: 8 additions & 0 deletions aw_analysis/query2_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
filter_period_intersect,
filter_keyvals,
filter_keyvals_regex,
period_union,
merge_events_by_keys,
chunk_events_by_key,
sort_by_timestamp,
Expand Down Expand Up @@ -153,6 +154,13 @@ def q2_filter_period_intersect(events: list, filterevents: list) -> List[Event]:
return filter_period_intersect(events, filterevents)


@q2_function
def q2_period_union(events1: list, events2: list) -> List[Event]:
_verify_variable_is_type(events1, list)
_verify_variable_is_type(events2, list)
return period_union(events1, events2)


@q2_function
def q2_limit_events(events: list, count: int) -> List[Event]:
_verify_variable_is_type(events, list)
Expand Down
29 changes: 19 additions & 10 deletions aw_core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,19 @@ def __init__(self, id: Id = None, timestamp: ConvertableTimestamp = None,
self.duration = duration
self.data = data

def __eq__(self, other):
return self.timestamp == other.timestamp\
and self.duration == other.duration\
and self.data == other.data
def __eq__(self, other: object) -> bool:
if isinstance(other, Event):
return self.timestamp == other.timestamp \
and self.duration == other.duration \
and self.data == other.data
else:
raise TypeError("operator not supported between instances of '{}' and '{}'".format(type(self), type(other)))

def __lt__(self, other: object) -> bool:
if isinstance(other, Event):
return self.timestamp < other.timestamp
else:
raise TypeError("operator not supported between instances of '{}' and '{}'".format(type(self), type(other)))

def to_json_dict(self) -> dict:
"""Useful when sending data over the wire.
Expand All @@ -70,25 +79,25 @@ def to_json_str(self) -> str:
data = self.to_json_dict()
return json.dumps(data)

def _hasprop(self, propname):
def _hasprop(self, propname: str) -> bool:
"""Badly named, but basically checks if the underlying
dict has a prop, and if it is a non-empty list"""
return propname in self and self[propname] is not None

@property
def id(self) -> Any:
def id(self) -> Id:
return self["id"] if self._hasprop("id") else None

@id.setter
def id(self, id: Any):
def id(self, id: Id) -> None:
self["id"] = id

@property
def data(self) -> dict:
return self["data"] if self._hasprop("data") else {}

@data.setter
def data(self, data: dict):
def data(self, data: dict) -> None:
self["data"] = data

@property
Expand All @@ -105,9 +114,9 @@ def duration(self) -> timedelta:

@duration.setter
def duration(self, duration: Duration) -> None:
if type(duration) == timedelta:
if isinstance(duration, timedelta):
self["duration"] = duration
elif isinstance(duration, numbers.Real):
self["duration"] = timedelta(seconds=duration) # type: ignore
else:
logger.error("Couldn't parse duration of invalid type {}".format(type(duration)))
raise TypeError("Couldn't parse duration of invalid type {}".format(type(duration)))
52 changes: 46 additions & 6 deletions aw_core/timeperiod.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ class TimePeriod:
# Inspired by: http://www.codeproject.com/Articles/168662/Time-Period-Library-for-NET
# TODO: Move to its own package
def __init__(self, start: datetime, end: datetime) -> None:
# TODO: Introduce once tested in production (where negative duration events might occur)
# if start > end:
# raise ValueError("TimePeriod cannot have negative duration, start '{}' came after end '{}'".format(start, end))
self.start = start
self.end = end

Expand All @@ -14,24 +17,42 @@ def duration(self) -> timedelta:
return self.end - self.start

def overlaps(self, other: "TimePeriod") -> bool:
# Checks if this event is overlapping partially with another event
return self.start < other.start < self.end \
or self.start < other.end < self.end \
or other.start < self.start and self.end < other.end
"""Checks if this timeperiod is overlapping partially or entirely with another timeperiod"""
return self.start <= other.start < self.end \
or self.start < other.end <= self.end \
or self in other

def intersects(self, other: "TimePeriod") -> bool:
"""Alias for overlaps"""
return self.overlaps(other)

def contains(self, other: Union[datetime, "TimePeriod"]) -> bool:
# Checks if this event contains the entirety of another event
"""Checks if this timeperiod contains the entirety of another timeperiod or a datetime"""
if isinstance(other, TimePeriod):
return self.start <= other.start and other.end <= self.end
elif isinstance(other, datetime):
return self.start <= other <= self.end
else:
raise ValueError("argument of invalid type '{}'".format(type(other)))
raise TypeError("argument of invalid type '{}'".format(type(other)))

def __contains__(self, other: Union[datetime, "TimePeriod"]) -> bool:
return self.contains(other)

def __eq__(self, other: object) -> bool:
if isinstance(other, TimePeriod):
return self.start == other.start and self.end == other.end
else:
return False

def __lt__(self, other: object) -> bool:
# implemented to easily allow sorting of a list of timeperiods
if isinstance(other, TimePeriod):
return self.start < other.start
else:
raise TypeError("operator not supported between instaces of '{}' and '{}'".format(type(self), type(other)))

def intersection(self, other: "TimePeriod") -> Optional["TimePeriod"]:
"""Returns the timeperiod contained in both periods"""
# https://stackoverflow.com/posts/3721426/revisions
if self.contains(other):
# Entirety of other is within self
Expand All @@ -46,3 +67,22 @@ def intersection(self, other: "TimePeriod") -> Optional["TimePeriod"]:
# Entirety of self is within other
return self
return None

def adjacent(self, other: "TimePeriod") -> bool:
"""Iff timeperiods are exactly next to each other, return True."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo 'Iff'.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return self.start == other.end or self.end == other.start

def gap(self, other: "TimePeriod") -> Optional["TimePeriod"]:
"""If periods are separated by a non-zero gap, return the gap as a new timeperiod, else None"""
if self.end < other.start:
return TimePeriod(self.end, other.start)
elif other.end < self.start:
return TimePeriod(other.end, self.start)
else:
return None

def union(self, other: "TimePeriod") -> "TimePeriod":
if not self.gap(other):
return TimePeriod(min(self.start, other.start), max(self.end, other.end))
else:
raise Exception("TimePeriods must not have a gap if they are to be unioned")
3 changes: 1 addition & 2 deletions aw_transform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .filter_keyvals import filter_keyvals, filter_keyvals_regex
from .filter_period_intersect import filter_period_intersect
from .period_union import period_union
from .filter_period_intersect import filter_period_intersect, period_union, union
from .heartbeats import heartbeat_merge, heartbeat_reduce
from .merge_events_by_keys import merge_events_by_keys
from .chunk_events_by_key import chunk_events_by_key
Expand Down
148 changes: 116 additions & 32 deletions aw_transform/filter_period_intersect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List
from typing import List, Iterable, Tuple
from copy import deepcopy

from aw_core.models import Event
Expand All @@ -9,7 +9,6 @@


def _get_event_period(event: Event) -> TimePeriod:
# TODO: Better parsing of event duration
start = event.timestamp
end = start + event.duration
return TimePeriod(start, end)
Expand All @@ -22,6 +21,38 @@ def _replace_event_period(event: Event, period: TimePeriod) -> Event:
return e


def _intersecting_eventpairs(events1: List[Event], events2: List[Event]) -> Iterable[Tuple[Event, Event, TimePeriod]]:
"""A generator that yields each overlapping pair of events from two eventlists along with a TimePeriod of the intersection"""
e1_i = 0
e2_i = 0
while e1_i < len(events1) and e2_i < len(events2):
e1 = events1[e1_i]
e2 = events2[e2_i]
e1_p = _get_event_period(e1)
e2_p = _get_event_period(e2)

ip = e1_p.intersection(e2_p)
if ip:
# If events intersected, yield events
yield (e1, e2, ip)
if e1_p.end <= e2_p.end:
e1_i += 1
else:
e2_i += 1
else:
# No intersection, check if event is before/after filterevent
if e1_p.end <= e2_p.start:
# Event ended before filter event started
e1_i += 1
elif e2_p.end <= e1_p.start:
# Event started after filter event ended
e2_i += 1
else:
logger.error("Should be unreachable, skipping period")
e1_i += 1
e2_i += 1


def filter_period_intersect(events: List[Event], filterevents: List[Event]) -> List[Event]:
"""
Filters away all events or time periods of events in which a
Expand All @@ -30,43 +61,96 @@ def filter_period_intersect(events: List[Event], filterevents: List[Event]) -> L
Useful for example when you want to filter away events or
part of events during which a user was AFK.

Example:
Usage:
windowevents_notafk = filter_period_intersect(windowevents, notafkevents)

Example:
events1 | ======= ======== |
events2 | ------ --- --- ---- |
result | ==== = ==== |

A JavaScript version used to exist in aw-webui but was removed in `this PR <https://github.com/ActivityWatch/aw-webui/pull/48>`_.
"""

events = sorted(events, key=lambda e: e.timestamp)
filterevents = sorted(filterevents, key=lambda e: e.timestamp)
filtered_events = []
events = sorted(events)
filterevents = sorted(filterevents)

e_i = 0
f_i = 0
while e_i < len(events) and f_i < len(filterevents):
event = events[e_i]
filterevent = filterevents[f_i]
ep = _get_event_period(event)
fp = _get_event_period(filterevent)
return [_replace_event_period(e1, ip) for (e1, _, ip) in _intersecting_eventpairs(events, filterevents)]

ip = ep.intersection(fp)
if ip:
# If events itersected, add event with intersected duration and try next event
filtered_events.append(_replace_event_period(event, ip))
if ep.end <= fp.end:
e_i += 1
else:
f_i += 1

def period_union(events1: List[Event], events2: List[Event]) -> List[Event]:
"""
Takes a list of two events and returns a new list of events covering the union
of the timeperiods contained in the eventlists with no overlapping events.

WARNING: This function strips all data from events as it cannot keep it consistent.

Example:
events1 | ------- --------- |
events2 | ------ --- -- ---- |
result | ----------- -- --------- |
"""
events = sorted(events1 + events2)
merged_events = []
if events:
merged_events.append(events.pop(0))
for e in events:
last_event = merged_events[-1]

e_p = _get_event_period(e)
le_p = _get_event_period(last_event)

if not e_p.gap(le_p):
new_period = e_p.union(le_p)
merged_events[-1] = _replace_event_period(last_event, new_period)
else:
# No intersection, check if event is before/after filterevent
if ep.end <= fp.start:
# Event ended before filter event started
e_i += 1
elif fp.end <= ep.start:
# Event started after filter event ended
f_i += 1
merged_events.append(e)
for event in merged_events:
# Clear data
event.data = {}
return merged_events


def union(events1: List[Event], events2: List[Event]) -> List[Event]:
"""
Concatenates and sorts union of 2 event lists and removes duplicates.

Example that merges events from a backup-bucket with events from a "living" bucket:
events = union(events_backup, events_living)
"""

events1 = sorted(events1, key=lambda e: (e.timestamp, e.duration))
events2 = sorted(events2, key=lambda e: (e.timestamp, e.duration))
events_union = []

e1_i = 0
e2_i = 0
while e1_i < len(events1) and e2_i < len(events2):
e1 = events1[e1_i]
e2 = events2[e2_i]

if e1 == e2:
events_union.append(e1)
e1_i += 1
e2_i += 1
else:
if e1.timestamp < e2.timestamp:
events_union.append(e1)
e1_i += 1
elif e1.timestamp > e2.timestamp:
events_union.append(e2)
e2_i += 1
elif e1.duration < e2.duration:
events_union.append(e1)
e1_i += 1
else:
logger.warning("Unclear if/how this could be reachable, skipping period")
e_i += 1
f_i += 1
events_union.append(e2)
e2_i += 1

if e1_i < len(events1):
events_union.extend(events1[e1_i:])

if e2_i < len(events2):
events_union.extend(events2[e2_i:])

return filtered_events
return events_union
Loading