Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions aw_datastore/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
"""
last_event_list = self.get(1)
last_event = None
if len(last_event_list) > 0:
if last_event_list:
last_event = last_event_list[0]
"""

Expand All @@ -103,9 +103,9 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events)
#assert inserted
elif isinstance(events, list):
if len(events) > 0:
if events:
oldest_event = sorted(events, key=lambda k: k['timestamp'])[0]
else:
else: # pragma: no cover
oldest_event = None
for event in events:
if event.timestamp + event.duration > now:
Expand All @@ -117,7 +117,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
# Warn if timestamp is older than last event
"""
if last_event and oldest_event:
if oldest_event.timestamp < last_event.timestamp:
if oldest_event.timestamp < last_event.timestamp: # pragma: no cover
self.logger.warning("Inserting event that has a older timestamp than previous event!" +
"\nPrevious:" + str(last_event) +
"\nInserted:" + str(oldest_event))
Expand Down
4 changes: 2 additions & 2 deletions aw_datastore/storages/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def get_metadata(self, bucket_id: str) -> dict:

@abstractmethod
def get_events(self, bucket_id: str, limit: int,
starttime: Optional[datetime]=None, endtime: Optional[datetime]=None) -> List[Event]:
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None) -> List[Event]:
raise NotImplementedError

def get_eventcount(self, bucket_id: str,
starttime: Optional[datetime]=None, endtime: Optional[datetime]=None) -> int:
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None) -> int:
raise NotImplementedError

@abstractmethod
Expand Down
26 changes: 16 additions & 10 deletions aw_datastore/storages/memory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
import copy
from datetime import datetime
from typing import List, Dict

from aw_core.models import Event

Expand All @@ -13,7 +13,7 @@ class MemoryStorage(AbstractStorage):
"""For storage of data in-memory, useful primarily in testing"""
sid = "memory"

def __init__(self, testing):
def __init__(self, testing: bool) -> None:
Comment thread
ErikBjare marked this conversation as resolved.
self.logger = logger.getChild(self.sid)
# self.logger.warning("Using in-memory storage, any events stored will not be persistent and will be lost when server is shut down. Use the --storage parameter to set a different storage method.")
self.db = {} # type: Dict[str, List[Event]]
Expand All @@ -37,6 +37,8 @@ def delete_bucket(self, bucket_id: str) -> None:
del self.db[bucket_id]
if bucket_id in self._metadata:
del self._metadata[bucket_id]
else:
raise Exception("Bucket did not exist, could not delete")

def buckets(self):
buckets = dict()
Expand All @@ -45,7 +47,7 @@ def buckets(self):
return buckets

def get_events(self, bucket: str, limit: int,
starttime: datetime=None, endtime: datetime=None):
starttime: datetime=None, endtime: datetime=None) -> List[Event]:
events = self.db[bucket]
# Sort by timestamp
events = sorted(events, key=lambda k: k['timestamp'])[::-1]
Expand All @@ -72,23 +74,27 @@ def get_events(self, bucket: str, limit: int,
return copy.deepcopy(events)

def get_eventcount(self, bucket: str,
starttime: datetime=None, endtime: datetime=None):
return len(self.db[bucket])
starttime: datetime=None, endtime: datetime=None) -> int:
return len([e for e in self.db[bucket] if
(not starttime or starttime <= e.timestamp) and
(not endtime or e.timestamp <= endtime)])

def get_metadata(self, bucket_id: str):
return self._metadata[bucket_id]
if bucket_id in self._metadata:
return self._metadata[bucket_id]
else:
raise Exception('Bucket did not exist, could not get metadata')

def insert_one(self, bucket: str, event: Event) -> Event:
self.db[bucket].append(Event(**event))
event.id = len(self.db[bucket]) - 1
return event

def delete(self, bucket_id, event_id):
if len(self.db[bucket_id]) >= event_id:
self.db[bucket_id].pop(event_id)
for idx in (idx for idx, event in reversed(list(enumerate(self.db[bucket_id]))) if event.id == event_id):
self.db[bucket_id].pop(idx)
return True
else:
return False
return False

def replace(self, bucket_id, event_id, event):
self.db[bucket_id][event_id] = event
Expand Down
17 changes: 12 additions & 5 deletions aw_datastore/storages/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@ def create_bucket(self, bucket_id: str, type_id: str, client: str, hostname: str
self.db[bucket_id]["metadata"].insert_one(metadata)

def delete_bucket(self, bucket_id: str) -> None:
self.db[bucket_id]["events"].drop()
self.db[bucket_id]["metadata"].drop()
print(self.db.collection_names())
if bucket_id + ".metadata" in self.db.collection_names():
self.db[bucket_id]["events"].drop()
self.db[bucket_id]["metadata"].drop()
else:
# TODO: Create custom exception
raise Exception('Bucket did not exist, could not delete')

def buckets(self) -> Dict[str, dict]:
bucketnames = set()
Expand All @@ -63,7 +68,9 @@ def get_metadata(self, bucket_id: str) -> dict:
metadata = self.db[bucket_id]["metadata"].find_one({"_id": "metadata"})
if metadata:
del metadata["_id"]
return metadata
return metadata
else:
raise Exception('Bucket did not exist, could not get metadata')

def get_events(self, bucket_id: str, limit: int,
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None):
Expand Down Expand Up @@ -91,7 +98,7 @@ def get_events(self, bucket_id: str, limit: int,
return events

def get_eventcount(self, bucket_id: str,
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None):
starttime: datetime = None, endtime: datetime = None) -> int:
query_filter = {} # type: Dict[str, dict]
if starttime or endtime:
query_filter["timestamp"] = {}
Expand All @@ -102,7 +109,7 @@ def get_eventcount(self, bucket_id: str,
return self.db[bucket_id]["events"].find(query_filter).count()

def _transform_event(self, event: dict) -> dict:
if "duration" in event:
if "duration" in event: # pragma: no cover
event["duration"] = event["duration"].total_seconds()
return event

Expand Down
28 changes: 16 additions & 12 deletions aw_datastore/storages/peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import iso8601

from peewee import Model, CharField, IntegerField, DecimalField, DateTimeField, ForeignKeyField, PrimaryKeyField
from peewee import Model, CharField, IntegerField, DecimalField, DateTimeField, ForeignKeyField, AutoField
from playhouse.sqlite_ext import SqliteExtDatabase

from aw_core.models import Event
Expand Down Expand Up @@ -57,8 +57,8 @@ def json(self):


class EventModel(BaseModel):
id = PrimaryKeyField()
bucket = ForeignKeyField(BucketModel, related_name='events', index=True)
id = AutoField()
bucket = ForeignKeyField(BucketModel, backref='events', index=True)
timestamp = DateTimeField(index=True, default=datetime.now)
duration = DecimalField()
datastr = CharField()
Expand Down Expand Up @@ -92,10 +92,8 @@ def __init__(self, testing: bool = True, filepath: str = None) -> None:
self.db.connect()

self.bucket_keys = {} # type: Dict[str, int]
if not BucketModel.table_exists():
BucketModel.create_table()
if not EventModel.table_exists():
EventModel.create_table()
BucketModel.create_table(safe=True)
EventModel.create_table(safe=True)
self.update_bucket_keys()

def update_bucket_keys(self) -> None:
Expand All @@ -112,13 +110,19 @@ def create_bucket(self, bucket_id: str, type_id: str, client: str,
hostname=hostname, created=created, name=name)
self.update_bucket_keys()

def delete_bucket(self, bucket_id: str):
EventModel.delete().where(EventModel.bucket == self.bucket_keys[bucket_id]).execute()
BucketModel.delete().where(BucketModel.key == self.bucket_keys[bucket_id]).execute()
self.update_bucket_keys()
def delete_bucket(self, bucket_id: str) -> None:
if bucket_id in self.bucket_keys:
EventModel.delete().where(EventModel.bucket == self.bucket_keys[bucket_id]).execute()
BucketModel.delete().where(BucketModel.key == self.bucket_keys[bucket_id]).execute()
self.update_bucket_keys()
else:
raise Exception('Bucket did not exist, could not delete')

def get_metadata(self, bucket_id: str):
return BucketModel.get(BucketModel.key == self.bucket_keys[bucket_id]).json()
if bucket_id in self.bucket_keys:
return BucketModel.get(BucketModel.key == self.bucket_keys[bucket_id]).json()
else:
raise Exception('Bucket did not exist, could not get metadata')

def insert_one(self, bucket_id: str, event: Event) -> Event:
e = EventModel.from_event(self.bucket_keys[bucket_id], event)
Expand Down
80 changes: 43 additions & 37 deletions aw_datastore/storages/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
LATEST_VERSION=1

# The max integer value in SQLite is signed 8 Bytes / 64 bits
MAX_TIMESTAMP = 2**63-1
MAX_TIMESTAMP = 2**63 - 1

CREATE_BUCKETS_TABLE = """
CREATE TABLE IF NOT EXISTS buckets (
Expand Down Expand Up @@ -124,39 +124,43 @@ def buckets(self):

def create_bucket(self, bucket_id: str, type_id: str, client: str,
hostname: str, created: str, name: Optional[str] = None):
self.conn.execute("INSERT INTO buckets(id, name, type, client, hostname, created, datastr) " + \
self.conn.execute("INSERT INTO buckets(id, name, type, client, hostname, created, datastr) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)",
[bucket_id, name, type_id, client, hostname, created, str({})])
self.commit();
[bucket_id, name, type_id, client, hostname, created, str({})])
self.commit()
return self.get_metadata(bucket_id)

def delete_bucket(self, bucket_id: str):
self.conn.execute("DELETE FROM events WHERE bucketrow IN (SELECT rowid FROM buckets WHERE id = ?)", [bucket_id])
self.conn.execute("DELETE FROM buckets WHERE id = ?", [bucket_id])
cursor = self.conn.execute("DELETE FROM buckets WHERE id = ?", [bucket_id])
self.commit()
if cursor.rowcount != 1:
raise Exception('Bucket did not exist, could not delete')

def get_metadata(self, bucket_id: str):
c = self.conn.cursor()
res = c.execute("SELECT id, name, type, client, hostname, created FROM buckets WHERE id = ?", [bucket_id])
row = res.fetchone()
bucket = {
"id": row[0],
"name": row[1],
"type": row[2],
"client": row[3],
"hostname": row[4],
"created": row[5],
}
return bucket
if row is not None:
return {
"id": row[0],
"name": row[1],
"type": row[2],
"client": row[3],
"hostname": row[4],
"created": row[5],
}
else:
raise Exception('Bucket did not exist, could not get metadata')

def insert_one(self, bucket_id: str, event: Event) -> Event:
c = self.conn.cursor()
starttime = event.timestamp.timestamp() * 1000000
endtime = starttime + (event.duration.total_seconds() * 1000000)
datastr = json.dumps(event.data)
c.execute("INSERT INTO events(bucketrow, starttime, endtime, datastr) " + \
c.execute("INSERT INTO events(bucketrow, starttime, endtime, datastr) " +
"VALUES ((SELECT rowid FROM buckets WHERE id = ?), ?, ?, ?)",
[bucket_id, starttime, endtime, datastr])
[bucket_id, starttime, endtime, datastr])
event.id = c.lastrowid
self.conditional_commit(1)
return event
Expand All @@ -168,7 +172,7 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
# See: https://github.com/coleifer/peewee/issues/948
event_rows = []
for event in events:
starttime = event.timestamp.timestamp()*1000000
starttime = event.timestamp.timestamp() * 1000000
endtime = starttime + (event.duration.total_seconds() * 1000000)
datastr = json.dumps(event.data)
event_rows.append((bucket_id, starttime, endtime, datastr))
Expand All @@ -178,33 +182,35 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
self.conditional_commit(len(event_rows))

def replace_last(self, bucket_id, event):
starttime = event.timestamp.timestamp()*1000000
starttime = event.timestamp.timestamp() * 1000000
endtime = starttime + (event.duration.total_seconds() * 1000000)
datastr = json.dumps(event.data)
query = "UPDATE events " + \
"SET starttime = ?, endtime = ?, datastr = ? " + \
"WHERE id = (SELECT id FROM events WHERE endtime = " + \
"(SELECT max(endtime) FROM events WHERE bucketrow = " + \
"(SELECT rowid FROM buckets WHERE id = ?) LIMIT 1))"
query = """UPDATE events
SET starttime = ?, endtime = ?, datastr = ?
WHERE id = (
SELECT id FROM events WHERE endtime =
(SELECT max(endtime) FROM events WHERE bucketrow =
(SELECT rowid FROM buckets WHERE id = ?) LIMIT 1))"""
self.conn.execute(query, [starttime, endtime, datastr, bucket_id])
self.conditional_commit(1)
return True

def delete(self, bucket_id, event_id):
query = "DELETE FROM events " + \
"WHERE id = ? AND bucketrow = (SELECT b.rowid FROM buckets b WHERE b.id = ?)"
self.conn.execute(query, [event_id, bucket_id])
# TODO: Handle if event doesn't exist
return True
cursor = self.conn.execute(query, [event_id, bucket_id])
return cursor.rowcount == 1

def replace(self, bucket_id, event_id, event) -> bool:
starttime = event.timestamp.timestamp()*1000000
starttime = event.timestamp.timestamp() * 1000000
endtime = starttime + (event.duration.total_seconds() * 1000000)
datastr = json.dumps(event.data)
query = "UPDATE events " + \
"SET bucketrow = (SELECT rowid FROM buckets WHERE id = ?), " + \
"starttime = ?, endtime = ?, datastr = ? " + \
"WHERE id = ?"
query = """UPDATE events
SET bucketrow = (SELECT rowid FROM buckets WHERE id = ?),
starttime = ?,
endtime = ?,
datastr = ?
WHERE id = ?"""
self.conn.execute(query, [bucket_id, starttime, endtime, datastr, event_id])
self.conditional_commit(1)
return True
Expand All @@ -217,8 +223,8 @@ def get_events(self, bucket_id: str, limit: int,
limit = -1
self.commit()
c = self.conn.cursor()
starttime_i = starttime.timestamp()*1000000 if starttime else 0
endtime_i = endtime.timestamp()*1000000 if endtime else MAX_TIMESTAMP
starttime_i = starttime.timestamp() * 1000000 if starttime else 0
endtime_i = endtime.timestamp() * 1000000 if endtime else MAX_TIMESTAMP
query = "SELECT id, starttime, endtime, datastr " + \
"FROM events " + \
"WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?) " + \
Expand All @@ -228,8 +234,8 @@ def get_events(self, bucket_id: str, limit: int,
events = []
for row in rows:
eid = row[0]
starttime = datetime.fromtimestamp(row[1]/1000000, timezone.utc)
endtime = datetime.fromtimestamp(row[2]/1000000, timezone.utc)
starttime = datetime.fromtimestamp(row[1] / 1000000, timezone.utc)
endtime = datetime.fromtimestamp(row[2] / 1000000, timezone.utc)
duration = endtime - starttime
data = json.loads(row[3])
events.append(Event(id=eid, timestamp=starttime, duration=duration, data=data))
Expand All @@ -239,8 +245,8 @@ def get_eventcount(self, bucket_id: str,
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None):
self.commit()
c = self.conn.cursor()
starttime_i = starttime.timestamp()*1000000 if starttime else 0
endtime_i = endtime.timestamp()*1000000 if endtime else MAX_TIMESTAMP
starttime_i = starttime.timestamp() * 1000000 if starttime else 0
endtime_i = endtime.timestamp() * 1000000 if endtime else MAX_TIMESTAMP
query = "SELECT count(*) " + \
"FROM events " + \
"WHERE bucketrow = (SELECT rowid FROM buckets WHERE id = ?) " + \
Expand Down
Loading