Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gridappsd/field_interface/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import List

from gridappsd.field_interface.context import ContextManager
Comment thread
craigpnnl marked this conversation as resolved.
from gridappsd.field_interface.context import LocalContext
from gridappsd.field_interface.interfaces import MessageBusDefinition

__all__: List[str] = [
"ContextManager",
"LocalContext",
"MessageBusDefinition"
]
268 changes: 180 additions & 88 deletions gridappsd/field_interface/agents/agents.py

Large diffs are not rendered by default.

56 changes: 36 additions & 20 deletions gridappsd/field_interface/context.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,52 @@
from gridappsd import GridAPPSD
from gridappsd.field_interface.interfaces import FieldMessageBus
import dataclasses
import gridappsd.topics as t
import json

request_field_queue_prefix = 'goss.gridappsd.process.request.field'
request_field_context_queue = request_field_queue_prefix + '.context'


class ContextManager:

class LocalContext:
@classmethod
def get_context_by_feeder(cls, feeder_mrid, area_id=None):
gridappsd_obj = GridAPPSD()

request = {'modelId': feeder_mrid,
def get_context_by_feeder(cls, downstream_message_bus: FieldMessageBus, feeder_mrid, area_id=None):

request = {'request_type' : 'get_context',
'modelId': feeder_mrid,
'areaId': area_id}

response = gridappsd_obj.get_response(request_field_context_queue, request)
response = downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request, timeout=10)
return response

@classmethod
def get_context_by_message_bus(cls, downstream_message_bus_id):
def get_context_by_message_bus(cls, downstream_message_bus: FieldMessageBus):
"""
return agents/devices based on upstream and/or downstream message bus as input
make message bus id a list

based on filter return distributed agents for different applications as well
"""
gridappsd_obj = GridAPPSD()
return agents/devices based on downstream message bus as input

