diff --git a/.travis.yml b/.travis.yml index cdee1cc3..43b1cb7c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language: python python: # - "3.3" # - "3.4" - - "3.5" +# - "3.5" - "3.6" services: diff --git a/appveyor.yml b/appveyor.yml index 38531b18..e2dfdf27 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -14,14 +14,10 @@ environment: # a later point release. # See: http://www.appveyor.com/docs/installed-software#python - - PYTHON: "C:\\Python35" - PYTHON_VERSION: "3.5.x" # currently 3.5.2 + - PYTHON: "C:\\Python36" + PYTHON_VERSION: "3.6.x" # currently 3.6.6 PYTHON_ARCH: "32" - - PYTHON: "C:\\Python35-x64" - PYTHON_VERSION: "3.5.x" # currently 3.5.2 - PYTHON_ARCH: "64" - install: # If there is a newer build queued for the same PR, cancel this one. # The AppVeyor 'rollout builds' option is supposed to serve the same diff --git a/aw_datastore/__init__.py b/aw_datastore/__init__.py index 4200c9f3..b2536e23 100644 --- a/aw_datastore/__init__.py +++ b/aw_datastore/__init__.py @@ -10,11 +10,11 @@ # like ellipsises. See here: https://github.com/python/typing/issues/259 def get_storage_methods() -> Dict[str, Callable[[Any], storages.AbstractStorage]]: from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage, SqliteStorage - methods = { + methods: Dict[str, Callable[[Any], storages.AbstractStorage]] = { PeeweeStorage.sid: PeeweeStorage, MemoryStorage.sid: MemoryStorage, SqliteStorage.sid: SqliteStorage, - } # type: Dict[str, Callable[[Any], storages.AbstractStorage]] + } # MongoDB is not supported on Windows or macOS if _platform.system() == "Linux": # pragma: no branch diff --git a/aw_datastore/datastore.py b/aw_datastore/datastore.py index fa9e68a7..68841c88 100644 --- a/aw_datastore/datastore.py +++ b/aw_datastore/datastore.py @@ -12,7 +12,7 @@ class Datastore: def __init__(self, storage_strategy: Callable[..., AbstractStorage], testing=False) -> None: self.logger = logger.getChild("Datastore") - self.bucket_instances = dict() # type: Dict[str, Bucket] + self.bucket_instances: Dict[str, Bucket] = dict() self.storage_strategy = storage_strategy(testing=testing) @@ -93,11 +93,11 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]: now = datetime.now(tz=timezone.utc) - inserted = None # type: Optional[Event] + inserted: Optional[Event] = None # Call insert if isinstance(events, Event): - oldest_event = events # type: Optional[Event] + oldest_event: Optional[Event] = events if events.timestamp + events.duration > now: self.logger.warning("Event inserted into bucket {} reaches into the future. Current UTC time: {}. Event data: {}".format(self.bucket_id, str(now), str(events))) inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events) diff --git a/aw_datastore/storages/__init__.py b/aw_datastore/storages/__init__.py index 406a7c7b..bb0fb054 100644 --- a/aw_datastore/storages/__init__.py +++ b/aw_datastore/storages/__init__.py @@ -1,6 +1,6 @@ import logging as _logging -logger = _logging.getLogger(__name__) # type: _logging.Logger +logger: _logging.Logger = _logging.getLogger(__name__) from .abstract import AbstractStorage diff --git a/aw_datastore/storages/memory.py b/aw_datastore/storages/memory.py index c62ba131..96abc2e0 100644 --- a/aw_datastore/storages/memory.py +++ b/aw_datastore/storages/memory.py @@ -16,8 +16,8 @@ class MemoryStorage(AbstractStorage): def __init__(self, testing: bool) -> None: self.logger = logger.getChild(self.sid) # self.logger.warning("Using in-memory storage, any events stored will not be persistent and will be lost when server is shut down. Use the --storage parameter to set a different storage method.") - self.db = {} # type: Dict[str, List[Event]] - self._metadata = dict() # type: Dict[str, dict] + self.db: Dict[str, List[Event]] = {} + self._metadata: Dict[str, dict] = dict() def create_bucket(self, bucket_id, type_id, client, hostname, created, name=None) -> None: if not name: diff --git a/aw_datastore/storages/mongodb.py b/aw_datastore/storages/mongodb.py index fe67db4d..76abafdf 100644 --- a/aw_datastore/storages/mongodb.py +++ b/aw_datastore/storages/mongodb.py @@ -74,7 +74,7 @@ def get_metadata(self, bucket_id: str) -> dict: def get_events(self, bucket_id: str, limit: int, starttime: Optional[datetime] = None, endtime: Optional[datetime] = None): - query_filter = {} # type: Dict[str, dict] + query_filter: Dict[str, dict] = {} if starttime or endtime: query_filter["timestamp"] = {} if starttime: @@ -99,7 +99,7 @@ def get_events(self, bucket_id: str, limit: int, def get_eventcount(self, bucket_id: str, starttime: datetime = None, endtime: datetime = None) -> int: - query_filter = {} # type: Dict[str, dict] + query_filter: Dict[str, dict] = {} if starttime or endtime: query_filter["timestamp"] = {} if starttime: @@ -123,7 +123,7 @@ def insert_one(self, bucket: str, event: Event) -> Event: def insert_many(self, bucket: str, events: List[Event]): # .copy is needed because otherwise mongodb inserts a _id field into the event - dict_events = [self._transform_event(event.copy()) for event in events] # type: List[dict] + dict_events: List[dict] = [self._transform_event(event.copy()) for event in events] self.db[bucket]["events"].insert_many(dict_events) def delete(self, bucket_id: str, event_id) -> bool: diff --git a/aw_datastore/storages/peewee.py b/aw_datastore/storages/peewee.py index a177c3fc..0fe4fd3e 100644 --- a/aw_datastore/storages/peewee.py +++ b/aw_datastore/storages/peewee.py @@ -91,7 +91,7 @@ def __init__(self, testing: bool = True, filepath: str = None) -> None: self.db.connect() - self.bucket_keys = {} # type: Dict[str, int] + self.bucket_keys: Dict[str, int] = {} BucketModel.create_table(safe=True) EventModel.create_table(safe=True) self.update_bucket_keys() diff --git a/aw_query/functions.py b/aw_query/functions.py index 8e3af836..25489c02 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -1,3 +1,4 @@ +import re import iso8601 from typing import Callable, Dict, Any, List from inspect import signature @@ -12,6 +13,9 @@ filter_keyvals, filter_keyvals_regex, period_union, + categorize, + tag, + Rule, merge_events_by_keys, chunk_events_by_key, sort_by_timestamp, @@ -21,7 +25,7 @@ split_url_events, simplify_string, flood, - limit_events + limit_events, ) from .exceptions import QueryFunctionException @@ -48,7 +52,7 @@ def _verify_variable_is_type(variable, t): """ Declarations """ -functions = {} # type: Dict[str, TQueryFunction] +functions: Dict[str, TQueryFunction] = {} def q2_function(transform_func=None): @@ -279,3 +283,22 @@ def q2_simplify_window_titles(events: list, key: str) -> List[Event]: def q2_nop(): """No operation function for unittesting""" return 1 + + +""" + Classify +""" + + +@q2_function(categorize) +@q2_typecheck +def q2_categorize(events: list, classes: list): + classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes] + return categorize(events, classes) + + +@q2_function(tag) +@q2_typecheck +def q2_tag(events: list, classes: list): + classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes] + return tag(events, classes) diff --git a/aw_query/query2.py b/aw_query/query2.py index af92a091..86aa8d02 100644 --- a/aw_query/query2.py +++ b/aw_query/query2.py @@ -1,5 +1,5 @@ import logging -from typing import Union, List, Dict, Sequence, Callable, Type, Any +from typing import Union, List, Dict, Sequence, Callable, Type, Any, Tuple from datetime import datetime from aw_core.models import Event @@ -20,7 +20,7 @@ def parse(string: str, namespace: dict): raise NotImplementedError @staticmethod - def check(string: str): + def check(string: str) -> Tuple[str, str]: raise NotImplementedError @@ -207,7 +207,7 @@ def interpret(self, datastore: Datastore, namespace: dict): @staticmethod def parse(string: str, namespace: dict) -> QToken: entries_str = string[1:-1] - d = {} # type: Dict[str, QToken] + d: Dict[str, QToken] = {} while len(entries_str) > 0: entries_str = entries_str.strip() if len(d) > 0 and entries_str[0] == ",": @@ -272,7 +272,7 @@ def interpret(self, datastore: Datastore, namespace: dict): @staticmethod def parse(string: str, namespace: dict) -> QToken: entries_str = string[1:-1] - l = [] # type: List[QToken] + l: List[QToken] = [] while len(entries_str) > 0: entries_str = entries_str.strip() if len(l) > 0 and entries_str[0] == ",": @@ -314,8 +314,10 @@ def check(string: str): return string[:i], string[i + 1:] -qtypes = [QString, QInteger, QFunction, QDict, QList, QVariable] # type: Sequence[Type[QToken]] -def _parse_token(string: str, namespace: dict): # TODO: Add return type +qtypes: Sequence[Type[QToken]] = [QString, QInteger, QFunction, QDict, QList, QVariable] + + +def _parse_token(string: str, namespace: dict) -> Tuple[Tuple[Any, str], str]: # TODO: The whole parsing thing is shoddily written, needs a rewrite from ground-up if not isinstance(string, str): raise QueryParseException("Reached unreachable, cannot parse something that isn't a string") diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index 04342fcb..51050cba 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -7,10 +7,14 @@ from .split_url_events import split_url_events from .simplify import simplify_string from .flood import flood +from .classify import categorize, tag, Rule __all__ = [ 'flood', 'concat', + 'categorize', + 'tag', + 'Rule', 'period_union', 'filter_period_intersect', 'union', diff --git a/aw_transform/chunk_events_by_key.py b/aw_transform/chunk_events_by_key.py index a06a1033..2510183f 100644 --- a/aw_transform/chunk_events_by_key.py +++ b/aw_transform/chunk_events_by_key.py @@ -8,7 +8,7 @@ def chunk_events_by_key(events: List[Event], key: str, pulsetime: float=5.0) -> List[Event]: - chunked_events = [] # type: List[Event] + chunked_events: List[Event] = [] for event in events: if key not in event.data: break diff --git a/aw_transform/classify.py b/aw_transform/classify.py new file mode 100644 index 00000000..c2113987 --- /dev/null +++ b/aw_transform/classify.py @@ -0,0 +1,52 @@ +from typing import Pattern, List, Iterable, Tuple, Dict, Optional +from functools import reduce +import re + +from aw_core import Event + + +Tag = str +Category = List[str] + + +class Rule: + regex: Optional[Pattern] + + def __init__(self, rules: Dict[str, str]): + if "regex" in rules: + self.regex = re.compile(rules["regex"]) if rules["regex"] else None + + def match(self, e: Event): + for val in e.data.values(): + if isinstance(val, str): + if self.regex and self.regex.search(val): + return True + return False + + +def categorize(events: List[Event], classes: List[Tuple[Category, Rule]]): + return [_categorize_one(e, classes) for e in events] + + +def _categorize_one(e: Event, classes: List[Tuple[Category, Rule]]) -> Event: + e.data["$category"] = _pick_category([_cls for _cls, rule in classes if rule.match(e)]) + return e + + +def tag(events: List[Event], classes: List[Tuple[Tag, Rule]]): + return [_tag_one(e, classes) for e in events] + + +def _tag_one(e: Event, classes: List[Tuple[Tag, Rule]]) -> Event: + e.data["$tags"] = [_cls for _cls, rule in classes if rule.match(e)] + return e + + +def _pick_category(tags: Iterable[Category]) -> Category: + return reduce(_pick_deepest_cat, tags, ["Uncategorized"]) + + +def _pick_deepest_cat(t1: Category, t2: Category) -> Category: + # t1 will be the accumulator when used in reduce + # Always bias against t1, since it could be "Uncategorized" + return t2 if len(t2) >= len(t1) else t1 diff --git a/aw_transform/merge_events_by_keys.py b/aw_transform/merge_events_by_keys.py index 3fb9fb5f..ea9fdec9 100644 --- a/aw_transform/merge_events_by_keys.py +++ b/aw_transform/merge_events_by_keys.py @@ -11,12 +11,16 @@ def merge_events_by_keys(events, keys) -> List[Event]: # Call recursively until all keys are consumed if len(keys) < 1: return events - merged_events = {} # type: Dict[Tuple, Event] + merged_events: Dict[Tuple, Event] = {} for event in events: - composite_key = () # type: Tuple + composite_key: Tuple = () for key in keys: if key in event.data: - composite_key = composite_key + (event["data"][key],) + val = event["data"][key] + # Needed for when the value is a list, such as for categories + if isinstance(val, list): + val = tuple(val) + composite_key = composite_key + (val,) if composite_key not in merged_events: merged_events[composite_key] = Event( timestamp=event.timestamp, diff --git a/aw_transform/split_url_events.py b/aw_transform/split_url_events.py index c3d50bb3..666efbf0 100644 --- a/aw_transform/split_url_events.py +++ b/aw_transform/split_url_events.py @@ -1,31 +1,23 @@ import logging -from datetime import datetime, timedelta -from typing import List, Dict, Optional, Any -from copy import copy, deepcopy -import operator -from functools import reduce -from collections import defaultdict +from typing import List + +from urllib.parse import urlparse from aw_core.models import Event -from aw_core import TimePeriod logger = logging.getLogger(__name__) -from urllib.parse import urlparse - -def split_url_events(events): +def split_url_events(events: List[Event]) -> List[Event]: for event in events: if "url" in event.data: url = event.data["url"] parsed_url = urlparse(url) - event.data["protocol"] = parsed_url.scheme - event.data["domain"] = parsed_url.netloc - if event.data["domain"][:4] == "www.": - event.data["domain"] = event.data["domain"][4:] - event.data["path"] = parsed_url.path - event.data["params"] = parsed_url.params - event.data["options"] = parsed_url.query - event.data["identifier"] = parsed_url.fragment + event.data["$protocol"] = parsed_url.scheme + event.data["$domain"] = parsed_url.netloc[4:] if parsed_url.netloc[:4] == "www." else parsed_url.netloc + event.data["$path"] = parsed_url.path + event.data["$params"] = parsed_url.params + event.data["$options"] = parsed_url.query + event.data["$identifier"] = parsed_url.fragment # TODO: Parse user, port etc aswell return events diff --git a/tests/test_query2.py b/tests/test_query2.py index 6409d674..3ab874a3 100644 --- a/tests/test_query2.py +++ b/tests/test_query2.py @@ -308,6 +308,7 @@ def test_query2_query_functions(datastore): events = split_url_events(events); events = sort_by_timestamp(events); events = sort_by_duration(events); + events = categorize(events, [[["test", "subtest"], {{"regex": "^test"}}], [["test"], {{"regex": "^test"}}]]); duration = sum_durations(events); eventcount = query_bucket_eventcount(bid); asd = nop(); diff --git a/tests/test_transforms.py b/tests/test_transforms.py index f04dc184..665cd257 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -1,3 +1,5 @@ +import re +from pprint import pprint from datetime import datetime, timedelta, timezone from aw_core.models import Event @@ -14,6 +16,9 @@ split_url_events, simplify_string, union, + categorize, + tag, + Rule, ) @@ -195,9 +200,6 @@ def test_merge_events_by_keys_2(): assert result[2].duration == timedelta(seconds=8) -from pprint import pprint - - def test_chunk_events_by_key(): now = datetime.now(timezone.utc) events = [] @@ -232,32 +234,32 @@ def test_url_parse_event(): e = Event(data={"url": "http://asd.com/test/?a=1"}, timestamp=now, duration=timedelta(seconds=1)) result = split_url_events([e]) print(result) - assert result[0].data["protocol"] == "http" - assert result[0].data["domain"] == "asd.com" - assert result[0].data["path"] == "/test/" - assert result[0].data["params"] == "" - assert result[0].data["options"] == "a=1" - assert result[0].data["identifier"] == "" + assert result[0].data["$protocol"] == "http" + assert result[0].data["$domain"] == "asd.com" + assert result[0].data["$path"] == "/test/" + assert result[0].data["$params"] == "" + assert result[0].data["$options"] == "a=1" + assert result[0].data["$identifier"] == "" e2 = Event(data={"url": "https://www.asd.asd.com/test/test2/meh;meh2?asd=2&asdf=3#id"}, timestamp=now, duration=timedelta(seconds=1)) result = split_url_events([e2]) print(result) - assert result[0].data["protocol"] == "https" - assert result[0].data["domain"] == "asd.asd.com" - assert result[0].data["path"] == "/test/test2/meh" - assert result[0].data["params"] == "meh2" - assert result[0].data["options"] == "asd=2&asdf=3" - assert result[0].data["identifier"] == "id" + assert result[0].data["$protocol"] == "https" + assert result[0].data["$domain"] == "asd.asd.com" + assert result[0].data["$path"] == "/test/test2/meh" + assert result[0].data["$params"] == "meh2" + assert result[0].data["$options"] == "asd=2&asdf=3" + assert result[0].data["$identifier"] == "id" e3 = Event(data={"url": "file:///home/johan/myfile.txt"}, timestamp=now, duration=timedelta(seconds=1)) result = split_url_events([e3]) print(result) - assert result[0].data["protocol"] == "file" - assert result[0].data["domain"] == "" - assert result[0].data["path"] == "/home/johan/myfile.txt" - assert result[0].data["params"] == "" - assert result[0].data["options"] == "" - assert result[0].data["identifier"] == "" + assert result[0].data["$protocol"] == "file" + assert result[0].data["$domain"] == "" + assert result[0].data["$path"] == "/home/johan/myfile.txt" + assert result[0].data["$params"] == "" + assert result[0].data["$options"] == "" + assert result[0].data["$identifier"] == "" def test_union(): @@ -290,3 +292,37 @@ def test_union(): # union event lists with same timestamp but different duration duplicates events_union = union([e1, e2, e4], [e3, e2, e1]) assert events_union == [e1, e2, e3, e4] + + +def test_categorize(): + now = datetime.now(timezone.utc) + + classes = [ + (["Test", "Subtest"], Rule({"regex": "value$"})), + (["Test"], Rule({"regex": "^just"})), + ] + events = [ + Event(timestamp=now, duration=0, data={"key": "just a test value"}), + Event(timestamp=now, duration=0, data={}), + ] + events = categorize(events, classes) + + assert events[0].data["$category"] == ["Test", "Subtest"] + assert events[1].data["$category"] == ["Uncategorized"] + + +def test_tags(): + now = datetime.now(timezone.utc) + + classes = [ + ("Test", Rule({"regex": "value$"})), + ("Test", Rule({"regex": "^just"})), + ] + events = [ + Event(timestamp=now, duration=0, data={"key": "just a test value"}), + Event(timestamp=now, duration=0, data={}), + ] + events = tag(events, classes) + + assert len(events[0].data["$tags"]) == 2 + assert len(events[1].data["$tags"]) == 0