Skip to content
This repository was archived by the owner on Jan 17, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ nose = "==1.3.7"
nose-cov = "==1.6"
mock = "==2.0.0"
"autopep8" = "==1.3.1"
pylint = "==1.8.2"
pylint = "*"
mypy = "*"
flake8 = "*"

[requires]
python_version = "3.6"
556 changes: 301 additions & 255 deletions Pipfile.lock

Large diffs are not rendered by default.

9 changes: 2 additions & 7 deletions build.bat
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
cd reactivexcomponent

echo Running lint...
pylint reactivexcomponent --extension-pkg-whitelist=lxml -f parseable > pylint.out
IF ERRORLEVEL 1 (
echo Lint failed!
type pylint.out
EXIT /B 1
)
echo Running flake8...
flake8 reactivexcomponent > flake8.out

echo Running tests...
nosetests tests/unit --with-xunit --with-cov --cov reactivexcomponent --exe
Expand Down
6 changes: 3 additions & 3 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
cd reactivexcomponent

echo Running lint...
pylint reactivexcomponent --extension-pkg-whitelist=lxml -f parseable > pylint.out
flake8 reactivexcomponent > flake8.out

rc=$?; if [[ $rc != 0 ]]; then
echo Lint failed!
cat pylint.out
echo Flake8 failed!
cat flake8.out
exit $rc;
fi

Expand Down
3 changes: 3 additions & 0 deletions reactivexcomponent/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]

max-line-length=160
Empty file added reactivexcomponent/flake8.out
Empty file.
9 changes: 4 additions & 5 deletions reactivexcomponent/reactivexcomponent/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import logging

from reactivexcomponent.xcomponent_api import XcomponentAPI
from typing import Any

__version__ = '1.0.0'

try:
from logging import NullHandler
from logging import NullHandler # type: ignore
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
class NullHandler(logging.Handler): # type: ignore
def emit(self, record: Any) -> None:
pass

logging.getLogger(__name__).addHandler(NullHandler())
60 changes: 41 additions & 19 deletions reactivexcomponent/reactivexcomponent/communication/publisher.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import json
from typing import Dict, Any
import websocket as WebSocket
from reactivexcomponent.configuration.serializer import to_websocket_input_format
from reactivexcomponent.configuration.api_configuration import format_fsharp_field
from reactivexcomponent.configuration.api_configuration import format_fsharp_field, APIConfiguration


class Publisher:

def __init__(self, apiconfiguration, websocket_instance):
def __init__(self, apiconfiguration: APIConfiguration, websocket_instance: WebSocket.WebSocketApp) -> None:
self.configuration = apiconfiguration
self.websocket = websocket_instance

