diff --git a/gridappsd/field_interface/__init__.py b/gridappsd/field_interface/__init__.py index 2181da7..66eba6b 100644 --- a/gridappsd/field_interface/__init__.py +++ b/gridappsd/field_interface/__init__.py @@ -1,9 +1,9 @@ from typing import List -from gridappsd.field_interface.context import ContextManager +from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.interfaces import MessageBusDefinition __all__: List[str] = [ - "ContextManager", + "LocalContext", "MessageBusDefinition" ] diff --git a/gridappsd/field_interface/agents/agents.py b/gridappsd/field_interface/agents/agents.py index 6436733..68ae39b 100644 --- a/gridappsd/field_interface/agents/agents.py +++ b/gridappsd/field_interface/agents/agents.py @@ -1,46 +1,49 @@ -from typing import Dict -import cimlab.data_profile.cimext_2022 as cim - -from abc import abstractmethod -from dataclasses import dataclass, field +import dataclasses import importlib +import json import logging +from dataclasses import dataclass, field +from datetime import datetime +from typing import Dict -from gridappsd.field_interface.context import ContextManager - -from cimlab.loaders import Parameter, ConnectionParameters -from cimlab.loaders.gridappsd import GridappsdConnection, get_topology_response -from cimlab.models import SwitchArea, SecondaryArea, DistributedModel +from cimgraph.loaders import ConnectionParameters, gridappsd +from cimgraph.loaders.gridappsd import GridappsdConnection +from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea +import gridappsd.topics as t +from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus -from gridappsd.field_interface.interfaces import MessageBusDefinition - -import cimlab.data_profile.cimext_2022 as cim -from cimlab.loaders import Parameter, ConnectionParameters -from cimlab.loaders import gridappsd -from cimlab.loaders.gridappsd import GridappsdConnection, get_topology_response -from cimlab.models import SwitchArea, SecondaryArea, DistributedModel - +from gridappsd.field_interface.interfaces import (FieldMessageBus, + MessageBusDefinition) cim = None sparql = None - _log = logging.getLogger(__name__) + def set_cim_profile(cim_profile): global cim - cim = importlib.import_module('cimlab.data_profile.' + cim_profile) + cim = importlib.import_module('cimgraph.data_profile.' + cim_profile) gridappsd.set_cim_profile(cim_profile) - -class DistributedAgent: +@dataclass +class AgentRegistrationDetails: + agent_id: str + app_id: str + description: str + upstream_message_bus_id: FieldMessageBus.id + downstream_message_bus_id: FieldMessageBus.id + + +class DistributedAgent: def __init__(self, upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - agent_dict=None, + agent_config, + agent_area_dict=None, simulation_id=None, cim_profile: str = None): """ @@ -52,18 +55,29 @@ def __init__(self, self.downstream_message_bus = None self.simulation_id = simulation_id self.context = None + + #TODO: Change params and connection to local connection self.params = ConnectionParameters() self.connection = GridappsdConnection(self.params) + self.app_id = agent_config['app_id'] + self.description = agent_config['description'] + dt = datetime.now() + ts = datetime.timestamp(dt) + self.agent_id = "da_" + self.app_id + "_" + str(int(ts)) + self.agent_area_dict = agent_area_dict + if upstream_message_bus_def is not None: if upstream_message_bus_def.is_ot_bus: - self.upstream_message_bus = GridAPPSDMessageBus(upstream_message_bus_def) + self.upstream_message_bus = GridAPPSDMessageBus( + upstream_message_bus_def) # else: # self.upstream_message_bus = VolttronMessageBus(upstream_message_bus_def) if downstream_message_bus_def is not None: if downstream_message_bus_def.is_ot_bus: - self.downstream_message_bus = GridAPPSDMessageBus(downstream_message_bus_def) + self.downstream_message_bus = GridAPPSDMessageBus( + downstream_message_bus_def) # else: # self.downstream_message_bus = VolttronMessageBus(downstream_message_bus_def) @@ -73,35 +87,123 @@ def __init__(self, # self.addressable_equipments = agent_dict['addressable_equipment'] # self.unaddressable_equipments = agent_dict['unaddressable_equipment'] - @classmethod - def from_feeder(cls, feeder_id, area_id): - context = ContextManager.get_context_by_feeder(feeder_id, area_id) - return cls(context.get('upstream_message_bus_def'), context.get('downstream_message_bus_def')) - def connect(self): + + if self.agent_area_dict is None: + context = LocalContext.get_context_by_message_bus( + self.downstream_message_bus) + self.agent_area_dict = context['data'] + if self.upstream_message_bus is not None: self.upstream_message_bus.connect() if self.downstream_message_bus is not None: self.downstream_message_bus.connect() if self.downstream_message_bus is None and self.upstream_message_bus is None: - raise ValueError("Either upstream or downstream bus must be specified!") + raise ValueError( + "Either upstream or downstream bus must be specified!") + self.subscribe_to_measurement() + self.subscribe_to_messages() + self.subscribe_to_requests() + + if ('context_manager' not in self.app_id): + LocalContext.register_agent(self.downstream_message_bus, + self.upstream_message_bus, self) def subscribe_to_measurement(self): if self.simulation_id is None: - self.downstream_message_bus.subscribe(f"fieldbus/{self.downstream_message_bus.id}", self.on_measurement) + self.downstream_message_bus.subscribe( + t.field_output_topic(self.downstream_message_bus.id), + self.on_measurement) else: - topic = f"/topic/goss.gridappsd.field.simulation.output.{self.simulation_id}.{self.downstream_message_bus.id}" - _log.debug(f"subscribing to sim_output on topic {topic}") + topic = t.field_output_topic(self.downstream_message_bus.id, + self.simulation_id) + _log.debug(f"subscribing to simulation output on topic {topic}") self.downstream_message_bus.subscribe(topic, self.on_simulation_output) + def subscribe_to_messages(self): + + self.downstream_message_bus.subscribe( + t.field_message_bus_topic(self.downstream_message_bus), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_topic(self.upstream_message_bus), + self.on_upstream_message) + + _log.debug( + f"Subscribing to messages on application topics: \n {t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id)} \ + \n {t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_message_bus_app_topic(self.downstream_message_bus.id, + self.app_id), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_app_topic(self.upstream_message_bus.id, + self.app_id), + self.on_upstream_message) + + _log.debug( + f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.downstream_message_bus.id, + self.agent_id), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.upstream_message_bus.id, + self.agent_id), + self.on_upstream_message) + + def subscribe_to_requests(self): + + _log.debug( + f"Subscribing to requests on agents queue: \n {t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_agent_request_queue(self.downstream_message_bus.id, + self.agent_id), + self.on_request_from_downstream) + self.downstream_message_bus.subscribe( + t.field_agent_request_queue(self.upstream_message_bus.id, + self.agent_id), + self.on_request_from_uptream) + def on_measurement(self, headers: Dict, message) -> None: - raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") def on_simulation_output(self, headers, message): self.on_measurement(headers=headers, message=message) + def on_upstream_message(self, headers: Dict, message) -> None: + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + + def on_downstream_message(self, headers: Dict, message) -> None: + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + + def on_request_from_uptream(self, headers: Dict, message): + self.on_request(self.upstream_message_bus, headers, message) + + def on_request_from_downstream(self, headers: Dict, message): + self.on_request(self.downstream_message_bus, headers, message) + + def on_request(self, message_bus, headers: Dict, message): + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + + def get_registration_details(self): + details = AgentRegistrationDetails(str(self.agent_id), self.app_id, + self.description, + self.upstream_message_bus.id, + self.downstream_message_bus.id) + return dataclasses.asdict(details) + ''' TODO this has not been implemented yet, so we are commented them out for now. # not all agent would use this @@ -110,20 +212,6 @@ def on_control(self, control): command = control.get('command') self.control_device(device_id, command) - def publish_to_upstream_bus(self,output): - self.switch_message_bus.publish(self.output_topic, output) - - # could be and upstream or peer level agent - def publish_to_upstream_bus_agent(self,agent_id, output): - self.switch_message_bus.publish(self.topic.agent_id, output) - - def publish_to_downstream_bus(self,message): - self.secondary_message_bus.publish(self.topic, message) - - # downstream agent - def publish_to_downstream_bus_agent(self,agent_id, message): - self.secondary_message_bus.publish(self.topic.agent_id, message) - def control_device(self, device_id, command): device_topic = self.devices.get(device_id) self.secondary_message_bus.publish(device_topic, command)''' @@ -131,46 +219,57 @@ def control_device(self, device_id, command): class FeederAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - feeder_dict=None, simulation_id=None): - super(FeederAgent, self).__init__(upstream_message_bus_def, - downstream_message_bus_def, - feeder_dict, simulation_id) - + agent_config: Dict, + feeder_dict=None, + simulation_id=None): + super(FeederAgent, + self).__init__(upstream_message_bus_def, + downstream_message_bus_def, agent_config, + feeder_dict, simulation_id) + if feeder_dict is not None: feeder = cim.Feeder(mRID=downstream_message_bus_def.id) - - self.feeder_area = DistributedModel(connection=self.connection, feeder=feeder, topology=feeder_dict) + self.feeder_area = DistributedModel(connection=self.connection, + feeder=feeder, + topology=feeder_dict) class SwitchAreaAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - switch_area_dict=None, simulation_id=None): - - super().__init__(upstream_message_bus_def, - downstream_message_bus_def, - switch_area_dict, simulation_id) - + agent_config: Dict, + switch_area_dict=None, + simulation_id=None): + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, switch_area_dict, simulation_id) + if switch_area_dict is not None: - self.switch_area = SwitchArea(downstream_message_bus_def.id, self.connection) + self.switch_area = SwitchArea(downstream_message_bus_def.id, + self.connection) self.switch_area.initialize_switch_area(switch_area_dict) - + class SecondaryAreaAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - secondary_area_dict=None, simulation_id=None): - - super().__init__(upstream_message_bus_def, - downstream_message_bus_def, - secondary_area_dict, simulation_id) + agent_config: Dict, + secondary_area_dict=None, + simulation_id=None): + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, secondary_area_dict, simulation_id) if secondary_area_dict is not None: - self.secondary_area = SecondaryArea(downstream_message_bus_def.id, self.connection) + self.secondary_area = SecondaryArea(downstream_message_bus_def.id, + self.connection) self.secondary_area.initialize_secondary_area(secondary_area_dict) @@ -185,12 +284,19 @@ class CoordinatingAgent: upstream, peer , downstream and broadcast """ - def __init__(self, feeder_id, system_message_bus_def: MessageBusDefinition, simulation_id=None): + def __init__(self, + feeder_id, + system_message_bus_def: MessageBusDefinition, + simulation_id=None): self.feeder_id = feeder_id self.distributed_agents = [] self.system_message_bus = GridAPPSDMessageBus(system_message_bus_def) self.system_message_bus.connect() + + #This will change when we have multiple feeders per system + self.downstream_message_bus = self.system_message_bus + # self.context = ContextManager.getContextByFeeder(self.feeder_id) # print(self.context) # self.addressable_equipments = self.context['data']['addressable_equipment'] @@ -205,20 +311,6 @@ def spawn_distributed_agent(self, distributed_agent: DistributedAgent): ''' - def on_message_from_feeder_bus(self, message): - pass - - def subscribe_to_distribution_bus(self, topic): - #self.system_message_bus.subscribe("/topic/goss.gridappsd.field."+self.feeder_id, - self.on_message_from_feeder_bus) - self.system_message_bus.subscribe(topic, self.on_message_from_feeder_bus) - - def subscribe_to_feeder_bus(self, topic): - self.system_message_bus.subscribe(topic, self.on_message_from_feeder_bus) - - def on_measurement(self, measurements): - print(measurements) - def on_control(self, control): device_id = control.get('device') command = control.get('command') diff --git a/gridappsd/field_interface/context.py b/gridappsd/field_interface/context.py index ae1c882..d6fa6c9 100644 --- a/gridappsd/field_interface/context.py +++ b/gridappsd/field_interface/context.py @@ -1,36 +1,52 @@ -from gridappsd import GridAPPSD +from gridappsd.field_interface.interfaces import FieldMessageBus +import dataclasses +import gridappsd.topics as t +import json -request_field_queue_prefix = 'goss.gridappsd.process.request.field' -request_field_context_queue = request_field_queue_prefix + '.context' -class ContextManager: - +class LocalContext: + @classmethod - def get_context_by_feeder(cls, feeder_mrid, area_id=None): - gridappsd_obj = GridAPPSD() - - request = {'modelId': feeder_mrid, + def get_context_by_feeder(cls, downstream_message_bus: FieldMessageBus, feeder_mrid, area_id=None): + + request = {'request_type' : 'get_context', + 'modelId': feeder_mrid, 'areaId': area_id} - - response = gridappsd_obj.get_response(request_field_context_queue, request) + response = downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request, timeout=10) return response @classmethod - def get_context_by_message_bus(cls, downstream_message_bus_id): + def get_context_by_message_bus(cls, downstream_message_bus: FieldMessageBus): """ - return agents/devices based on upstream and/or downstream message bus as input - make message bus id a list - - based on filter return distributed agents for different applications as well - """ - gridappsd_obj = GridAPPSD() + return agents/devices based on downstream message bus as input - request = {'downstream_message_bus_id': downstream_message_bus_id, + """ + request = {'request_type' : 'get_context', + 'downstream_message_bus_id': downstream_message_bus.id, 'agents': True, 'devices': True} + return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) + + @classmethod + def register_agent(cls, downstream_message_bus: FieldMessageBus, upstream_message_bus: FieldMessageBus, agent): + """ + Sends the newly created distributed agent's info to OT bus - return gridappsd_obj.get_response(request_field_context_queue, request) + """ + request = {'request_type' : 'register_agent', + 'agent' : agent.get_registration_details()} + downstream_message_bus.send(t.context_request_queue(downstream_message_bus.id), request) + upstream_message_bus.send(t.context_request_queue(upstream_message_bus.id), request) + + @classmethod + def get_agents(cls, downstream_message_bus: FieldMessageBus): + """ + Sends the newly created distributed agent's info to OT bus + + """ + request = {'request_type' : 'get_agents'} + return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) # Provide context based on router (ip trace) or PKI # Maybe able to emulate/simulate diff --git a/gridappsd/field_interface/gridappsd_field_bus.py b/gridappsd/field_interface/gridappsd_field_bus.py index 7d93f58..8cb1fa5 100644 --- a/gridappsd/field_interface/gridappsd_field_bus.py +++ b/gridappsd/field_interface/gridappsd_field_bus.py @@ -1,9 +1,10 @@ -from gridappsd.field_interface.interfaces import MessageBusDefinition -from gridappsd.field_interface.interfaces import FeederMessageBus from gridappsd import GridAPPSD +from gridappsd.field_interface.interfaces import FieldMessageBus +from gridappsd.field_interface.interfaces import MessageBusDefinition +from typing import Any -class GridAPPSDMessageBus(FeederMessageBus): +class GridAPPSDMessageBus(FieldMessageBus): def __init__(self, definition: MessageBusDefinition): super(GridAPPSDMessageBus, self).__init__(definition) self._id = definition.id @@ -36,12 +37,20 @@ def subscribe(self, topic, callback): def unsubscribe(self, topic): pass - def publish(self, data, topic: str = None): + def send(self, topic: str, message: Any): """ Publish device specific data to the concrete message bus. """ - pass - + if self.gridappsd_obj is not None: + self.gridappsd_obj.send(topic, message) + + def get_response(self, topic, message, timeout=5): + """ + Sends a message on a specific concrete queue, waits and returns the response + """ + if self.gridappsd_obj is not None: + return self.gridappsd_obj.get_response(topic, message, timeout) + def disconnect(self): """ Disconnect the device from the concrete message bus. diff --git a/gridappsd/field_interface/interfaces.py b/gridappsd/field_interface/interfaces.py index 3b776a9..8e42e66 100644 --- a/gridappsd/field_interface/interfaces.py +++ b/gridappsd/field_interface/interfaces.py @@ -3,6 +3,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum +import gridappsd.topics as t import logging from os import PathLike from pathlib import Path @@ -164,194 +165,24 @@ def unsubscribe(self, topic): pass @abstractmethod - def publish(self, data, topic: str = None): + def send(self, topic, message): """ - Publish device specific data to the concrete message bus. - """ - pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. - """ - pass - - -class SwitchAreaMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: - """ - Is this object connected to the message bus - """ - pass - - @abstractmethod - def connect(self): - """ - Connect to the concrete message bus that implements this interface. - """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - - @abstractmethod - def publish(self, data, topic: str = None): - """ - Publish device specific data to the concrete message bus. - """ - pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. - """ - pass - - -class SecondaryMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: - """ - Is this object connected to the message bus - """ - pass - - @abstractmethod - def connect(self): - """ - Connect to the concrete message bus that implements this interface. - """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - - @abstractmethod - def publish(self, data, topic: str = None): - """ - Publish device specific data to the concrete message bus. - """ - pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. + Publish device specific message to the concrete message bus. """ pass - -class FeederMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: - """ - Is this object connected to the message bus - """ - pass - @abstractmethod - def connect(self): + def get_response(self, topic, message, timeout): """ - Connect to the concrete message bus that implements this interface. + Sends a message on a specific queue, waits and returns the response """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - - @abstractmethod - def publish(self, data, topic: str = None): + + def get_agent_response(self, agent_id, message, timeout): """ - Publish device specific data to the concrete message bus. + Sends a message on a specific agent's request queue, waits and returns the response """ - pass + topic = "{}.request.{}.{}".format(t.BASE_FIELD_QUEUE,self.id, agent_id) + self.get_response(topic, message, timeout) @abstractmethod def disconnect(self): @@ -361,8 +192,6 @@ def disconnect(self): pass - - class MessageBusDefinitions: def __init__( self, diff --git a/gridappsd/goss.py b/gridappsd/goss.py index 6757e67..8dff23f 100644 --- a/gridappsd/goss.py +++ b/gridappsd/goss.py @@ -356,7 +356,7 @@ def __init__(self): self._thread.start() def run_callbacks(self): - _log.info("Starting thread queue") + _log.debug("Starting thread queue") while True: cb, hdrs, msg = self._queue_callerback.get() try: diff --git a/gridappsd/topics.py b/gridappsd/topics.py index 22f3722..87bf567 100644 --- a/gridappsd/topics.py +++ b/gridappsd/topics.py @@ -44,6 +44,10 @@ FNCS_BASE_OUTPUT_TOPIC = '/topic/goss.gridappsd.simulation.output' BASE_SIMULATION_TOPIC = '/topic/goss.gridappsd.simulation' BASE_SIMULATION_LOG_TOPIC = "/topic/goss.gridappsd.simulation.log" +BASE_FIELD_TOPIC = '/topic/goss.gridappsd.field' + +BASE_FIELD_QUEUE = 'goss.gridappsd.field' +REGISTER_AGENT_QUEUE = 'goss.gridappsd.field.register.agent' BLAZEGRAPH = "/queue/goss.gridappsd.process.request.data.powergridmodel" # https://gridappsd.readthedocs.io/en/latest/using_gridappsd/index.html#querying-logs @@ -69,7 +73,6 @@ REQUEST_APP_START = ".".join((PROCESS_PREFIX, "request.app.start")) BASE_APPLICATION_HEARTBEAT = ".".join((BASE_TOPIC_PREFIX, "heartbeat")) - def platform_log_topic(): """ Utility method for getting the platform.log base topic """ @@ -171,3 +174,86 @@ def simulation_log_topic(simulation_id): """https://gridappsd.readthedocs.io/en/latest/using_gridappsd/index.html#subscribing-to-logs """ return "{}.{}".format(BASE_SIMULATION_LOG_TOPIC, simulation_id) + +def field_message_bus_topic(message_bus_id:str, app_id: str=None, agent_id: str=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param app_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + + return f"{BASE_FIELD_TOPIC}.{message_bus_id}.{app_id}.{agent_id}" + + +def field_message_bus_app_topic(message_bus_id, app_id=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param app_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, app_id) + +def field_message_bus_agent_topic(message_bus_id, agent_id=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, agent_id) + +def field_agent_request_queue(message_bus_id, agent_id): + """ Utility method for getting the request topic for a specific distributed agent + + :param message_bus_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, agent_id) + +def context_request_queue(message_bus_id): + """ Utility method for getting the request topic for context manager + + :param message_bus_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + + return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, message_bus_id+'.context_manager') + +def field_output_topic(message_bus_id=None, simulation_id=None): + """ Utility method for getting the field output topic. + If message_bus_id is None, it returns topic used by centralized device interfaces to publish measurements. + If message_bus_id is not None, it returns topic used by distributed devices interfaces to publish measurements which is then subscribed by distributed agents. + + :param message_bus_id: + :param simulation_id + :return: str: Topic to receive field measurements + """ + + if simulation_id is None: + return "{}.{}".format(BASE_FIELD_TOPIC, "output") + else: + return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC,"simulation.output",simulation_id,message_bus_id) + +def field_input_topic(message_bus_id=None, simulation_id=None): + """ Utility method for getting the field input topic. + If message_bus_id is None, it returns topic used by centralized device interfaces to subscribe to control commands. + If message_bus_id is not None, it returns topic used by distributed devices interfaces to subscribe to control commands. + + :param message_bus_id: + :param simulation_id + :return: str: Topic to receive input control commands + """ + + if simulation_id is None: + return "{}.{}".format(BASE_FIELD_TOPIC, "input") + else: + return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC,"simulation.input",simulation_id,message_bus_id) diff --git a/pyproject.toml b/pyproject.toml index 0090974..ffa31bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-python" -version = "v2.7.230209" +version = "v2.8.230413" description = "A GridAPPS-D Python Adapter" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>", @@ -33,10 +33,8 @@ python = ">=3.7.9,<4.0" PyYAML = "^6.0" pytz = "^2022.7" dateutils = "^0.6.7" -#gridappsd-cim-profile={url="https://github.com/GRIDAPPSD/gridappsd-cim-profile/releases/download/v0.7.20230120180831a0/gridappsd_cim_profile-0.7.20230120180831a0-py3-none-any.whl"} -#gridappsd-cim-profile = "^0.10.20230208223046a0" stomp-py = "6.0.0" -gridappsd-cim-lab = "^0.11.230209" +cim-graph = "^0.1.20230413185916a0" [tool.poetry.group.dev.dependencies] pytest = "^6.2.2"