Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ language: python
python:
# - "3.3"
# - "3.4"
- "3.5"
# - "3.5"
- "3.6"

services:
Expand Down
8 changes: 2 additions & 6 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@ environment:
# a later point release.
# See: http://www.appveyor.com/docs/installed-software#python

- PYTHON: "C:\\Python35"
PYTHON_VERSION: "3.5.x" # currently 3.5.2
- PYTHON: "C:\\Python36"
PYTHON_VERSION: "3.6.x" # currently 3.6.6
PYTHON_ARCH: "32"

- PYTHON: "C:\\Python35-x64"
PYTHON_VERSION: "3.5.x" # currently 3.5.2
PYTHON_ARCH: "64"

install:
# If there is a newer build queued for the same PR, cancel this one.
# The AppVeyor 'rollout builds' option is supposed to serve the same
Expand Down
4 changes: 2 additions & 2 deletions aw_datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
# like ellipsises. See here: https://github.com/python/typing/issues/259
def get_storage_methods() -> Dict[str, Callable[[Any], storages.AbstractStorage]]:
from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage, SqliteStorage
methods = {
methods: Dict[str, Callable[[Any], storages.AbstractStorage]] = {
PeeweeStorage.sid: PeeweeStorage,
MemoryStorage.sid: MemoryStorage,
SqliteStorage.sid: SqliteStorage,
} # type: Dict[str, Callable[[Any], storages.AbstractStorage]]
}

# MongoDB is not supported on Windows or macOS
if _platform.system() == "Linux": # pragma: no branch
Expand Down
6 changes: 3 additions & 3 deletions aw_datastore/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class Datastore:
def __init__(self, storage_strategy: Callable[..., AbstractStorage], testing=False) -> None:
self.logger = logger.getChild("Datastore")
self.bucket_instances = dict() # type: Dict[str, Bucket]
self.bucket_instances: Dict[str, Bucket] = dict()

self.storage_strategy = storage_strategy(testing=testing)

Expand Down Expand Up @@ -93,11 +93,11 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:

now = datetime.now(tz=timezone.utc)

inserted = None # type: Optional[Event]
inserted: Optional[Event] = None

# Call insert
if isinstance(events, Event):
oldest_event = events # type: Optional[Event]
oldest_event: Optional[Event] = events
if events.timestamp + events.duration > now:
self.logger.warning("Event inserted into bucket {} reaches into the future. Current UTC time: {}. Event data: {}".format(self.bucket_id, str(now), str(events)))
inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events)
Expand Down
2 changes: 1 addition & 1 deletion aw_datastore/storages/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging as _logging

logger = _logging.getLogger(__name__) # type: _logging.Logger
logger: _logging.Logger = _logging.getLogger(__name__)

from .abstract import AbstractStorage

Expand Down
4 changes: 2 additions & 2 deletions aw_datastore/storages/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class MemoryStorage(AbstractStorage):
def __init__(self, testing: bool) -> None:
self.logger = logger.getChild(self.sid)
# self.logger.warning("Using in-memory storage, any events stored will not be persistent and will be lost when server is shut down. Use the --storage parameter to set a different storage method.")
self.db = {} # type: Dict[str, List[Event]]
self._metadata = dict() # type: Dict[str, dict]
self.db: Dict[str, List[Event]] = {}
self._metadata: Dict[str, dict] = dict()

def create_bucket(self, bucket_id, type_id, client, hostname, created, name=None) -> None:
if not name:
Expand Down
6 changes: 3 additions & 3 deletions aw_datastore/storages/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def get_metadata(self, bucket_id: str) -> dict:

def get_events(self, bucket_id: str, limit: int,
starttime: Optional[datetime] = None, endtime: Optional[datetime] = None):
query_filter = {} # type: Dict[str, dict]
query_filter: Dict[str, dict] = {}
if starttime or endtime:
query_filter["timestamp"] = {}
if starttime:
Expand All @@ -99,7 +99,7 @@ def get_events(self, bucket_id: str, limit: int,

def get_eventcount(self, bucket_id: str,
starttime: datetime = None, endtime: datetime = None) -> int:
query_filter = {} # type: Dict[str, dict]
query_filter: Dict[str, dict] = {}
if starttime or endtime:
query_filter["timestamp"] = {}
if starttime:
Expand All @@ -123,7 +123,7 @@ def insert_one(self, bucket: str, event: Event) -> Event:

def insert_many(self, bucket: str, events: List[Event]):
# .copy is needed because otherwise mongodb inserts a _id field into the event
dict_events = [self._transform_event(event.copy()) for event in events] # type: List[dict]
dict_events: List[dict] = [self._transform_event(event.copy()) for event in events]
self.db[bucket]["events"].insert_many(dict_events)

def delete(self, bucket_id: str, event_id) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion aw_datastore/storages/peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(self, testing: bool = True, filepath: str = None) -> None:

self.db.connect()

self.bucket_keys = {} # type: Dict[str, int]
self.bucket_keys: Dict[str, int] = {}
BucketModel.create_table(safe=True)
EventModel.create_table(safe=True)
self.update_bucket_keys()
Expand Down
27 changes: 25 additions & 2 deletions aw_query/functions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import iso8601
from typing import Callable, Dict, Any, List
from inspect import signature
Expand All @@ -12,6 +13,9 @@
filter_keyvals,
filter_keyvals_regex,
period_union,
categorize,
tag,
Rule,
merge_events_by_keys,
chunk_events_by_key,
sort_by_timestamp,
Expand All @@ -21,7 +25,7 @@
split_url_events,
simplify_string,
flood,
limit_events
limit_events,
)