def _header_config(self, component_code, state_machine_code, message_type):
def _header_config(self, component_code: int, state_machine_code: int, message_type: str) -> Dict[str, Any]:
return {"StateMachineCode": format_fsharp_field(state_machine_code),
"ComponentCode": format_fsharp_field(component_code),
"EventCode": self.configuration.get_publisher_details(component_code,
Expand All @@ -18,36 +19,50 @@ def _header_config(self, component_code, state_machine_code, message_type):
"IncomingType": 0,
"MessageType": format_fsharp_field(message_type)}

def _get_routing_key(self, component_code, state_machine_code, message_type):
def _get_routing_key(self, component_code: int, state_machine_code: int, message_type: str) -> str:
publisher_details = self.configuration.get_publisher_details(
component_code, state_machine_code, message_type)
return publisher_details['routingKey']

def _data_to_send(self, component_name, state_machine_name, message_type, json_message):
def _data_to_send(self, component_name: str,
state_machine_name: str, message_type: str, json_message: Dict[str, Any]) -> Dict[str, Any]:
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
header_config = self._header_config(component_code, state_machine_code, message_type)
routing_key = self._get_routing_key(component_code, state_machine_code, message_type)
header_config = self._header_config(
component_code, state_machine_code, message_type)
routing_key = self._get_routing_key(
component_code, state_machine_code, message_type)
return {
"RoutingKey": routing_key,
"ComponentCode": component_code,
"Event": {"Header": header_config, "JsonMessage": json.dumps(json_message)}
}

def can_publish(self, component_name, state_machine_name, message_type):
def can_publish(self, component_name: str, state_machine_name: str, message_type: str) -> bool:
if self.configuration.contains_state_machine(component_name, state_machine_name):
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(component_name, state_machine_name)
component_code = self.configuration.get_component_code(
component_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
return self.configuration.contains_publisher(component_code, state_machine_code, message_type)
return False

def send_message(self, component_name, state_machine_name, message_type, json_message):
def send_message(self,
component_name: str,
state_machine_name: str,
message_type: str,
json_message: Dict[str, Any]) -> None:
data = self._data_to_send(
component_name, state_machine_name, message_type, json_message)
self.websocket.send(to_websocket_input_format(data))

def _header_config_ref(self, component_code, state_machine_code, message_type, state_machine_id, agent_id):

def _header_config_ref(self,
component_code: int,
state_machine_code: int,
message_type: str,
state_machine_id: int,
agent_id: int) -> Dict[str, Any]:
return {
"StateMachineId": format_fsharp_field(state_machine_id),
"AgentId": format_fsharp_field(agent_id),
Expand All @@ -58,14 +73,17 @@ def _header_config_ref(self, component_code, state_machine_code, message_type, s
message_type)["eventCode"],
"IncomingType": 0,
"MessageType": format_fsharp_field(message_type)
}
}

def _data_to_send_with_state_machine_ref(self, state_machine_ref, message_type, json_message):
def _data_to_send_with_state_machine_ref(self, state_machine_ref: Dict[str, Any],
message_type: str,
json_message: Dict[str, Any]) -> Dict[str, Any]:
component_code = state_machine_ref["ComponentCode"]
state_machine_code = state_machine_ref["StateMachineCode"]
header_config = self._header_config_ref(component_code, state_machine_code, message_type,
state_machine_ref["StateMachineId"], state_machine_ref["AgentId"])
routing_key = self._get_routing_key(component_code, state_machine_code, message_type)
routing_key = self._get_routing_key(
component_code, state_machine_code, message_type)
return {
"RoutingKey": routing_key,
"ComponentCode": component_code,
Expand All @@ -75,6 +93,10 @@ def _data_to_send_with_state_machine_ref(self, state_machine_ref, message_type,
}
}

def send_with_state_machine_ref(self, state_machine_ref, message_type, json_message):
data = self._data_to_send_with_state_machine_ref(state_machine_ref, message_type, json_message)
def send_with_state_machine_ref(self,
state_machine_ref: Dict[str, Any],
message_type: str,
json_message: Dict[str, Any]) -> None:
data = self._data_to_send_with_state_machine_ref(
state_machine_ref, message_type, json_message)
self.websocket.send(to_websocket_input_format(data))
89 changes: 50 additions & 39 deletions reactivexcomponent/reactivexcomponent/communication/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
import json
import websocket as WebSocket
from rx.subjects import Subject
from rx import Observable
from typing import Dict, Any, List
from reactivexcomponent.communication.publisher import Publisher
from reactivexcomponent.configuration.websocket_bridge_configuration import EventType, Command, WebsocketTopicKind
from reactivexcomponent.configuration.serializer import command_data_websocket_format, get_header_with_incoming_type
from reactivexcomponent.configuration.serializer import deserialize, get_json_data
from reactivexcomponent.configuration.serializer import command_data_websocket_format, get_header_with_incoming_type, deserialize, get_json_data
from reactivexcomponent.configuration.api_configuration import APIConfiguration


def get_data_to_send(topic, kind):
def get_data_to_send(topic: str, kind: int) -> Dict[str, Any]:
return {"Header": get_header_with_incoming_type(),
"JsonMessage": json.dumps({
"Topic": {"Key": topic, "Kind": kind}
})
}
}


def is_same_state_machine(json_data: Dict[str, Any], state_machine_code: int) -> bool:
return json_data["stateMachineRef"]["StateMachineCode"] == state_machine_code

def is_same_state_machine(json_data, state_machine_code):
same_state_machine = (json_data["stateMachineRef"]["StateMachineCode"] == state_machine_code)
return same_state_machine

def is_same_component(json_data, component_code):
same_component = (json_data["stateMachineRef"]["ComponentCode"] == component_code)
return same_component
def is_same_component(json_data: Dict[str, Any], component_code: int) -> bool:
return json_data["stateMachineRef"]["ComponentCode"] == component_code

def is_subscribed(subscribed_state_machines, component_name, state_machine_name):
subscribed = (component_name in subscribed_state_machines) and (

def is_subscribed(subscribed_state_machines: List[str], component_name: str, state_machine_name: str) -> bool:
return (component_name in subscribed_state_machines) and (
state_machine_name in subscribed_state_machines[component_name])
return subscribed


class Subscriber:

def __init__(self, apiconfiguration, websocket_instance, subject, reply_publisher):
def __init__(self, apiconfiguration: APIConfiguration, websocket_instance: WebSocket.WebSocketApp, subject: Subject, reply_publisher: Publisher) -> None:
self.configuration = apiconfiguration
self.websocket = websocket_instance
self.subscribed_state_machines = {}
self.subscribed_state_machines: List[str] = {}
self.subject = subject
self.observable_subscribers = []
self.observable_subscribers: List[Subject] = []
self.reply_publisher = reply_publisher

def _json_data_from_event(self, data):
def _json_data_from_event(self, data: Dict[str, Any]) -> Dict[str, Any]:
json_data = get_json_data(data)
component_code = json_data["Header"]["ComponentCode"]["Fields"][0]
state_machine_code = json_data["Header"]["StateMachineCode"]["Fields"][0]
Expand All @@ -54,30 +58,30 @@ def _json_data_from_event(self, data):
"jsonMessage": json.loads(json_data["JsonMessage"])
}

