diff --git a/aw_datastore/datastore.py b/aw_datastore/datastore.py index 85c2f2c0..fa9e68a7 100644 --- a/aw_datastore/datastore.py +++ b/aw_datastore/datastore.py @@ -87,7 +87,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]: """ last_event_list = self.get(1) last_event = None - if len(last_event_list) > 0: + if last_event_list: last_event = last_event_list[0] """ @@ -103,9 +103,9 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]: inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events) #assert inserted elif isinstance(events, list): - if len(events) > 0: + if events: oldest_event = sorted(events, key=lambda k: k['timestamp'])[0] - else: + else: # pragma: no cover oldest_event = None for event in events: if event.timestamp + event.duration > now: @@ -117,7 +117,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]: # Warn if timestamp is older than last event """ if last_event and oldest_event: - if oldest_event.timestamp < last_event.timestamp: + if oldest_event.timestamp < last_event.timestamp: # pragma: no cover self.logger.warning("Inserting event that has a older timestamp than previous event!" + "\nPrevious:" + str(last_event) + "\nInserted:" + str(oldest_event)) diff --git a/aw_datastore/storages/abstract.py b/aw_datastore/storages/abstract.py index 330ed15b..667add54 100644 --- a/aw_datastore/storages/abstract.py +++ b/aw_datastore/storages/abstract.py @@ -36,11 +36,11 @@ def get_metadata(self, bucket_id: str) -> dict: @abstractmethod def get_events(self, bucket_id: str, limit: int, - starttime: Optional[datetime]=None, endtime: Optional[datetime]=None) -> List[Event]: + starttime: Optional[datetime] = None, endtime: Optional[datetime] = None) -> List[Event]: raise NotImplementedError def get_eventcount(self, bucket_id: str, - starttime: Optional[datetime]=None, endtime: Optional[datetime]=None) -> int: + starttime: Optional[datetime] = None, endtime: Optional[datetime] = None) -> int: raise NotImplementedError @abstractmethod diff --git a/aw_datastore/storages/memory.py b/aw_datastore/storages/memory.py index 691c7257..c62ba131 100644 --- a/aw_datastore/storages/memory.py +++ b/aw_datastore/storages/memory.py @@ -1,7 +1,7 @@ -import logging import sys import copy from datetime import datetime +from typing import List, Dict from aw_core.models import Event @@ -13,7 +13,7 @@ class MemoryStorage(AbstractStorage): """For storage of data in-memory, useful primarily in testing""" sid = "memory" - def __init__(self, testing): + def __init__(self, testing: bool) -> None: self.logger = logger.getChild(self.sid) # self.logger.warning("Using in-memory storage, any events stored will not be persistent and will be lost when server is shut down. Use the --storage parameter to set a different storage method.") self.db = {} # type: Dict[str, List[Event]] @@ -37,6 +37,8 @@ def delete_bucket(self, bucket_id: str) -> None: del self.db[bucket_id] if bucket_id in self._metadata: del self._metadata[bucket_id] + else: + raise Exception("Bucket did not exist, could not delete") def buckets(self): buckets = dict() @@ -45,7 +47,7 @@ def buckets(self): return buckets def get_events(self, bucket: str, limit: int, - starttime: datetime=None, endtime: datetime=None): + starttime: datetime=None, endtime: datetime=None) -> List[Event]: events = self.db[bucket] # Sort by timestamp events = sorted(events, key=lambda k: k['timestamp'])[::-1] @@ -72,11 +74,16 @@ def get_events(self, bucket: str, limit: int, return copy.deepcopy(events) def get_eventcount(self, bucket: str, - starttime: datetime=None, endtime: datetime=None): - return len(self.db[bucket]) + starttime: datetime=None, endtime: datetime=None) -> int: + return len([e for e in self.db[bucket] if + (not starttime or starttime <= e.timestamp) and + (not endtime or e.timestamp <= endtime)]) def get_metadata(self, bucket_id: str): - return self._metadata[bucket_id] + if bucket_id in self._metadata: + return self._metadata[bucket_id] + else: + raise Exception('Bucket did not exist, could not get metadata') def insert_one(self, bucket: str, event: Event) -> Event: self.db[bucket].append(Event(**event)) @@ -84,11 +91,10 @@ def insert_one(self, bucket: str, event: Event) -> Event: return event def delete(self, bucket_id, event_id): - if len(self.db[bucket_id]) >= event_id: - self.db[bucket_id].pop(event_id) + for idx in (idx for idx, event in reversed(list(enumerate(self.db[bucket_id]))) if event.id == event_id): + self.db[bucket_id].pop(idx) return True - else: - return False + return False def replace(self, bucket_id, event_id, event): self.db[bucket_id][event_id] = event diff --git a/aw_datastore/storages/mongodb.py b/aw_datastore/storages/mongodb.py index 97b4039c..fe67db4d 100644 --- a/aw_datastore/storages/mongodb.py +++ b/aw_datastore/storages/mongodb.py @@ -46,8 +46,13 @@ def create_bucket(self, bucket_id: str, type_id: str, client: str, hostname: str self.db[bucket_id]["metadata"].insert_one(metadata) def delete_bucket(self, bucket_id: str) -> None: - self.db[bucket_id]["events"].drop() - self.db[bucket_id]["metadata"].drop() + print(self.db.collection_names()) + if bucket_id + ".metadata" in self.db.collection_names(): + self.db[bucket_id]["events"].drop() + self.db[bucket_id]["metadata"].drop() + else: + # TODO: Create custom exception + raise Exception('Bucket did not exist, could not delete') def buckets(self) -> Dict[str, dict]: bucketnames = set() @@ -63,7 +68,9 @@ def get_metadata(self, bucket_id: str) -> dict: metadata = self.db[bucket_id]["metadata"].find_one({"_id": "metadata"}) if metadata: del metadata["_id"] - return metadata + return metadata + else: + raise Exception('Bucket did not exist, could not get metadata') def get_events(self, bucket_id: str, limit: int, starttime: Optional[datetime] = None, endtime: Optional[datetime] = None): @@ -91,7 +98,7 @@ def get_events(self, bucket_id: str, limit: int, return events def get_eventcount(self, bucket_id: str, - starttime: Optional[datetime] = None, endtime: Optional[datetime] = None): + starttime: datetime = None, endtime: datetime = None) -> int: query_filter = {} # type: Dict[str, dict] if starttime or endtime: query_filter["timestamp"] = {} @@ -102,7 +109,7 @@ def get_eventcount(self, bucket_id: str, return self.db[bucket_id]["events"].find(query_filter).count() def _transform_event(self, event: dict) -> dict: - if "duration" in event: + if "duration" in event: # pragma: no cover event["duration"] = event["duration"].total_seconds() return event diff --git a/aw_datastore/storages/peewee.py b/aw_datastore/storages/peewee.py index 6b01c331..a177c3fc 100644 --- a/aw_datastore/storages/peewee.py +++ b/aw_datastore/storages/peewee.py @@ -5,7 +5,7 @@ import logging import iso8601 -from peewee import Model, CharField, IntegerField, DecimalField, DateTimeField, ForeignKeyField, PrimaryKeyField +from peewee import Model, CharField, IntegerField, DecimalField, DateTimeField, ForeignKeyField, AutoField from playhouse.sqlite_ext import SqliteExtDatabase from aw_core.models import Event @@ -57,8 +57,8 @@ def json(self): class EventModel(BaseModel): - id = PrimaryKeyField() - bucket = ForeignKeyField(BucketModel, related_name='events', index=True) + id = AutoField() + bucket = ForeignKeyField(BucketModel, backref='events', index=True) timestamp = DateTimeField(index=True, default=datetime.now) duration = DecimalField() datastr = CharField() @@ -92,10 +92,8 @@ def __init__(self, testing: bool = True, filepath: str = None) -> None: self.db.connect() self.bucket_keys = {} # type: Dict[str, int] - if not BucketModel.table_exists(): - BucketModel.create_table() - if not EventModel.table_exists(): - EventModel.create_table() + BucketModel.create_table(safe=True) + EventModel.create_table(safe=True) self.update_bucket_keys() def update_bucket_keys(self) -> None: @@ -112,13 +110,19 @@ def create_bucket(self, bucket_id: str, type_id: str, client: str, hostname=hostname, created=created, name=name) self.update_bucket_keys() - def delete_bucket(self, bucket_id: str): - EventModel.delete().where(EventModel.bucket == self.bucket_keys[bucket_id]).execute() - BucketModel.delete().where(BucketModel.key == self.bucket_keys[bucket_id]).execute() - self.update_bucket_keys() + def delete_bucket(self, bucket_id: str) -> None: + if bucket_id in self.bucket_keys: + EventModel.delete().where(EventModel.bucket == self.bucket_keys[bucket_id]).execute() + BucketModel.delete().where(BucketModel.key == self.bucket_keys[bucket_id]).execute() + self.update_bucket_keys() + else: + raise Exception('Bucket did not exist, could not delete') def get_metadata(self, bucket_id: str): - return BucketModel.get(BucketModel.key == self.bucket_keys[bucket_id]).json() + if bucket_id in self.bucket_keys: + return BucketModel.get(BucketModel.key == self.bucket_keys[bucket_id]).json() + else: + raise Exception('Bucket did not exist, could not get metadata') def insert_one(self, bucket_id: str, event: Event) -> Event: e = EventModel.from_event(self.bucket_keys[bucket_id], event) diff --git a/aw_datastore/storages/sqlite.py b/aw_datastore/storages/sqlite.py index d20d6bff..ea537575 100644 --- a/aw_datastore/storages/sqlite.py +++ b/aw_datastore/storages/sqlite.py @@ -16,7 +16,7 @@ LATEST_VERSION=1 # The max integer value in SQLite is signed 8 Bytes / 64 bits -MAX_TIMESTAMP = 2**63-1 +MAX_TIMESTAMP = 2**63 - 1 CREATE_BUCKETS_TABLE = """ CREATE TABLE IF NOT EXISTS buckets ( @@ -124,39 +124,43 @@ def buckets(self): def create_bucket(self, bucket_id: str, type_id: str, client: str, hostname: str, created: str, name: Optional[str] = None): - self.conn.execute("INSERT INTO buckets(id, name, type, client, hostname, created, datastr) " + \ + self.conn.execute("INSERT INTO buckets(id, name, type, client, hostname, created, datastr) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", - [bucket_id, name, type_id, client, hostname, created, str({})]) - self.commit(); + [bucket_id, name, type_id, client, hostname, created, str({})]) + self.commit() return self.get_metadata(bucket_id) def delete_bucket(self, bucket_id: str): self.conn.execute("DELETE FROM events WHERE bucketrow IN (SELECT rowid FROM buckets WHERE id = ?)", [bucket_id]) - self.conn.execute("DELETE FROM buckets WHERE id = ?", [bucket_id]) + cursor = self.conn.execute("DELETE FROM buckets WHERE id = ?", [bucket_id]) self.commit() + if cursor.rowcount != 1: + raise Exception('Bucket did not exist, could not delete') def get_metadata(self, bucket_id: str): c = self.conn.cursor() res = c.execute("SELECT id, name, type, client, hostname, created FROM buckets WHERE id = ?", [bucket_id]) row = res.fetchone() - bucket = { - "id": row[0], - "name": row[1], - "type": row[2], - "client": row[3], - "hostname": row[4], - "created": row[5], - } - return bucket + if row is not None: + return { + "id": row[0], + "name": row[1], + "type": row[2], + "client": row[3], + "hostname": row[4], + "created": row[5], + } + else: + raise Exception('Bucket did not exist, could not get metadata') def insert_one(self, bucket_id: str, event: Event) -> Event: c = self.conn.cursor() starttime = event.timestamp.timestamp() * 1000000 endtime = starttime + (event.duration.total_seconds() * 1000000) datastr = json.dumps(event.data) - c.execute("INSERT INTO events(bucketrow, starttime, endtime, datastr) " + \ + c.execute("INSERT INTO events(bucketrow, starttime, endtime, datastr) " + "VALUES ((SELECT rowid FROM buckets WHERE id = ?), ?, ?, ?)", - [bucket_id, starttime, endtime, datastr]) + [bucket_id, starttime, endtime, datastr]) event.id = c.lastrowid self.conditional_commit(1) return event @@ -168,7 +172,7 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None: # See: https://github.com/coleifer/peewee/issues/948 event_rows = [] for event in events: - starttime = event.timestamp.timestamp()*1000000 + starttime = event.timestamp.timestamp() * 1000000 endtime = starttime + (event.duration.total_seconds() * 1000000) datastr = json.dumps(event.data) event_rows.append((bucket_id, starttime, endtime, datastr)) @@ -178,14 +182,15 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None: self.conditional_commit(len(event_rows)) def replace_last(self, bucket_id, event): - starttime = event.timestamp.timestamp()*1000000 + starttime = event.timestamp.timestamp() * 1000000 endtime = starttime + (event.duration.total_seconds() * 1000000) datastr = json.dumps(event.data) - query = "UPDATE events " + \ - "SET starttime = ?, endtime = ?, datastr = ? " + \ - "WHERE id = (SELECT id FROM events WHERE endtime = " + \ - "(SELECT max(endtime) FROM events WHERE bucketrow = " + \ - "(SELECT rowid FROM buckets WHERE id = ?) LIMIT 1))" + query = """UPDATE events + SET starttime = ?, endtime = ?, datastr = ? + WHERE id = ( + SELECT id FROM events WHERE endtime = + (SELECT max(endtime) FROM events WHERE bucketrow = + (SELECT rowid FROM buckets WHERE id = ?) LIMIT 1))""" self.conn.execute(query, [starttime, endtime, datastr, bucket_id]) self.conditional_commit(1) return True @@ -193,18 +198,19 @@ def replace_last(self, bucket_id, event): def delete(self, bucket_id, event_id): query = "DELETE FROM events " + \ "WHERE id = ? AND bucketrow = (SELECT b.rowid FROM buckets b WHERE b.id = ?)" - self.conn.execute(query, [event_id, bucket_id]) - # TODO: Handle if event doesn't exist - return True + cursor = self.conn.execute(query, [event_id, bucket_id]) + return cursor.rowcount == 1 def replace(self, bucket_id, event_id, event) -> bool: - starttime = event.timestamp.timestamp()*1000000 + starttime = event.timestamp.timestamp() * 1000000 endtime = starttime + (event.duration.total_seconds() * 1000000) datastr = json.dumps(event.data) - query = "UPDATE events " + \ - "SET bucketrow = (SELECT rowid FROM buckets WHERE id = ?), " + \ - "starttime = ?, endtime = ?, datastr = ? " + \ - "WHERE id = ?" + query = """UPDATE events + SET bucketrow = (SELECT rowid FROM buckets WHERE id = ?), + starttime = ?, + endtime = ?, + datastr = ? + WHERE id = ?""" self.conn.execute(query, [bucket_id, starttime, endtime, datastr, event_id]) self.conditional_commit(1) return True @@ -217,8 +223,8 @@ def get_events(self, bucket_id: str, limit: int, limit = -1 self.commit() c = self.conn.cursor() - starttime_i = starttime.timestamp()*1000000 if starttime else 0 - endtime_i = endtime.timestamp()*1000000 if endtime else MAX_TIMESTAMP + starttime_i = starttime.timestamp() * 1000000 if starttime else 0 + endtime_i = endtime.timestamp() * 1000000 if endtime else MAX_TIMESTAMP query = "SELECT id, starttime, endtime, datastr " + \ "FROM events " + \ "WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?) " + \ @@ -228,8 +234,8 @@ def get_events(self, bucket_id: str, limit: int, events = [] for row in rows: eid = row[0] - starttime = datetime.fromtimestamp(row[1]/1000000, timezone.utc) - endtime = datetime.fromtimestamp(row[2]/1000000, timezone.utc) + starttime = datetime.fromtimestamp(row[1] / 1000000, timezone.utc) + endtime = datetime.fromtimestamp(row[2] / 1000000, timezone.utc) duration = endtime - starttime data = json.loads(row[3]) events.append(Event(id=eid, timestamp=starttime, duration=duration, data=data)) @@ -239,8 +245,8 @@ def get_eventcount(self, bucket_id: str, starttime: Optional[datetime] = None, endtime: Optional[datetime] = None): self.commit() c = self.conn.cursor() - starttime_i = starttime.timestamp()*1000000 if starttime else 0 - endtime_i = endtime.timestamp()*1000000 if endtime else MAX_TIMESTAMP + starttime_i = starttime.timestamp() * 1000000 if starttime else 0 + endtime_i = endtime.timestamp() * 1000000 if endtime else MAX_TIMESTAMP query = "SELECT count(*) " + \ "FROM events " + \ "WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?) " + \ diff --git a/aw_transform/merge_events_by_keys.py b/aw_transform/merge_events_by_keys.py index 20063621..3fb9fb5f 100644 --- a/aw_transform/merge_events_by_keys.py +++ b/aw_transform/merge_events_by_keys.py @@ -1,5 +1,5 @@ import logging -from typing import List, Dict +from typing import List, Dict, Tuple from aw_core.models import Event @@ -11,23 +11,23 @@ def merge_events_by_keys(events, keys) -> List[Event]: # Call recursively until all keys are consumed if len(keys) < 1: return events - merged_events = {} # type: Dict[str, Event] + merged_events = {} # type: Dict[Tuple, Event] for event in events: - summed_key = "" + composite_key = () # type: Tuple for key in keys: if key in event.data: - summed_key = summed_key + "." + event["data"][key] - if summed_key not in merged_events: - merged_events[summed_key] = Event( + composite_key = composite_key + (event["data"][key],) + if composite_key not in merged_events: + merged_events[composite_key] = Event( timestamp=event.timestamp, duration=event.duration, data={} ) for key in keys: if key in event.data: - merged_events[summed_key].data[key] = event.data[key] + merged_events[composite_key].data[key] = event.data[key] else: - merged_events[summed_key].duration += event.duration + merged_events[composite_key].duration += event.duration result = [] for key in merged_events: result.append(Event(**merged_events[key])) diff --git a/tests/test_datastore.py b/tests/test_datastore.py index 417eb11c..9be5221a 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -50,6 +50,16 @@ def test_create_bucket(datastore): assert bid not in datastore.buckets() +@pytest.mark.parametrize("datastore", param_datastore_objects()) +def test_delete_bucket(datastore): + bid = "test" + datastore.create_bucket(bucket_id=bid, type="test", client="test", hostname="test", name="test") + datastore.delete_bucket(bid) + assert bid not in datastore.buckets() + with pytest.raises(Exception): + datastore.delete_bucket(bid) + + @pytest.mark.parametrize("datastore", param_datastore_objects()) def test_nonexistant_bucket(datastore): """ @@ -114,8 +124,13 @@ def test_delete(bucket_cm): print(fetched_events[0]) assert num_events == len(fetched_events) + # Test deleting event assert bucket.delete(fetched_events[0]["id"]) + # Test deleting non-existant event + # FIXME: Doesn't work due to lazy evaluation in SqliteDatastore + # assert not bucket.delete(fetched_events[0]["id"]) + fetched_events = bucket.get(limit=-1) assert num_events - 1 == len(fetched_events) @@ -126,11 +141,11 @@ def test_insert_badtype(bucket_cm): Tests that you cannot insert non-event types into a bucket """ with bucket_cm as bucket: - l = len(bucket.get()) + bucket_len = len(bucket.get()) badevent = 1 with pytest.raises(TypeError): bucket.insert(badevent) - assert l == len(bucket.get()) + assert bucket_len == len(bucket.get()) @pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm()) @@ -326,6 +341,9 @@ def test_get_metadata(bucket_cm): assert 'id' in metadata assert 'name' in metadata assert 'type' in metadata + with pytest.raises(Exception): + bucket.metadata() + @pytest.mark.parametrize("bucket_cm", param_testing_buckets_cm()) def test_get_eventcount(bucket_cm): @@ -334,8 +352,13 @@ def test_get_eventcount(bucket_cm): """ with bucket_cm as bucket: print(bucket.ds.storage_strategy) - assert 0 == bucket.get_eventcount() - for i in range(5): + assert bucket.get_eventcount() == 0 + for _ in range(5): bucket.insert(Event(timestamp=now)) - assert 5 == bucket.get_eventcount() + assert bucket.get_eventcount() == 5 # TODO: Test with timestamps and start/endtime filtering + + bucket.insert(Event(timestamp=now + timedelta(seconds=5))) + assert bucket.get_eventcount(starttime=now - timedelta(seconds=1), endtime=now) == 5 + assert bucket.get_eventcount(endtime=now + timedelta(seconds=1)) == 5 + assert bucket.get_eventcount(starttime=now + timedelta(seconds=1)) == 1 diff --git a/tests/test_event.py b/tests/test_event.py index 1b53991d..81e8133e 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -1,6 +1,8 @@ from datetime import datetime, timedelta, timezone import json +import pytest + from aw_core.models import Event valid_timestamp="1937-01-01T12:00:27.87+00:20" @@ -9,17 +11,23 @@ td1s = timedelta(seconds=1) -def test_create(): +def test_create() -> None: Event(timestamp=now, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) Event(timestamp=valid_timestamp, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) -def test_json_serialization(): +def test_json_serialization() -> None: e = Event(timestamp=now, duration=timedelta(hours=13, minutes=37), data={"key": "val"}) assert e == Event(**json.loads(e.to_json_str())) -def test_sort(): +def test_set_invalid_duration() -> None: + e = Event() + with pytest.raises(TypeError): + e.duration = "12" # type: ignore + + +def test_sort() -> None: e1 = Event(timestamp=now) e2 = Event(timestamp=now + td1s) e_sorted = sorted([e2, e1]) diff --git a/tests/test_heartbeat.py b/tests/test_heartbeat.py index 71b7d8af..f17546a2 100644 --- a/tests/test_heartbeat.py +++ b/tests/test_heartbeat.py @@ -39,6 +39,9 @@ def test_heartbeat_reduce(): now = datetime.now() td_1s = timedelta(seconds=1) + # Check that empty list works + assert not heartbeat_reduce([], pulsetime=1) + events = [Event(timestamp=now, data={"label": "test"}), Event(timestamp=now + td_1s, data={"label": "test"})] reduced_events = heartbeat_reduce(events, pulsetime=2) assert len(reduced_events) == 1 diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 9b0c87d4..f04dc184 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -65,9 +65,9 @@ def test_filter_period_intersect(): # Filter 1h event with another 1h event at a 30min offset to_filter = [Event(timestamp=now, duration=td1h)] - filter_with = [Event(timestamp=now + timedelta(minutes=30), duration=td1h)] + filter_with = [Event(timestamp=now + td30min, duration=td1h)] filtered_events = filter_period_intersect(to_filter, filter_with) - assert filtered_events[0].duration == timedelta(minutes=30) + assert filtered_events[0].duration == td30min # Filter 2x 30min events with a 15min gap with another 45min event in between intersecting both to_filter = [ @@ -152,8 +152,15 @@ def test_merge_events_by_keys_1(): e2_data = {"label": "b"} e1 = Event(data=e1_data, timestamp=now, duration=timedelta(seconds=1)) e2 = Event(data=e2_data, timestamp=now, duration=timedelta(seconds=1)) - events = events + [e1]*10 - events = events + [e2]*5 + events = events + [e1] * 10 + events = events + [e2] * 5 + + # Check that an empty key list has no effect + assert merge_events_by_keys(events, []) == events + + # Check that trying to merge on unavailable key has no effect + assert len(merge_events_by_keys(events, ["unknown"])) == 1 + result = merge_events_by_keys(events, ["label"]) result = sort_by_duration(result) print(result) @@ -172,9 +179,9 @@ def test_merge_events_by_keys_2(): e1 = Event(data=e1_data, timestamp=now, duration=timedelta(seconds=1)) e2 = Event(data=e2_data, timestamp=now, duration=timedelta(seconds=1)) e3 = Event(data=e3_data, timestamp=now, duration=timedelta(seconds=1)) - events = events + [e1]*10 - events = events + [e2]*9 - events = events + [e3]*8 + events = events + [e1] * 10 + events = events + [e2] * 9 + events = events + [e3] * 8 result = merge_events_by_keys(events, ["k1", "k2"]) result = sort_by_duration(result) print(result)