From d3266f29613e01cb97e7424fdce41fbd5c1a7ab3 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Mon, 27 Mar 2023 11:18:43 -0700 Subject: [PATCH 1/6] implemented neighboring agent discovery and agent to agent communication --- gridappsd/field_interface/__init__.py | 2 - gridappsd/field_interface/agents/agents.py | 141 ++++++++----- gridappsd/field_interface/context.py | 49 +++-- .../field_interface/gridappsd_field_bus.py | 25 ++- gridappsd/field_interface/interfaces.py | 188 +----------------- gridappsd/goss.py | 2 +- gridappsd/topics.py | 59 +++++- 7 files changed, 209 insertions(+), 257 deletions(-) diff --git a/gridappsd/field_interface/__init__.py b/gridappsd/field_interface/__init__.py index 2181da7..e5299d7 100644 --- a/gridappsd/field_interface/__init__.py +++ b/gridappsd/field_interface/__init__.py @@ -1,9 +1,7 @@ from typing import List -from gridappsd.field_interface.context import ContextManager from gridappsd.field_interface.interfaces import MessageBusDefinition __all__: List[str] = [ - "ContextManager", "MessageBusDefinition" ] diff --git a/gridappsd/field_interface/agents/agents.py b/gridappsd/field_interface/agents/agents.py index 6436733..515520a 100644 --- a/gridappsd/field_interface/agents/agents.py +++ b/gridappsd/field_interface/agents/agents.py @@ -1,25 +1,18 @@ -from typing import Dict -import cimlab.data_profile.cimext_2022 as cim - -from abc import abstractmethod from dataclasses import dataclass, field import importlib import logging +from typing import Dict +import uuid -from gridappsd.field_interface.context import ContextManager - -from cimlab.loaders import Parameter, ConnectionParameters -from cimlab.loaders.gridappsd import GridappsdConnection, get_topology_response +from cimlab.loaders import ConnectionParameters +from cimlab.loaders import gridappsd +from cimlab.loaders.gridappsd import GridappsdConnection from cimlab.models import SwitchArea, SecondaryArea, DistributedModel +from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus from gridappsd.field_interface.interfaces import MessageBusDefinition - -import cimlab.data_profile.cimext_2022 as cim -from cimlab.loaders import Parameter, ConnectionParameters -from cimlab.loaders import gridappsd -from cimlab.loaders.gridappsd import GridappsdConnection, get_topology_response -from cimlab.models import SwitchArea, SecondaryArea, DistributedModel +import gridappsd.topics as t cim = None @@ -40,7 +33,8 @@ class DistributedAgent: def __init__(self, upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - agent_dict=None, + agent_config, + agent_area_dict=None, simulation_id=None, cim_profile: str = None): """ @@ -52,8 +46,16 @@ def __init__(self, self.downstream_message_bus = None self.simulation_id = simulation_id self.context = None + + #TODO: Change params and connection to local connection self.params = ConnectionParameters() self.connection = GridappsdConnection(self.params) + + self.app_id = agent_config['app_id'] + self.description = agent_config['description'] + self.agent_id = str(uuid.uuid4()) + self.agent_area_dict = agent_area_dict + if upstream_message_bus_def is not None: if upstream_message_bus_def.is_ot_bus: @@ -73,36 +75,88 @@ def __init__(self, # self.addressable_equipments = agent_dict['addressable_equipment'] # self.unaddressable_equipments = agent_dict['unaddressable_equipment'] - @classmethod - def from_feeder(cls, feeder_id, area_id): - context = ContextManager.get_context_by_feeder(feeder_id, area_id) - return cls(context.get('upstream_message_bus_def'), context.get('downstream_message_bus_def')) - def connect(self): + + if self.agent_area_dict is None: + context = LocalContext.get_context_by_message_bus(self.downstream_message_bus) + self.agent_area_dict = context['data'] + if self.upstream_message_bus is not None: self.upstream_message_bus.connect() if self.downstream_message_bus is not None: self.downstream_message_bus.connect() if self.downstream_message_bus is None and self.upstream_message_bus is None: raise ValueError("Either upstream or downstream bus must be specified!") + self.subscribe_to_measurement() + self.subscribe_to_messages() + self.subscribe_to_requests() + + if('context_manager' not in self.app_id): + LocalContext.register_agent(self.downstream_message_bus, self.upstream_message_bus,self) def subscribe_to_measurement(self): if self.simulation_id is None: - self.downstream_message_bus.subscribe(f"fieldbus/{self.downstream_message_bus.id}", self.on_measurement) + self.downstream_message_bus.subscribe(f"/topic/goss.gridappsd.field.output.{self.downstream_message_bus.id}", self.on_measurement) else: topic = f"/topic/goss.gridappsd.field.simulation.output.{self.simulation_id}.{self.downstream_message_bus.id}" - _log.debug(f"subscribing to sim_output on topic {topic}") + _log.debug(f"subscribing to simulation output on topic {topic}") self.downstream_message_bus.subscribe(topic, self.on_simulation_output) + + def subscribe_to_messages(self): + + + self.downstream_message_bus.subscribe(t.field_message_bus_topic(self.downstream_message_bus), self.on_downstream_message) + self.downstream_message_bus.subscribe(t.field_message_bus_topic(self.upstream_message_bus), self.on_upstream_message) + + _log.debug(f"Subscribing to messages on application topics: \n {t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id)} \ + \n {t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id)}") + self.downstream_message_bus.subscribe(t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id), self.on_downstream_message) + self.downstream_message_bus.subscribe(t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id), self.on_upstream_message) + + _log.debug(f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)}") + self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id), self.on_downstream_message) + self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id), self.on_upstream_message) + + def subscribe_to_requests(self): + + _log.debug(f"Subscribing to requests on agents queue: \n {t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id)}") + self.downstream_message_bus.subscribe(t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id), self.on_request_from_downstream) + self.downstream_message_bus.subscribe(t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id), self.on_request_from_uptream) def on_measurement(self, headers: Dict, message) -> None: raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") def on_simulation_output(self, headers, message): self.on_measurement(headers=headers, message=message) - - + + def on_upstream_message(self, headers: Dict, message) -> None: + raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") + + def on_downstream_message(self, headers: Dict, message) -> None: + raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") + + def on_request_from_uptream(self, headers: Dict, message): + self.on_request(self.upstream_message_bus, headers, message) + + def on_request_from_downstream(self, headers: Dict, message): + self.on_request(self.downstream_message_bus, headers, message) + + def on_request(self, message_bus, headers:Dict, message): + raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") + + def get_registration_details(self): + return {'agent_id':str(self.agent_id), + 'app_id':self.app_id, + 'description':self.description, + 'upstream_message_bus_id':self.upstream_message_bus.id, + 'downstream_message_bus_id':self.downstream_message_bus.id + } + + ''' TODO this has not been implemented yet, so we are commented them out for now. # not all agent would use this def on_control(self, control): @@ -110,20 +164,6 @@ def on_control(self, control): command = control.get('command') self.control_device(device_id, command) - def publish_to_upstream_bus(self,output): - self.switch_message_bus.publish(self.output_topic, output) - - # could be and upstream or peer level agent - def publish_to_upstream_bus_agent(self,agent_id, output): - self.switch_message_bus.publish(self.topic.agent_id, output) - - def publish_to_downstream_bus(self,message): - self.secondary_message_bus.publish(self.topic, message) - - # downstream agent - def publish_to_downstream_bus_agent(self,agent_id, message): - self.secondary_message_bus.publish(self.topic.agent_id, message) - def control_device(self, device_id, command): device_topic = self.devices.get(device_id) self.secondary_message_bus.publish(device_topic, command)''' @@ -133,14 +173,15 @@ class FeederAgent(DistributedAgent): def __init__(self, upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, feeder_dict=None, simulation_id=None): super(FeederAgent, self).__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, feeder_dict, simulation_id) if feeder_dict is not None: feeder = cim.Feeder(mRID=downstream_message_bus_def.id) - self.feeder_area = DistributedModel(connection=self.connection, feeder=feeder, topology=feeder_dict) @@ -148,10 +189,12 @@ class SwitchAreaAgent(DistributedAgent): def __init__(self, upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, switch_area_dict=None, simulation_id=None): super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, switch_area_dict, simulation_id) if switch_area_dict is not None: @@ -163,10 +206,12 @@ class SecondaryAreaAgent(DistributedAgent): def __init__(self, upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, secondary_area_dict=None, simulation_id=None): super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, secondary_area_dict, simulation_id) if secondary_area_dict is not None: @@ -191,6 +236,10 @@ def __init__(self, feeder_id, system_message_bus_def: MessageBusDefinition, simu self.system_message_bus = GridAPPSDMessageBus(system_message_bus_def) self.system_message_bus.connect() + + #This will change when we have multiple feeders per system + self.downstream_message_bus = self.system_message_bus + # self.context = ContextManager.getContextByFeeder(self.feeder_id) # print(self.context) # self.addressable_equipments = self.context['data']['addressable_equipment'] @@ -205,20 +254,6 @@ def spawn_distributed_agent(self, distributed_agent: DistributedAgent): ''' - def on_message_from_feeder_bus(self, message): - pass - - def subscribe_to_distribution_bus(self, topic): - #self.system_message_bus.subscribe("/topic/goss.gridappsd.field."+self.feeder_id, - self.on_message_from_feeder_bus) - self.system_message_bus.subscribe(topic, self.on_message_from_feeder_bus) - - def subscribe_to_feeder_bus(self, topic): - self.system_message_bus.subscribe(topic, self.on_message_from_feeder_bus) - - def on_measurement(self, measurements): - print(measurements) - def on_control(self, control): device_id = control.get('device') command = control.get('command') diff --git a/gridappsd/field_interface/context.py b/gridappsd/field_interface/context.py index ae1c882..41214ec 100644 --- a/gridappsd/field_interface/context.py +++ b/gridappsd/field_interface/context.py @@ -1,36 +1,53 @@ -from gridappsd import GridAPPSD +from gridappsd.field_interface.interfaces import FieldMessageBus +import gridappsd.topics as t -request_field_queue_prefix = 'goss.gridappsd.process.request.field' -request_field_context_queue = request_field_queue_prefix + '.context' -class ContextManager: - +class LocalContext: + @classmethod - def get_context_by_feeder(cls, feeder_mrid, area_id=None): - gridappsd_obj = GridAPPSD() - - request = {'modelId': feeder_mrid, + def get_context_by_feeder(cls, downstream_message_bus: FieldMessageBus, feeder_mrid, area_id=None): + + request = {'request_type' : 'get_context', + 'modelId': feeder_mrid, 'areaId': area_id} - - response = gridappsd_obj.get_response(request_field_context_queue, request) + response = downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request, timeout=10) return response @classmethod - def get_context_by_message_bus(cls, downstream_message_bus_id): + def get_context_by_message_bus(cls, downstream_message_bus: FieldMessageBus): """ return agents/devices based on upstream and/or downstream message bus as input make message bus id a list based on filter return distributed agents for different applications as well """ - gridappsd_obj = GridAPPSD() - - request = {'downstream_message_bus_id': downstream_message_bus_id, + + request = {'request_type' : 'get_context', + 'downstream_message_bus_id': downstream_message_bus.id, 'agents': True, 'devices': True} + return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) + + @classmethod + def register_agent(cls, downstream_message_bus: FieldMessageBus, upstream_message_bus: FieldMessageBus, agent): + """ + Sends the newly created distributed agent's info to OT bus + + """ + request = {'request_type' : 'register_agent', + 'agent' : agent.get_registration_details()} + downstream_message_bus.send(t.context_request_queue(downstream_message_bus.id), request) + upstream_message_bus.send(t.context_request_queue(upstream_message_bus.id), request) + + @classmethod + def get_agents(cls, downstream_message_bus: FieldMessageBus): + """ + Sends the newly created distributed agent's info to OT bus - return gridappsd_obj.get_response(request_field_context_queue, request) + """ + request = {'request_type' : 'get_agents'} + return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) # Provide context based on router (ip trace) or PKI # Maybe able to emulate/simulate diff --git a/gridappsd/field_interface/gridappsd_field_bus.py b/gridappsd/field_interface/gridappsd_field_bus.py index 7d93f58..350af6c 100644 --- a/gridappsd/field_interface/gridappsd_field_bus.py +++ b/gridappsd/field_interface/gridappsd_field_bus.py @@ -1,9 +1,9 @@ -from gridappsd.field_interface.interfaces import MessageBusDefinition -from gridappsd.field_interface.interfaces import FeederMessageBus from gridappsd import GridAPPSD +from gridappsd.field_interface.interfaces import FieldMessageBus +from gridappsd.field_interface.interfaces import MessageBusDefinition -class GridAPPSDMessageBus(FeederMessageBus): +class GridAPPSDMessageBus(FieldMessageBus): def __init__(self, definition: MessageBusDefinition): super(GridAPPSDMessageBus, self).__init__(definition) self._id = definition.id @@ -36,11 +36,26 @@ def subscribe(self, topic, callback): def unsubscribe(self, topic): pass - def publish(self, data, topic: str = None): + def send(self, topic, message): """ Publish device specific data to the concrete message bus. """ - pass + if self.gridappsd_obj is not None: + self.gridappsd_obj.send(topic, message) + + def get_response(self, topic, message, timeout=5): + """ + Sends a message on a specific concrete queue, waits and returns the response + """ + if self.gridappsd_obj is not None: + return self.gridappsd_obj.get_response(topic, message, timeout) + + def get_agent_response(self, topic, message, timeout=5): + """ + Sends a message on a specific concrete queue, waits and returns the response + """ + if self.gridappsd_obj is not None: + return self.gridappsd_obj.get_response(topic, message, timeout) def disconnect(self): """ diff --git a/gridappsd/field_interface/interfaces.py b/gridappsd/field_interface/interfaces.py index 3b776a9..32a370f 100644 --- a/gridappsd/field_interface/interfaces.py +++ b/gridappsd/field_interface/interfaces.py @@ -3,6 +3,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum +import gridappsd.topics as t import logging from os import PathLike from pathlib import Path @@ -164,194 +165,25 @@ def unsubscribe(self, topic): pass @abstractmethod - def publish(self, data, topic: str = None): + def send(self, topic, data): """ Publish device specific data to the concrete message bus. """ pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. - """ - pass - - -class SwitchAreaMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: - """ - Is this object connected to the message bus - """ - pass - - @abstractmethod - def connect(self): - """ - Connect to the concrete message bus that implements this interface. - """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - - @abstractmethod - def publish(self, data, topic: str = None): - """ - Publish device specific data to the concrete message bus. - """ - pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. - """ - pass - - -class SecondaryMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: - """ - Is this object connected to the message bus - """ - pass - - @abstractmethod - def connect(self): - """ - Connect to the concrete message bus that implements this interface. - """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - - @abstractmethod - def publish(self, data, topic: str = None): - """ - Publish device specific data to the concrete message bus. - """ - pass - - @abstractmethod - def disconnect(self): - """ - Disconnect the device from the concrete message bus. - """ - pass - -class FeederMessageBus: - def __init__(self, config: MessageBusDefinition): - self._devices = dict() - self._is_ot_bus = config.is_ot_bus - self._id = config.id - - @property - def id(self): - return self._id - - @property - def is_ot_bus(self): - return self._is_ot_bus - - def add_device(self, device: "DeviceFieldInterface"): - self._devices[device.id] = device - - def disconnect_device(self, id: str): - del self._devices[id] - @abstractmethod - def query_devices(self) -> dict: - pass - - @abstractmethod - def is_connected(self) -> bool: + def get_response(self, topic, message, timeout): """ - Is this object connected to the message bus + Sends a message on a specific queue, waits and returns the response """ - pass - - @abstractmethod - def connect(self): - """ - Connect to the concrete message bus that implements this interface. - """ - pass - - @abstractmethod - def subscribe(self, topic, callback): - pass - - @abstractmethod - def unsubscribe(self, topic): - pass - + @abstractmethod - def publish(self, data, topic: str = None): + def get_agent_response(self, agent_id, message, timeout): """ - Publish device specific data to the concrete message bus. + Sends a message on a specific agent's request queue, waits and returns the response """ - pass + topic = "{}.request.{}.{}".format(t.BASE_FIELD_QUEUE,self.id, agent_id) + self.get_response(topic, message, timeout) @abstractmethod def disconnect(self): @@ -361,8 +193,6 @@ def disconnect(self): pass - - class MessageBusDefinitions: def __init__( self, diff --git a/gridappsd/goss.py b/gridappsd/goss.py index 6757e67..8dff23f 100644 --- a/gridappsd/goss.py +++ b/gridappsd/goss.py @@ -356,7 +356,7 @@ def __init__(self): self._thread.start() def run_callbacks(self): - _log.info("Starting thread queue") + _log.debug("Starting thread queue") while True: cb, hdrs, msg = self._queue_callerback.get() try: diff --git a/gridappsd/topics.py b/gridappsd/topics.py index 22f3722..adf969e 100644 --- a/gridappsd/topics.py +++ b/gridappsd/topics.py @@ -44,6 +44,10 @@ FNCS_BASE_OUTPUT_TOPIC = '/topic/goss.gridappsd.simulation.output' BASE_SIMULATION_TOPIC = '/topic/goss.gridappsd.simulation' BASE_SIMULATION_LOG_TOPIC = "/topic/goss.gridappsd.simulation.log" +BASE_FIELD_TOPIC = '/topic/goss.gridappsd.field' + +BASE_FIELD_QUEUE = 'goss.gridappsd.field' +REGISTER_AGENT_QUEUE = 'goss.gridappsd.field.register.agent' BLAZEGRAPH = "/queue/goss.gridappsd.process.request.data.powergridmodel" # https://gridappsd.readthedocs.io/en/latest/using_gridappsd/index.html#querying-logs @@ -69,7 +73,6 @@ REQUEST_APP_START = ".".join((PROCESS_PREFIX, "request.app.start")) BASE_APPLICATION_HEARTBEAT = ".".join((BASE_TOPIC_PREFIX, "heartbeat")) - def platform_log_topic(): """ Utility method for getting the platform.log base topic """ @@ -171,3 +174,57 @@ def simulation_log_topic(simulation_id): """https://gridappsd.readthedocs.io/en/latest/using_gridappsd/index.html#subscribing-to-logs """ return "{}.{}".format(BASE_SIMULATION_LOG_TOPIC, simulation_id) + +def field_message_bus_topic(message_bus_id, app_id=None, agent_id=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param app_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, app_id, agent_id) + +def field_message_bus_app_topic(message_bus_id, app_id=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param app_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, app_id) + +def field_message_bus_agent_topic(message_bus_id, agent_id=None): + """ Utility method for getting the publish/subscribe topic for a specific message bus. + + :param message_bus_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, agent_id) + +def field_agent_request_queue(message_bus_id, agent_id): + """ Utility method for getting the request topic for a specific distributed agent + + :param message_bus_id: + :param agent_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, agent_id) + + + +def context_request_queue(message_bus_id): + """ Utility method for getting the request topic for context manager + + :param message_bus_id: + :return: + """ + assert message_bus_id, "message_bus_id cannot be empty" + + return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, message_bus_id+'.context_manager') + From fa80dfe38b278ac2bf0715c4d65c7f86e352d066 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Fri, 7 Apr 2023 15:47:51 -0700 Subject: [PATCH 2/6] resolved issues on the pull request --- gridappsd/field_interface/__init__.py | 2 + gridappsd/field_interface/agents/agents.py | 38 +++++++++++++------ gridappsd/field_interface/context.py | 9 ++--- .../field_interface/gridappsd_field_bus.py | 10 +---- gridappsd/field_interface/interfaces.py | 5 +-- gridappsd/topics.py | 20 ++++++++-- 6 files changed, 52 insertions(+), 32 deletions(-) diff --git a/gridappsd/field_interface/__init__.py b/gridappsd/field_interface/__init__.py index e5299d7..66eba6b 100644 --- a/gridappsd/field_interface/__init__.py +++ b/gridappsd/field_interface/__init__.py @@ -1,7 +1,9 @@ from typing import List +from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.interfaces import MessageBusDefinition __all__: List[str] = [ + "LocalContext", "MessageBusDefinition" ] diff --git a/gridappsd/field_interface/agents/agents.py b/gridappsd/field_interface/agents/agents.py index 515520a..45148c0 100644 --- a/gridappsd/field_interface/agents/agents.py +++ b/gridappsd/field_interface/agents/agents.py @@ -1,8 +1,10 @@ +import dataclasses from dataclasses import dataclass, field +from datetime import datetime import importlib +import json import logging from typing import Dict -import uuid from cimlab.loaders import ConnectionParameters from cimlab.loaders import gridappsd @@ -11,7 +13,8 @@ from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus -from gridappsd.field_interface.interfaces import MessageBusDefinition +from gridappsd.field_interface.interfaces import MessageBusDefinition,\ + FieldMessageBus import gridappsd.topics as t @@ -25,6 +28,15 @@ def set_cim_profile(cim_profile): global cim cim = importlib.import_module('cimlab.data_profile.' + cim_profile) gridappsd.set_cim_profile(cim_profile) + + +@dataclass +class AgentRegistrationDetails: + agent_id:str + app_id: str + description: str + upstream_message_bus_id: FieldMessageBus.id + downstream_message_bus_id: FieldMessageBus.id class DistributedAgent: @@ -53,7 +65,9 @@ def __init__(self, self.app_id = agent_config['app_id'] self.description = agent_config['description'] - self.agent_id = str(uuid.uuid4()) + dt = datetime.now() + ts = datetime.timestamp(dt) + self.agent_id = "da_"+self.app_id+"_"+str(int(ts)) self.agent_area_dict = agent_area_dict @@ -97,9 +111,9 @@ def connect(self): def subscribe_to_measurement(self): if self.simulation_id is None: - self.downstream_message_bus.subscribe(f"/topic/goss.gridappsd.field.output.{self.downstream_message_bus.id}", self.on_measurement) + self.downstream_message_bus.subscribe(t.field_output_topic(self.downstream_message_bus.id), self.on_measurement) else: - topic = f"/topic/goss.gridappsd.field.simulation.output.{self.simulation_id}.{self.downstream_message_bus.id}" + topic = t.field_output_topic(self.downstream_message_bus.id, self.simulation_id) _log.debug(f"subscribing to simulation output on topic {topic}") self.downstream_message_bus.subscribe(topic, self.on_simulation_output) @@ -116,7 +130,7 @@ def subscribe_to_messages(self): self.downstream_message_bus.subscribe(t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id), self.on_upstream_message) _log.debug(f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ - \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)}") + \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}") self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id), self.on_downstream_message) self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id), self.on_upstream_message) @@ -149,12 +163,12 @@ def on_request(self, message_bus, headers:Dict, message): raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") def get_registration_details(self): - return {'agent_id':str(self.agent_id), - 'app_id':self.app_id, - 'description':self.description, - 'upstream_message_bus_id':self.upstream_message_bus.id, - 'downstream_message_bus_id':self.downstream_message_bus.id - } + details = AgentRegistrationDetails(str(self.agent_id), + self.app_id, + self.description, + self.upstream_message_bus.id, + self.downstream_message_bus.id) + return dataclasses.asdict(details) ''' TODO this has not been implemented yet, so we are commented them out for now. diff --git a/gridappsd/field_interface/context.py b/gridappsd/field_interface/context.py index 41214ec..d6fa6c9 100644 --- a/gridappsd/field_interface/context.py +++ b/gridappsd/field_interface/context.py @@ -1,5 +1,7 @@ from gridappsd.field_interface.interfaces import FieldMessageBus +import dataclasses import gridappsd.topics as t +import json @@ -17,12 +19,9 @@ def get_context_by_feeder(cls, downstream_message_bus: FieldMessageBus, feeder_m @classmethod def get_context_by_message_bus(cls, downstream_message_bus: FieldMessageBus): """ - return agents/devices based on upstream and/or downstream message bus as input - make message bus id a list - - based on filter return distributed agents for different applications as well + return agents/devices based on downstream message bus as input + """ - request = {'request_type' : 'get_context', 'downstream_message_bus_id': downstream_message_bus.id, 'agents': True, diff --git a/gridappsd/field_interface/gridappsd_field_bus.py b/gridappsd/field_interface/gridappsd_field_bus.py index 350af6c..8cb1fa5 100644 --- a/gridappsd/field_interface/gridappsd_field_bus.py +++ b/gridappsd/field_interface/gridappsd_field_bus.py @@ -1,6 +1,7 @@ from gridappsd import GridAPPSD from gridappsd.field_interface.interfaces import FieldMessageBus from gridappsd.field_interface.interfaces import MessageBusDefinition +from typing import Any class GridAPPSDMessageBus(FieldMessageBus): @@ -36,7 +37,7 @@ def subscribe(self, topic, callback): def unsubscribe(self, topic): pass - def send(self, topic, message): + def send(self, topic: str, message: Any): """ Publish device specific data to the concrete message bus. """ @@ -50,13 +51,6 @@ def get_response(self, topic, message, timeout=5): if self.gridappsd_obj is not None: return self.gridappsd_obj.get_response(topic, message, timeout) - def get_agent_response(self, topic, message, timeout=5): - """ - Sends a message on a specific concrete queue, waits and returns the response - """ - if self.gridappsd_obj is not None: - return self.gridappsd_obj.get_response(topic, message, timeout) - def disconnect(self): """ Disconnect the device from the concrete message bus. diff --git a/gridappsd/field_interface/interfaces.py b/gridappsd/field_interface/interfaces.py index 32a370f..8e42e66 100644 --- a/gridappsd/field_interface/interfaces.py +++ b/gridappsd/field_interface/interfaces.py @@ -165,9 +165,9 @@ def unsubscribe(self, topic): pass @abstractmethod - def send(self, topic, data): + def send(self, topic, message): """ - Publish device specific data to the concrete message bus. + Publish device specific message to the concrete message bus. """ pass @@ -177,7 +177,6 @@ def get_response(self, topic, message, timeout): Sends a message on a specific queue, waits and returns the response """ - @abstractmethod def get_agent_response(self, agent_id, message, timeout): """ Sends a message on a specific agent's request queue, waits and returns the response diff --git a/gridappsd/topics.py b/gridappsd/topics.py index adf969e..043be8b 100644 --- a/gridappsd/topics.py +++ b/gridappsd/topics.py @@ -175,7 +175,7 @@ def simulation_log_topic(simulation_id): """ return "{}.{}".format(BASE_SIMULATION_LOG_TOPIC, simulation_id) -def field_message_bus_topic(message_bus_id, app_id=None, agent_id=None): +def field_message_bus_topic(message_bus_id:str, app_id: str=None, agent_id: str=None): """ Utility method for getting the publish/subscribe topic for a specific message bus. :param message_bus_id: @@ -184,7 +184,9 @@ def field_message_bus_topic(message_bus_id, app_id=None, agent_id=None): :return: """ assert message_bus_id, "message_bus_id cannot be empty" - return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC, message_bus_id, app_id, agent_id) + + return f"{BASE_FIELD_TOPIC}.{message_bus_id}.{app_id}.{agent_id}" + def field_message_bus_app_topic(message_bus_id, app_id=None): """ Utility method for getting the publish/subscribe topic for a specific message bus. @@ -216,8 +218,6 @@ def field_agent_request_queue(message_bus_id, agent_id): assert message_bus_id, "message_bus_id cannot be empty" return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, agent_id) - - def context_request_queue(message_bus_id): """ Utility method for getting the request topic for context manager @@ -228,3 +228,15 @@ def context_request_queue(message_bus_id): return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, message_bus_id+'.context_manager') +def field_output_topic(message_bus_id, simulation_id=None): + """ Utility method for getting the field output topic for distributed agents to receive measurements on the given message bus. + + :param message_bus_id: + :return: str: Topic to receive field measurements + """ + assert message_bus_id, "message_bus_id cannot be empty" + + if simulation_id is None: + return "{}.{}".format(BASE_FIELD_TOPIC, "output") + else: + return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC,"simulation.output",simulation_id,message_bus_id) From 17d1674c6a69e616662347fd3bd05f3bbcc9da93 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Wed, 12 Apr 2023 13:47:05 -0700 Subject: [PATCH 3/6] added field topic for centralized services --- gridappsd/topics.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/gridappsd/topics.py b/gridappsd/topics.py index 043be8b..87bf567 100644 --- a/gridappsd/topics.py +++ b/gridappsd/topics.py @@ -228,15 +228,32 @@ def context_request_queue(message_bus_id): return "{}.request.{}.{}".format(BASE_FIELD_QUEUE, message_bus_id, message_bus_id+'.context_manager') -def field_output_topic(message_bus_id, simulation_id=None): - """ Utility method for getting the field output topic for distributed agents to receive measurements on the given message bus. - +def field_output_topic(message_bus_id=None, simulation_id=None): + """ Utility method for getting the field output topic. + If message_bus_id is None, it returns topic used by centralized device interfaces to publish measurements. + If message_bus_id is not None, it returns topic used by distributed devices interfaces to publish measurements which is then subscribed by distributed agents. + :param message_bus_id: + :param simulation_id :return: str: Topic to receive field measurements """ - assert message_bus_id, "message_bus_id cannot be empty" if simulation_id is None: return "{}.{}".format(BASE_FIELD_TOPIC, "output") else: return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC,"simulation.output",simulation_id,message_bus_id) + +def field_input_topic(message_bus_id=None, simulation_id=None): + """ Utility method for getting the field input topic. + If message_bus_id is None, it returns topic used by centralized device interfaces to subscribe to control commands. + If message_bus_id is not None, it returns topic used by distributed devices interfaces to subscribe to control commands. + + :param message_bus_id: + :param simulation_id + :return: str: Topic to receive input control commands + """ + + if simulation_id is None: + return "{}.{}".format(BASE_FIELD_TOPIC, "input") + else: + return "{}.{}.{}.{}".format(BASE_FIELD_TOPIC,"simulation.input",simulation_id,message_bus_id) From 3ddf117ed1df80c8ec04c9f7dfcdd70809cd1699 Mon Sep 17 00:00:00 2001 From: C <3979063+craig8@users.noreply.github.com> Date: Thu, 13 Apr 2023 11:46:56 -0700 Subject: [PATCH 4/6] Update to use cimgraph Bump Versions --- gridappsd/field_interface/agents/agents.py | 255 ++++++++++++--------- pyproject.toml | 6 +- 2 files changed, 151 insertions(+), 110 deletions(-) diff --git a/gridappsd/field_interface/agents/agents.py b/gridappsd/field_interface/agents/agents.py index 45148c0..f99fad0 100644 --- a/gridappsd/field_interface/agents/agents.py +++ b/gridappsd/field_interface/agents/agents.py @@ -1,46 +1,43 @@ import dataclasses -from dataclasses import dataclass, field -from datetime import datetime import importlib import json import logging +from dataclasses import dataclass, field +from datetime import datetime from typing import Dict -from cimlab.loaders import ConnectionParameters -from cimlab.loaders import gridappsd -from cimlab.loaders.gridappsd import GridappsdConnection -from cimlab.models import SwitchArea, SecondaryArea, DistributedModel - +from cimgraph.loaders import ConnectionParameters, gridappsd +from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea +from cimgraph.loaders.gridappsd import GridappsdConnection +cimgraph. +import gridappsd.topics as t from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus -from gridappsd.field_interface.interfaces import MessageBusDefinition,\ - FieldMessageBus -import gridappsd.topics as t - +from gridappsd.field_interface.interfaces import (FieldMessageBus, + MessageBusDefinition) cim = None sparql = None - _log = logging.getLogger(__name__) + def set_cim_profile(cim_profile): global cim - cim = importlib.import_module('cimlab.data_profile.' + cim_profile) + cim = importlib.import_module('cimgraph.data_profile.' + cim_profile) gridappsd.set_cim_profile(cim_profile) - + @dataclass class AgentRegistrationDetails: - agent_id:str + agent_id: str app_id: str description: str upstream_message_bus_id: FieldMessageBus.id - downstream_message_bus_id: FieldMessageBus.id + downstream_message_bus_id: FieldMessageBus.id - -class DistributedAgent: +class DistributedAgent: def __init__(self, upstream_message_bus_def: MessageBusDefinition, @@ -58,28 +55,29 @@ def __init__(self, self.downstream_message_bus = None self.simulation_id = simulation_id self.context = None - - #TODO: Change params and connection to local connection + + #TODO: Change params and connection to local connection self.params = ConnectionParameters() self.connection = GridappsdConnection(self.params) - + self.app_id = agent_config['app_id'] self.description = agent_config['description'] dt = datetime.now() ts = datetime.timestamp(dt) - self.agent_id = "da_"+self.app_id+"_"+str(int(ts)) + self.agent_id = "da_" + self.app_id + "_" + str(int(ts)) self.agent_area_dict = agent_area_dict - if upstream_message_bus_def is not None: if upstream_message_bus_def.is_ot_bus: - self.upstream_message_bus = GridAPPSDMessageBus(upstream_message_bus_def) + self.upstream_message_bus = GridAPPSDMessageBus( + upstream_message_bus_def) # else: # self.upstream_message_bus = VolttronMessageBus(upstream_message_bus_def) if downstream_message_bus_def is not None: if downstream_message_bus_def.is_ot_bus: - self.downstream_message_bus = GridAPPSDMessageBus(downstream_message_bus_def) + self.downstream_message_bus = GridAPPSDMessageBus( + downstream_message_bus_def) # else: # self.downstream_message_bus = VolttronMessageBus(downstream_message_bus_def) @@ -90,87 +88,123 @@ def __init__(self, # self.unaddressable_equipments = agent_dict['unaddressable_equipment'] def connect(self): - + if self.agent_area_dict is None: - context = LocalContext.get_context_by_message_bus(self.downstream_message_bus) + context = LocalContext.get_context_by_message_bus( + self.downstream_message_bus) self.agent_area_dict = context['data'] - + if self.upstream_message_bus is not None: self.upstream_message_bus.connect() if self.downstream_message_bus is not None: self.downstream_message_bus.connect() if self.downstream_message_bus is None and self.upstream_message_bus is None: - raise ValueError("Either upstream or downstream bus must be specified!") - + raise ValueError( + "Either upstream or downstream bus must be specified!") + self.subscribe_to_measurement() self.subscribe_to_messages() self.subscribe_to_requests() - - if('context_manager' not in self.app_id): - LocalContext.register_agent(self.downstream_message_bus, self.upstream_message_bus,self) + + if ('context_manager' not in self.app_id): + LocalContext.register_agent(self.downstream_message_bus, + self.upstream_message_bus, self) def subscribe_to_measurement(self): if self.simulation_id is None: - self.downstream_message_bus.subscribe(t.field_output_topic(self.downstream_message_bus.id), self.on_measurement) + self.downstream_message_bus.subscribe( + t.field_output_topic(self.downstream_message_bus.id), + self.on_measurement) else: - topic = t.field_output_topic(self.downstream_message_bus.id, self.simulation_id) + topic = t.field_output_topic(self.downstream_message_bus.id, + self.simulation_id) _log.debug(f"subscribing to simulation output on topic {topic}") self.downstream_message_bus.subscribe(topic, self.on_simulation_output) - + def subscribe_to_messages(self): - - - self.downstream_message_bus.subscribe(t.field_message_bus_topic(self.downstream_message_bus), self.on_downstream_message) - self.downstream_message_bus.subscribe(t.field_message_bus_topic(self.upstream_message_bus), self.on_upstream_message) - - _log.debug(f"Subscribing to messages on application topics: \n {t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id)} \ - \n {t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id)}") - self.downstream_message_bus.subscribe(t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id), self.on_downstream_message) - self.downstream_message_bus.subscribe(t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id), self.on_upstream_message) - - _log.debug(f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ - \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}") - self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id), self.on_downstream_message) - self.downstream_message_bus.subscribe(t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id), self.on_upstream_message) - + + self.downstream_message_bus.subscribe( + t.field_message_bus_topic(self.downstream_message_bus), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_topic(self.upstream_message_bus), + self.on_upstream_message) + + _log.debug( + f"Subscribing to messages on application topics: \n {t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id)} \ + \n {t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_message_bus_app_topic(self.downstream_message_bus.id, + self.app_id), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_app_topic(self.upstream_message_bus.id, + self.app_id), + self.on_upstream_message) + + _log.debug( + f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.downstream_message_bus.id, + self.agent_id), + self.on_downstream_message) + self.downstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.upstream_message_bus.id, + self.agent_id), + self.on_upstream_message) + def subscribe_to_requests(self): - - _log.debug(f"Subscribing to requests on agents queue: \n {t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id)} \ - \n {t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id)}") - self.downstream_message_bus.subscribe(t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id), self.on_request_from_downstream) - self.downstream_message_bus.subscribe(t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id), self.on_request_from_uptream) + + _log.debug( + f"Subscribing to requests on agents queue: \n {t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_agent_request_queue(self.downstream_message_bus.id, + self.agent_id), + self.on_request_from_downstream) + self.downstream_message_bus.subscribe( + t.field_agent_request_queue(self.upstream_message_bus.id, + self.agent_id), + self.on_request_from_uptream) def on_measurement(self, headers: Dict, message) -> None: - raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") def on_simulation_output(self, headers, message): self.on_measurement(headers=headers, message=message) - + def on_upstream_message(self, headers: Dict, message) -> None: - raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") - + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + def on_downstream_message(self, headers: Dict, message) -> None: - raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") - + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + def on_request_from_uptream(self, headers: Dict, message): self.on_request(self.upstream_message_bus, headers, message) - + def on_request_from_downstream(self, headers: Dict, message): self.on_request(self.downstream_message_bus, headers, message) - - def on_request(self, message_bus, headers:Dict, message): - raise NotImplementedError(f"{self.__class__.__name__} must be overriden in child class") - + + def on_request(self, message_bus, headers: Dict, message): + raise NotImplementedError( + f"{self.__class__.__name__} must be overriden in child class") + def get_registration_details(self): - details = AgentRegistrationDetails(str(self.agent_id), - self.app_id, - self.description, - self.upstream_message_bus.id, - self.downstream_message_bus.id) + details = AgentRegistrationDetails(str(self.agent_id), self.app_id, + self.description, + self.upstream_message_bus.id, + self.downstream_message_bus.id) return dataclasses.asdict(details) - - + + ''' TODO this has not been implemented yet, so we are commented them out for now. # not all agent would use this def on_control(self, control): @@ -185,51 +219,57 @@ def control_device(self, device_id, command): class FeederAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - agent_config: Dict, - feeder_dict=None, simulation_id=None): - super(FeederAgent, self).__init__(upstream_message_bus_def, - downstream_message_bus_def, - agent_config, - feeder_dict, simulation_id) - + agent_config: Dict, + feeder_dict=None, + simulation_id=None): + super(FeederAgent, + self).__init__(upstream_message_bus_def, + downstream_message_bus_def, agent_config, + feeder_dict, simulation_id) + if feeder_dict is not None: feeder = cim.Feeder(mRID=downstream_message_bus_def.id) - self.feeder_area = DistributedModel(connection=self.connection, feeder=feeder, topology=feeder_dict) + self.feeder_area = DistributedModel(connection=self.connection, + feeder=feeder, + topology=feeder_dict) class SwitchAreaAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - agent_config: Dict, - switch_area_dict=None, simulation_id=None): - - super().__init__(upstream_message_bus_def, - downstream_message_bus_def, - agent_config, - switch_area_dict, simulation_id) - + agent_config: Dict, + switch_area_dict=None, + simulation_id=None): + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, switch_area_dict, simulation_id) + if switch_area_dict is not None: - self.switch_area = SwitchArea(downstream_message_bus_def.id, self.connection) + self.switch_area = SwitchArea(downstream_message_bus_def.id, + self.connection) self.switch_area.initialize_switch_area(switch_area_dict) - + class SecondaryAreaAgent(DistributedAgent): - def __init__(self, upstream_message_bus_def: MessageBusDefinition, + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, downstream_message_bus_def: MessageBusDefinition, - agent_config: Dict, - secondary_area_dict=None, simulation_id=None): - - super().__init__(upstream_message_bus_def, - downstream_message_bus_def, - agent_config, - secondary_area_dict, simulation_id) + agent_config: Dict, + secondary_area_dict=None, + simulation_id=None): + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, secondary_area_dict, simulation_id) if secondary_area_dict is not None: - self.secondary_area = SecondaryArea(downstream_message_bus_def.id, self.connection) + self.secondary_area = SecondaryArea(downstream_message_bus_def.id, + self.connection) self.secondary_area.initialize_secondary_area(secondary_area_dict) @@ -244,16 +284,19 @@ class CoordinatingAgent: upstream, peer , downstream and broadcast """ - def __init__(self, feeder_id, system_message_bus_def: MessageBusDefinition, simulation_id=None): + def __init__(self, + feeder_id, + system_message_bus_def: MessageBusDefinition, + simulation_id=None): self.feeder_id = feeder_id self.distributed_agents = [] self.system_message_bus = GridAPPSDMessageBus(system_message_bus_def) self.system_message_bus.connect() - + #This will change when we have multiple feeders per system self.downstream_message_bus = self.system_message_bus - + # self.context = ContextManager.getContextByFeeder(self.feeder_id) # print(self.context) # self.addressable_equipments = self.context['data']['addressable_equipment'] diff --git a/pyproject.toml b/pyproject.toml index 0090974..39f2182 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-python" -version = "v2.7.230209" +version = "v2.8.230413" description = "A GridAPPS-D Python Adapter" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>", @@ -33,10 +33,8 @@ python = ">=3.7.9,<4.0" PyYAML = "^6.0" pytz = "^2022.7" dateutils = "^0.6.7" -#gridappsd-cim-profile={url="https://github.com/GRIDAPPSD/gridappsd-cim-profile/releases/download/v0.7.20230120180831a0/gridappsd_cim_profile-0.7.20230120180831a0-py3-none-any.whl"} -#gridappsd-cim-profile = "^0.10.20230208223046a0" stomp-py = "6.0.0" -gridappsd-cim-lab = "^0.11.230209" +cim-graph = "^0.1.20230404214301a0" [tool.poetry.group.dev.dependencies] pytest = "^6.2.2" From f7932c1017d6d7f50a3c84555a80bcf9367c14ae Mon Sep 17 00:00:00 2001 From: C <3979063+craig8@users.noreply.github.com> Date: Thu, 13 Apr 2023 11:56:59 -0700 Subject: [PATCH 5/6] updated to use cimgraph rather than cimlab --- gridappsd/field_interface/agents/agents.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gridappsd/field_interface/agents/agents.py b/gridappsd/field_interface/agents/agents.py index f99fad0..68ae39b 100644 --- a/gridappsd/field_interface/agents/agents.py +++ b/gridappsd/field_interface/agents/agents.py @@ -7,9 +7,9 @@ from typing import Dict from cimgraph.loaders import ConnectionParameters, gridappsd -from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea from cimgraph.loaders.gridappsd import GridappsdConnection -cimgraph. +from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea + import gridappsd.topics as t from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus From 88b8d051fb854e9b1414c76b481db44e55893295 Mon Sep 17 00:00:00 2001 From: C <3979063+craig8@users.noreply.github.com> Date: Thu, 13 Apr 2023 12:19:00 -0700 Subject: [PATCH 6/6] update to latest cimgraph --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 39f2182..ffa31bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ PyYAML = "^6.0" pytz = "^2022.7" dateutils = "^0.6.7" stomp-py = "6.0.0" -cim-graph = "^0.1.20230404214301a0" +cim-graph = "^0.1.20230413185916a0" [tool.poetry.group.dev.dependencies] pytest = "^6.2.2"