def _prepare_state_machine_updates(self, component_name, state_machine_name):
def _prepare_state_machine_updates(self, component_name: str, state_machine_name: str) -> Subject:
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
filtered_observable = self.subject.map(deserialize) \
.filter(lambda data: data["command"] == Command.update) \
.map(lambda data: self._json_data_from_event(data["stringData"])) \
.filter(lambda json_data: is_same_component(json_data, component_code) and
is_same_state_machine(json_data, state_machine_code))
.filter(lambda data: data["command"] == Command.update) \
.map(lambda data: self._json_data_from_event(data["stringData"])) \
.filter(lambda json_data: is_same_component(json_data, component_code) and
is_same_state_machine(json_data, state_machine_code))
return filtered_observable

def _add_subscribe_state_machine(self, component_name, state_machine_name):
def _add_subscribe_state_machine(self, component_name: str, state_machine_name: str) -> None:
self.subscribed_state_machines[component_name] = (
self.subscribed_state_machines).get(component_name, [])
(self.subscribed_state_machines[component_name]).append(
state_machine_name)

def _send_subscribe_request_to_topic(self, topic, kind):
def _send_subscribe_request_to_topic(self, topic: str, kind: int) -> None:
data = get_data_to_send(topic, kind)
command_data = {"Command": Command.subscribe, "Data": data}
input_data = command_data_websocket_format(command_data)
self.websocket.send(input_data)

def _send_subscribe_request(self, component_name, state_machine_name):
def _send_subscribe_request(self, component_name: str, state_machine_name: str) -> None:
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
Expand All @@ -86,41 +90,48 @@ def _send_subscribe_request(self, component_name, state_machine_name):
self._send_subscribe_request_to_topic(topic, WebsocketTopicKind.Public)
self._add_subscribe_state_machine(component_name, state_machine_name)

def can_subscribe(self, component_name, state_machine_name):
def can_subscribe(self, component_name: str, state_machine_name: str) -> bool:
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(component_name, state_machine_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
return self.configuration.contains_subscriber(component_code, state_machine_code, EventType.Update)

def get_state_machine_updates(self, component_name, state_machine_name):
filtered_observable = self._prepare_state_machine_updates(component_name, state_machine_name)
def get_state_machine_updates(self, component_name: str, state_machine_name: str) -> Subject:
filtered_observable = self._prepare_state_machine_updates(
component_name, state_machine_name)
self._send_subscribe_request(component_name, state_machine_name)
return filtered_observable

def subscribe(self, component_name, state_machine_name, state_machine_update_listener):
def subscribe(self, component_name: str, state_machine_name: str, state_machine_update_listener: Observable) -> None:
observable_subscriber = self._prepare_state_machine_updates(component_name, state_machine_name)\
.subscribe(state_machine_update_listener)
self.observable_subscribers.append(observable_subscriber)
self._send_subscribe_request(component_name, state_machine_name)

def dispose_observable_subscribers(self):
def dispose_observable_subscribers(self) -> None:
for i in range(len(self.observable_subscribers)):
self.observable_subscribers[i].dispose()
self.observable_subscribers = []

def remove_subscribed_statemachine(self, component_name, state_machine_name):
index = self.subscribed_state_machines[component_name].index(state_machine_name)
def remove_subscribed_statemachine(self, component_name: str, state_machine_name: str) -> None:
index = self.subscribed_state_machines[component_name].index(
state_machine_name)
del self.subscribed_state_machines[component_name][index]

def unsubscribe(self, component_name, state_machine_name):
def unsubscribe(self, component_name: str, state_machine_name: str) -> None:
if is_subscribed(self.subscribed_state_machines, component_name, state_machine_name):
component_code = self.configuration.get_component_code(component_name)
state_machine_code = self.configuration.get_state_machine_code(component_name, state_machine_name)
topic = self.configuration.get_subscriber_topic(component_code, state_machine_code, EventType.Update)
component_code = self.configuration.get_component_code(
component_name)
state_machine_code = self.configuration.get_state_machine_code(
component_name, state_machine_name)
topic = self.configuration.get_subscriber_topic(
component_code, state_machine_code, EventType.Update)
kind = WebsocketTopicKind.Public
data = get_data_to_send(topic, kind)
command_data = {
"Command": Command.unsubscribe,
"Data": data
}
self.websocket.send(command_data_websocket_format(command_data))
self.remove_subscribed_statemachine(component_name, state_machine_name)
self.remove_subscribed_statemachine(
component_name, state_machine_name)
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Callable, Any
from reactivexcomponent.communication.xc_session import XcSession


class XcConnection:
def __init__(self) -> None:
pass

def __init__(self):
self.session = None

def create_connection(self, xc_api, server_url, callback):
self.session = XcSession()
def create_connection(self, xc_api: str, server_url: str, callback: Callable[[Any, Any], None]) -> None:
self.session: XcSession = XcSession()
self.session.init(xc_api, server_url, callback)
Loading