request = {'downstream_message_bus_id': downstream_message_bus_id,
"""
request = {'request_type' : 'get_context',
'downstream_message_bus_id': downstream_message_bus.id,
'agents': True,
'devices': True}
return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request)

@classmethod
def register_agent(cls, downstream_message_bus: FieldMessageBus, upstream_message_bus: FieldMessageBus, agent):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the base type for the agent? it would be good for that to be here as the helper.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That leads to circular dependency so will have to change in another way.

"""
Sends the newly created distributed agent's info to OT bus

return gridappsd_obj.get_response(request_field_context_queue, request)
"""
request = {'request_type' : 'register_agent',
'agent' : agent.get_registration_details()}
downstream_message_bus.send(t.context_request_queue(downstream_message_bus.id), request)
upstream_message_bus.send(t.context_request_queue(upstream_message_bus.id), request)

@classmethod
def get_agents(cls, downstream_message_bus: FieldMessageBus):
"""
Sends the newly created distributed agent's info to OT bus

"""
request = {'request_type' : 'get_agents'}
return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request)

# Provide context based on router (ip trace) or PKI
# Maybe able to emulate/simulate
21 changes: 15 additions & 6 deletions gridappsd/field_interface/gridappsd_field_bus.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from gridappsd.field_interface.interfaces import MessageBusDefinition
from gridappsd.field_interface.interfaces import FeederMessageBus
from gridappsd import GridAPPSD
from gridappsd.field_interface.interfaces import FieldMessageBus
from gridappsd.field_interface.interfaces import MessageBusDefinition
from typing import Any


class GridAPPSDMessageBus(FeederMessageBus):
class GridAPPSDMessageBus(FieldMessageBus):
def __init__(self, definition: MessageBusDefinition):
super(GridAPPSDMessageBus, self).__init__(definition)
self._id = definition.id
Expand Down Expand Up @@ -36,12 +37,20 @@ def subscribe(self, topic, callback):
def unsubscribe(self, topic):
pass

def publish(self, data, topic: str = None):
def send(self, topic: str, message: Any):
"""
Publish device specific data to the concrete message bus.
Comment thread
craigpnnl marked this conversation as resolved.
"""
pass

if self.gridappsd_obj is not None:
self.gridappsd_obj.send(topic, message)

def get_response(self, topic, message, timeout=5):
Comment thread
craigpnnl marked this conversation as resolved.
"""
Sends a message on a specific concrete queue, waits and returns the response
"""
if self.gridappsd_obj is not None:
return self.gridappsd_obj.get_response(topic, message, timeout)

def disconnect(self):
"""
Disconnect the device from the concrete message bus.
Expand Down
191 changes: 10 additions & 181 deletions gridappsd/field_interface/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import gridappsd.topics as t
import logging
from os import PathLike
from pathlib import Path
Expand Down Expand Up @@ -164,194 +165,24 @@ def unsubscribe(self, topic):
pass

@abstractmethod
def publish(self, data, topic: str = None):
def send(self, topic, message):
"""
Publish device specific data to the concrete message bus.
"""
pass

@abstractmethod
def disconnect(self):
"""
Disconnect the device from the concrete message bus.
"""
pass


class SwitchAreaMessageBus:
def __init__(self, config: MessageBusDefinition):
self._devices = dict()
self._is_ot_bus = config.is_ot_bus
self._id = config.id

@property
def id(self):
return self._id

@property
def is_ot_bus(self):
return self._is_ot_bus

def add_device(self, device: "DeviceFieldInterface"):
self._devices[device.id] = device

def disconnect_device(self, id: str):
del self._devices[id]

@abstractmethod
def query_devices(self) -> dict:
pass

@abstractmethod
def is_connected(self) -> bool:
"""
Is this object connected to the message bus
"""
pass

@abstractmethod
def connect(self):
"""
Connect to the concrete message bus that implements this interface.
"""
pass

@abstractmethod
def subscribe(self, topic, callback):
pass

@abstractmethod
def unsubscribe(self, topic):
pass

@abstractmethod
def publish(self, data, topic: str = None):
"""
Publish device specific data to the concrete message bus.
"""
pass

@abstractmethod
def disconnect(self):
"""
Disconnect the device from the concrete message bus.
"""
pass


class SecondaryMessageBus:
def __init__(self, config: MessageBusDefinition):
self._devices = dict()
self._is_ot_bus = config.is_ot_bus
self._id = config.id

@property
def id(self):
return self._id

@property
def is_ot_bus(self):
return self._is_ot_bus

def add_device(self, device: "DeviceFieldInterface"):
self._devices[device.id] = device

def disconnect_device(self, id: str):
del self._devices[id]

@abstractmethod
def query_devices(self) -> dict:
pass

@abstractmethod
def is_connected(self) -> bool:
"""
Is this object connected to the message bus
"""
pass

@abstractmethod
def connect(self):
"""
Connect to the concrete message bus that implements this interface.
"""
pass

@abstractmethod
def subscribe(self, topic, callback):
pass

@abstractmethod
def unsubscribe(self, topic):
pass

@abstractmethod
def publish(self, data, topic: str = None):
"""
Publish device specific data to the concrete message bus.
"""
pass

@abstractmethod
def disconnect(self):
"""
Disconnect the device from the concrete message bus.
Publish device specific message to the concrete message bus.
"""
pass


class FeederMessageBus:
def __init__(self, config: MessageBusDefinition):
self._devices = dict()
self._is_ot_bus = config.is_ot_bus
self._id = config.id

@property
def id(self):
return self._id

@property
def is_ot_bus(self):
return self._is_ot_bus

def add_device(self, device: "DeviceFieldInterface"):
self._devices[device.id] = device

def disconnect_device(self, id: str):
del self._devices[id]

@abstractmethod
def query_devices(self) -> dict:
pass

@abstractmethod
def is_connected(self) -> bool:
"""
Is this object connected to the message bus
"""
pass

@abstractmethod
def connect(self):
def get_response(self, topic, message, timeout):
"""
Connect to the concrete message bus that implements this interface.
Sends a message on a specific queue, waits and returns the response
"""
pass

@abstractmethod
def subscribe(self, topic, callback):
pass

@abstractmethod
def unsubscribe(self, topic):
pass

@abstractmethod
def publish(self, data, topic: str = None):

def get_agent_response(self, agent_id, message, timeout):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abstractmethods don't have implementation....

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed abstract.

"""
Publish device specific data to the concrete message bus.
Sends a message on a specific agent's request queue, waits and returns the response
"""
pass
topic = "{}.request.{}.{}".format(t.BASE_FIELD_QUEUE,self.id, agent_id)
self.get_response(topic, message, timeout)

@abstractmethod
def disconnect(self):
Expand All @@ -361,8 +192,6 @@ def disconnect(self):
pass




class MessageBusDefinitions:
def __init__(
self,
Expand Down
2 changes: 1 addition & 1 deletion gridappsd/goss.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def __init__(self):
self._thread.start()

def run_callbacks(self):
_log.info("Starting thread queue")
_log.debug("Starting thread queue")
while True:
cb, hdrs, msg = self._queue_callerback.get()
try:
Expand Down
Loading