from .exceptions import QueryFunctionException
Expand All @@ -48,7 +52,7 @@ def _verify_variable_is_type(variable, t):
"""
Declarations
"""
functions = {} # type: Dict[str, TQueryFunction]
functions: Dict[str, TQueryFunction] = {}


def q2_function(transform_func=None):
Expand Down Expand Up @@ -279,3 +283,22 @@ def q2_simplify_window_titles(events: list, key: str) -> List[Event]:
def q2_nop():
"""No operation function for unittesting"""
return 1


"""
Classify
"""


@q2_function(categorize)
@q2_typecheck
def q2_categorize(events: list, classes: list):
classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes]
return categorize(events, classes)


@q2_function(tag)
@q2_typecheck
def q2_tag(events: list, classes: list):
classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes]
return tag(events, classes)
14 changes: 8 additions & 6 deletions aw_query/query2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Union, List, Dict, Sequence, Callable, Type, Any
from typing import Union, List, Dict, Sequence, Callable, Type, Any, Tuple
from datetime import datetime

from aw_core.models import Event
Expand All @@ -20,7 +20,7 @@ def parse(string: str, namespace: dict):
raise NotImplementedError

@staticmethod
def check(string: str):
def check(string: str) -> Tuple[str, str]:
raise NotImplementedError


Expand Down Expand Up @@ -207,7 +207,7 @@ def interpret(self, datastore: Datastore, namespace: dict):
@staticmethod
def parse(string: str, namespace: dict) -> QToken:
entries_str = string[1:-1]
d = {} # type: Dict[str, QToken]
d: Dict[str, QToken] = {}
while len(entries_str) > 0:
entries_str = entries_str.strip()
if len(d) > 0 and entries_str[0] == ",":
Expand Down Expand Up @@ -272,7 +272,7 @@ def interpret(self, datastore: Datastore, namespace: dict):
@staticmethod
def parse(string: str, namespace: dict) -> QToken:
entries_str = string[1:-1]
l = [] # type: List[QToken]
l: List[QToken] = []
while len(entries_str) > 0:
entries_str = entries_str.strip()
if len(l) > 0 and entries_str[0] == ",":
Expand Down Expand Up @@ -314,8 +314,10 @@ def check(string: str):
return string[:i], string[i + 1:]


qtypes = [QString, QInteger, QFunction, QDict, QList, QVariable] # type: Sequence[Type[QToken]]
def _parse_token(string: str, namespace: dict): # TODO: Add return type
qtypes: Sequence[Type[QToken]] = [QString, QInteger, QFunction, QDict, QList, QVariable]


def _parse_token(string: str, namespace: dict) -> Tuple[Tuple[Any, str], str]:
# TODO: The whole parsing thing is shoddily written, needs a rewrite from ground-up
if not isinstance(string, str):
raise QueryParseException("Reached unreachable, cannot parse something that isn't a string")
Expand Down
4 changes: 4 additions & 0 deletions aw_transform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
from .split_url_events import split_url_events
from .simplify import simplify_string
from .flood import flood
from .classify import categorize, tag, Rule

__all__ = [
'flood',
'concat',
'categorize',
'tag',
'Rule',
'period_union',
'filter_period_intersect',
'union',
Expand Down
2 changes: 1 addition & 1 deletion aw_transform/chunk_events_by_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def chunk_events_by_key(events: List[Event], key: str, pulsetime: float=5.0) -> List[Event]:
chunked_events = [] # type: List[Event]
chunked_events: List[Event] = []
for event in events:
if key not in event.data:
break
Expand Down
52 changes: 52 additions & 0 deletions aw_transform/classify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Pattern, List, Iterable, Tuple, Dict, Optional
from functools import reduce
import re

from aw_core import Event


Tag = str
Category = List[str]


class Rule:
regex: Optional[Pattern]

def __init__(self, rules: Dict[str, str]):
if "regex" in rules:
self.regex = re.compile(rules["regex"]) if rules["regex"] else None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spaghetti code, first you check if "regex" is in rules and then again you check if regex is in rules.

Would make more sense to write

    if "regex" in rules:
        self.regex = re.compile(rules["regex"])
    else:
        self.regex = None

Oh, now that I think of it it's even incorrect as well as "regex" is impossible to not be set and it is possible to set rules["regex"] = False which would be silly.

Copy link
Copy Markdown
Member Author

@ErikBjare ErikBjare Oct 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not actually, the check is for when the regex string is an empty string, which would erroneously match everything. And regex might not be set at all, in the future another rule might be used instead.

It could be replaced with rules.get("regex", None) or None though, which is cleaner. Edit: Nevermind, actually it can't since it needs to pass through re.compile.


def match(self, e: Event):
for val in e.data.values():
if isinstance(val, str):
if self.regex and self.regex.search(val):
return True
return False


def categorize(events: List[Event], classes: List[Tuple[Category, Rule]]):
return [_categorize_one(e, classes) for e in events]


def _categorize_one(e: Event, classes: List[Tuple[Category, Rule]]) -> Event:
e.data["$category"] = _pick_category([_cls for _cls, rule in classes if rule.match(e)])
return e


def tag(events: List[Event], classes: List[Tuple[Tag, Rule]]):
return [_tag_one(e, classes) for e in events]


def _tag_one(e: Event, classes: List[Tuple[Tag, Rule]]) -> Event:
e.data["$tags"] = [_cls for _cls, rule in classes if rule.match(e)]
return e


def _pick_category(tags: Iterable[Category]) -> Category:
return reduce(_pick_deepest_cat, tags, ["Uncategorized"])


def _pick_deepest_cat(t1: Category, t2: Category) -> Category:
# t1 will be the accumulator when used in reduce
# Always bias against t1, since it could be "Uncategorized"
return t2 if len(t2) >= len(t1) else t1
10 changes: 7 additions & 3 deletions aw_transform/merge_events_by_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ def merge_events_by_keys(events, keys) -> List[Event]:
# Call recursively until all keys are consumed
if len(keys) < 1:
return events
merged_events = {} # type: Dict[Tuple, Event]
merged_events: Dict[Tuple, Event] = {}
for event in events:
composite_key = () # type: Tuple
composite_key: Tuple = ()
for key in keys:
if key in event.data:
composite_key = composite_key + (event["data"][key],)
val = event["data"][key]
# Needed for when the value is a list, such as for categories
if isinstance(val, list):
val = tuple(val)
composite_key = composite_key + (val,)
if composite_key not in merged_events:
merged_events[composite_key] = Event(
timestamp=event.timestamp,
Expand Down
28 changes: 10 additions & 18 deletions aw_transform/split_url_events.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
from copy import copy, deepcopy
import operator
from functools import reduce
from collections import defaultdict
from typing import List

from urllib.parse import urlparse

from aw_core.models import Event
from aw_core import TimePeriod

logger = logging.getLogger(__name__)


from urllib.parse import urlparse

def split_url_events(events):
def split_url_events(events: List[Event]) -> List[Event]:
for event in events:
if "url" in event.data:
url = event.data["url"]
parsed_url = urlparse(url)
event.data["protocol"] = parsed_url.scheme
event.data["domain"] = parsed_url.netloc
if event.data["domain"][:4] == "www.":
event.data["domain"] = event.data["domain"][4:]
event.data["path"] = parsed_url.path
event.data["params"] = parsed_url.params
event.data["options"] = parsed_url.query
event.data["identifier"] = parsed_url.fragment
event.data["$protocol"] = parsed_url.scheme
event.data["$domain"] = parsed_url.netloc[4:] if parsed_url.netloc[:4] == "www." else parsed_url.netloc
event.data["$path"] = parsed_url.path
event.data["$params"] = parsed_url.params
event.data["$options"] = parsed_url.query
event.data["$identifier"] = parsed_url.fragment
# TODO: Parse user, port etc aswell
return events
1 change: 1 addition & 0 deletions tests/test_query2.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def test_query2_query_functions(datastore):
events = split_url_events(events);
events = sort_by_timestamp(events);
events = sort_by_duration(events);
events = categorize(events, [[["test", "subtest"], {{"regex": "^test"}}], [["test"], {{"regex": "^test"}}]]);
duration = sum_durations(events);
eventcount = query_bucket_eventcount(bid);
asd = nop();
Expand Down
Loading