From c10f842a945fdf0a3937b7c2b2058d4241889809 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Mon, 16 Dec 2024 14:07:54 -0800 Subject: [PATCH 1/6] Updated context manager to read SETO topo areas --- .../field_interface/agents/__init__.py | 2 +- .../field_interface/agents/agents.py | 21 +++ .../context_managers/__init__.py | 9 ++ .../centralized_context_managers.py | 93 ++++++++++++ .../context_manager_agents.py} | 138 +++++------------- .../context_managers/substation.py | 78 ++++++++++ .../field_interface/context_managers/utils.py | 23 +++ 7 files changed, 263 insertions(+), 101 deletions(-) create mode 100644 gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py create mode 100644 gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py rename gridappsd-field-bus-lib/gridappsd/field_interface/{context_manager.py => context_managers/context_manager_agents.py} (66%) create mode 100644 gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py create mode 100644 gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py index 92f6359..b7a0bdd 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py @@ -2,6 +2,6 @@ from gridappsd.field_interface.agents.agents import (FeederAgent, DistributedAgent, CoordinatingAgent, SwitchAreaAgent, - SecondaryAreaAgent) + SecondaryAreaAgent, SubstationAgent) __all__: List[str] = ["FeederAgent", "DistributedAgent", "CoordinatingAgent"] diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index 01a7aaf..de120ca 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -233,7 +233,28 @@ def on_control(self, control): self.control_device(device_id, command) ''' +class SubstationAgent(DistributedAgent): + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + substation_dict=None, + simulation_id=None): + super().__init__(upstream_message_bus_def, downstream_message_bus_def, agent_config, + substation_dict, simulation_id) + self.substation_area = None + self.downstream_message_bus_def = downstream_message_bus_def + + self._connect() + + if self.agent_area_dict is not None: + substation = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + self.substation_area = DistributedArea(connection=self.connection, + container=substation, + distributed=True) + self.substation_area.build_from_topo_message(topology_dict=self.agent_area_dict, + centralized_graph=None) class FeederAgent(DistributedAgent): def __init__(self, diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py new file mode 100644 index 0000000..1cdd50c --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py @@ -0,0 +1,9 @@ +from typing import List + +from gridappsd.field_interface.context_managers.context_manager_agents import (SubstationAreaContextManager, + FeederAreaContextManager, + SwitchAreaContextManager, + SecondaryAreaContextManager) + + +__all__: List[str] = ["SubstationAreaContextManager","FeederAreaContextManager","SwitchAreaContextManager","SecondaryAreaContextManager"] \ No newline at end of file diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py new file mode 100644 index 0000000..012b50b --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -0,0 +1,93 @@ +import argparse +import logging +import json +import os +import time +from typing import Dict + + +from cimgraph.data_profile import CIM_PROFILE +from gridappsd import GridAPPSD +import gridappsd.topics as t +import gridappsd.field_interface.agents.agents as agents_mod +from gridappsd.field_interface.context_managers.utils import REQUEST_FIELD, get_MessageBusDefinition +from gridappsd.field_interface.context_managers.context_manager_agents import FeederAreaContextManager, SwitchAreaContextManager, SecondaryAreaContextManager + +cim_profile = CIM_PROFILE.RC4_2021.value +agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) +cim = agents_mod.cim + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('goss').setLevel(logging.ERROR) +logging.getLogger('stomp.py').setLevel(logging.ERROR) + +_log = logging.getLogger(__name__) + +def _main(): + + time.sleep(10) + parser = argparse.ArgumentParser() + parser.add_argument( + "--simulation_id", + help="Simulation id to use for communicating with simulated devices on the message bus. \ + If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", + required=False) + opts = parser.parse_args() + simulation_id = opts.simulation_id + + agent_config = { + "app_id": + "context_manager", + "description": + "This agent provides topological context information like neighboring agents and devices to other distributed agents" + } + + gapps = GridAPPSD() + response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) + field_model_mrid = response['fieldModelMrid'] + + is_field_initialized = False + + while not is_field_initialized: + response = gapps.get_response(REQUEST_FIELD, {"request_type": "is_initilized"}) + print(response) + is_field_initialized = response['data']['initialized'] + time.sleep(1) + + system_message_bus_def = get_MessageBusDefinition(field_model_mrid) + feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) + + #TODO: create access control for agents for different layers + feeder_agent = FeederAreaContextManager(system_message_bus_def, + feeder_message_bus_def, + agent_config, + simulation_id=simulation_id) + + for switch_area in feeder_agent.agent_area_dict['switch_areas']: + switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['message_bus_id'])) + print("Creating switch area agent " + str(switch_area['message_bus_id'])) + switch_area_agent = SwitchAreaContextManager(feeder_message_bus_def, + switch_area_message_bus_def, + agent_config, + simulation_id=simulation_id) + + # create secondary area distributed agents + for secondary_area in switch_area['secondary_areas']: + secondary_area_message_bus_def = get_MessageBusDefinition( + str(secondary_area['message_bus_id'])) + print("Creating secondary area agent " + str(secondary_area['message_bus_id'])) + secondary_area_agent = SecondaryAreaContextManager(switch_area_message_bus_def, + secondary_area_message_bus_def, + agent_config, + simulation_id=simulation_id) + + while True: + try: + time.sleep(0.1) + except KeyboardInterrupt: + print("Exiting sample") + break + + +if __name__ == "__main__": + _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py similarity index 66% rename from gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py rename to gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py index 6ad6997..6371115 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py @@ -1,21 +1,12 @@ -import argparse import logging -import os import time from typing import Dict -import gridappsd.field_interface.agents.agents as agents_mod import gridappsd.topics as t -from cimgraph.data_profile import CIM_PROFILE from gridappsd import GridAPPSD -from gridappsd.field_interface.agents import (FeederAgent, SecondaryAreaAgent, SwitchAreaAgent) +from gridappsd.field_interface.agents import (SubstationAgent, FeederAgent, SecondaryAreaAgent, SwitchAreaAgent) from gridappsd.field_interface.interfaces import MessageBusDefinition - -cim_profile = CIM_PROFILE.RC4_2021.value - -agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) - -cim = agents_mod.cim +from gridappsd.field_interface.context_managers.utils import REQUEST_FIELD logging.basicConfig(level=logging.DEBUG) logging.getLogger('goss').setLevel(logging.ERROR) @@ -23,8 +14,42 @@ _log = logging.getLogger(__name__) -#FieldBusManager's request topics. To be used only by context manager user role only. -REQUEST_FIELD = ".".join((t.PROCESS_PREFIX, "request.field")) +class SubstationAreaContextManager(SubstationAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + substation_dict: Dict = None, + simulation_id: str = None): + + self.ot_connection = GridAPPSD() + if substation_dict is None: + request = {'request_type': 'get_context', 'modelId': downstream_message_bus_def.id} + substation_dict = None + while substation_dict is None: + self.ot_connection.get_logger().debug(f"Requesting topology for {self.__class__}") + response = self.ot_connection.get_response(REQUEST_FIELD, request, timeout=10) + if 'DistributionArea' in response: + substation_dict = response['DistributionArea']['Substation']['@id'] + self.ot_connection.get_logger().debug("Topology received at Substation Area Context Manager") + else: + time.sleep(5) + super().__init__(upstream_message_bus_def, downstream_message_bus_def, agent_config, + substation_dict, simulation_id) + + #Override agent_id to a static value + self.agent_id = downstream_message_bus_def.id + '.context_manager' + + self.context = {'data':substation_dict} + + self.registered_agents = {} + self.registered_agents[self.agent_id] = self.get_registration_details() + + self.neighbouring_agents = {} + self.upstream_agents = {} + self.downstream_agents = {} + self.ot_connection.get_logger().info("Substation Area Context Manager Created") class FeederAreaContextManager(FeederAgent): @@ -205,90 +230,3 @@ def on_request(self, message_bus, headers: Dict, message): elif message['request_type'] == 'control_command': simulation_id = message['input']['simulation_id'] self.ot_connection.send(t.simulation_input_topic(simulation_id), message) - - -def get_MessageBusDefinition(area_id: str) -> MessageBusDefinition: - - connection_args = { - "GRIDAPPSD_ADDRESS": os.environ.get('GRIDAPPSD_ADDRESS', "tcp://gridappsd:61613"), - "GRIDAPPSD_USER": os.environ.get('GRIDAPPSD_USER'), - "GRIDAPPSD_PASSWORD": os.environ.get('GRIDAPPSD_PASSWORD'), - "GRIDAPPSD_APPLICATION_ID": os.environ.get('GRIDAPPSD_APPLICATION_ID') - } - - bus = MessageBusDefinition(id=area_id, - is_ot_bus=True, - connection_type="GRIDAPPSD_TYPE_GRIDAPPSD", - conneciton_args=connection_args) - - return bus - - -def _main(): - - time.sleep(10) - parser = argparse.ArgumentParser() - parser.add_argument( - "--simulation_id", - help="Simulation id to use for communicating with simulated devices on the message bus. \ - If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", - required=False) - opts = parser.parse_args() - simulation_id = opts.simulation_id - - agent_config = { - "app_id": - "context_manager", - "description": - "This agent provides topological context information like neighboring agents and devices to other distributed agents" - } - - gapps = GridAPPSD() - response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) - field_model_mrid = response['fieldModelMrid'] - - is_field_initialized = False - - while not is_field_initialized: - response = gapps.get_response(REQUEST_FIELD, {"request_type": "is_initilized"}) - print(response) - is_field_initialized = response['data']['initialized'] - time.sleep(1) - - system_message_bus_def = get_MessageBusDefinition(field_model_mrid) - feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) - - #TODO: create access control for agents for different layers - feeder_agent = FeederAreaContextManager(system_message_bus_def, - feeder_message_bus_def, - agent_config, - simulation_id=simulation_id) - - for switch_area in feeder_agent.agent_area_dict['switch_areas']: - switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['message_bus_id'])) - print("Creating switch area agent " + str(switch_area['message_bus_id'])) - switch_area_agent = SwitchAreaContextManager(feeder_message_bus_def, - switch_area_message_bus_def, - agent_config, - simulation_id=simulation_id) - - # create secondary area distributed agents - for secondary_area in switch_area['secondary_areas']: - secondary_area_message_bus_def = get_MessageBusDefinition( - str(secondary_area['message_bus_id'])) - print("Creating secondary area agent " + str(secondary_area['message_bus_id'])) - secondary_area_agent = SecondaryAreaContextManager(switch_area_message_bus_def, - secondary_area_message_bus_def, - agent_config, - simulation_id=simulation_id) - - while True: - try: - time.sleep(0.1) - except KeyboardInterrupt: - print("Exiting sample") - break - - -if __name__ == "__main__": - _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py new file mode 100644 index 0000000..2aa74fa --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py @@ -0,0 +1,78 @@ +import argparse +import json +import logging +import time + +from cimgraph.data_profile import CIM_PROFILE + +import gridappsd.field_interface.agents.agents as agents_mod +from gridappsd.field_interface.interfaces import MessageBusDefinition +from gridappsd.field_interface.context_managers.context_manager_agents import SubstationAreaContextManager + + +cim_profile = CIM_PROFILE.RC4_2021.value +agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) +cim = agents_mod.cim + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('goss').setLevel(logging.ERROR) +logging.getLogger('stomp.py').setLevel(logging.ERROR) + +_log = logging.getLogger(__name__) + +def _main(): + + parser = argparse.ArgumentParser() + parser.add_argument( + "--simulation_id", + help="Simulation id to use for communicating with simulated devices on the message bus. \ + If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", + required=False) + parser.add_argument( + "--system_message_bus", + help="Yaml file to connect with upstream system(OT) message bus.", + required=True) + + parser.add_argument( + "--substation_message_bus", + help="Yaml file to connect with downstream substation area message bus.", + required=True) + + parser.add_argument( + "--substation_dict", + help="JSON file containing substation topology dictionary. If this file is not provided then disctionary is requested by Field Bus Manager using upstream message bus.", + required=False) + + opts = parser.parse_args() + simulation_id = opts.simulation_id + + agent_config = { + "app_id": + "context_manager", + "description": + "This agent provides topological context information like neighboring agents and devices to other distributed agents" + } + + + system_message_bus_def = MessageBusDefinition.load(opts.system_message_bus) + substation_message_bus_def = MessageBusDefinition.load(opts.substation_message_bus) + + with open(opts.substation_dict,encoding="utf-8") as f: + substation_dict = json.load(f) + + substation_agent = SubstationAreaContextManager(system_message_bus_def, + substation_message_bus_def, + agent_config, + substation_dict = substation_dict, + simulation_id=simulation_id) + + while True: + try: + time.sleep(0.1) + except KeyboardInterrupt: + print("Exiting sample") + break + + +if __name__ == "__main__": + _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py new file mode 100644 index 0000000..aacdf5b --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py @@ -0,0 +1,23 @@ +import os + +import gridappsd.topics as t +from gridappsd.field_interface.interfaces import MessageBusDefinition + +#FieldBusManager's request topics. To be used only by context manager user role only. +REQUEST_FIELD = ".".join((t.PROCESS_PREFIX, "request.field")) + +def get_MessageBusDefinition(area_id: str) -> MessageBusDefinition: + + connection_args = { + "GRIDAPPSD_ADDRESS": os.environ.get('GRIDAPPSD_ADDRESS', "tcp://gridappsd:61613"), + "GRIDAPPSD_USER": os.environ.get('GRIDAPPSD_USER'), + "GRIDAPPSD_PASSWORD": os.environ.get('GRIDAPPSD_PASSWORD'), + "GRIDAPPSD_APPLICATION_ID": os.environ.get('GRIDAPPSD_APPLICATION_ID') + } + + bus = MessageBusDefinition(id=area_id, + is_ot_bus=True, + connection_type="GRIDAPPSD_TYPE_GRIDAPPSD", + conneciton_args=connection_args) + + return bus From 092dab7720b9b8143c28c63a886200127810b775 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Thu, 19 Dec 2024 14:42:52 -0800 Subject: [PATCH 2/6] Updated classes for areas --- .../gridappsd/field_interface/agents/agents.py | 11 ++++++----- .../centralized_context_managers.py | 5 ++++- .../context_managers/substation.py | 16 ++++++++++------ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index de120ca..246cf12 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -249,12 +249,13 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - substation = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + substation = cim.Substation(mRID=self.downstream_message_bus_def.id) self.substation_area = DistributedArea(connection=self.connection, container=substation, distributed=True) self.substation_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + centralized_graph=None) + class FeederAgent(DistributedAgent): def __init__(self, @@ -271,7 +272,7 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - feeder = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + feeder = cim.FeederArea(mRID=self.downstream_message_bus_def.id) self.feeder_area = DistributedArea(connection=self.connection, container=feeder, distributed=True) @@ -295,7 +296,7 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - container = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + container = cim.SwitchArea(mRID=self.downstream_message_bus_def.id) self.switch_area = DistributedArea(container=container, connection=self.connection, distributed=True) @@ -323,7 +324,7 @@ def __init__(self, _log.warning( f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}." ) - container = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + container = cim.SecondaryArea(mRID=self.downstream_message_bus_def.id) self.secondary_area = DistributedArea(container=container, connection=self.connection, distributed=True) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py index 012b50b..9af78cb 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -42,7 +42,7 @@ def _main(): "This agent provides topological context information like neighboring agents and devices to other distributed agents" } - gapps = GridAPPSD() + '''gapps = GridAPPSD() response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) field_model_mrid = response['fieldModelMrid'] @@ -53,6 +53,9 @@ def _main(): print(response) is_field_initialized = response['data']['initialized'] time.sleep(1) + ''' + + field_model_mrid = "49AD8E07-3BF9-A4E2-CB8F-C3722F837B62" system_message_bus_def = get_MessageBusDefinition(field_model_mrid) feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py index 2aa74fa..4d91694 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py @@ -10,8 +10,8 @@ from gridappsd.field_interface.context_managers.context_manager_agents import SubstationAreaContextManager -cim_profile = CIM_PROFILE.RC4_2021.value -agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) +cim_profile = "cimhub_2023" +agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=8) cim = agents_mod.cim logging.basicConfig(level=logging.DEBUG) @@ -29,18 +29,22 @@ def _main(): If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", required=False) parser.add_argument( - "--system_message_bus", + "-u", + "--upstream_system_message_bus", help="Yaml file to connect with upstream system(OT) message bus.", required=True) parser.add_argument( - "--substation_message_bus", + "-d", + "--downstream_substation_message_bus", help="Yaml file to connect with downstream substation area message bus.", + type=str, required=True) parser.add_argument( "--substation_dict", help="JSON file containing substation topology dictionary. If this file is not provided then disctionary is requested by Field Bus Manager using upstream message bus.", + type=str, required=False) opts = parser.parse_args() @@ -54,8 +58,8 @@ def _main(): } - system_message_bus_def = MessageBusDefinition.load(opts.system_message_bus) - substation_message_bus_def = MessageBusDefinition.load(opts.substation_message_bus) + system_message_bus_def = MessageBusDefinition.load(opts.upstream_system_message_bus) + substation_message_bus_def = MessageBusDefinition.load(opts.downstream_substation_message_bus) with open(opts.substation_dict,encoding="utf-8") as f: substation_dict = json.load(f) From a4a7a98b9a3f637f89c70084f72810ca07043f84 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Tue, 21 Jan 2025 10:59:25 -0800 Subject: [PATCH 3/6] Updated to use latest cim-graph and seto model changes --- .../field_interface/agents/agents.py | 14 ++-- .../centralized_context_managers.py | 34 +++++--- .../context_managers/substation.py | 5 +- .../field_interface/field_proxy_forwarder.py | 79 +++++++++++++++++++ 4 files changed, 109 insertions(+), 23 deletions(-) create mode 100644 gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index 246cf12..f6b9edf 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -253,8 +253,7 @@ def __init__(self, self.substation_area = DistributedArea(connection=self.connection, container=substation, distributed=True) - self.substation_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.substation_area.build_from_topo_message(topology_dict=self.agent_area_dict) class FeederAgent(DistributedAgent): @@ -276,8 +275,7 @@ def __init__(self, self.feeder_area = DistributedArea(connection=self.connection, container=feeder, distributed=True) - self.feeder_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.feeder_area.build_from_topo_message(topology_dict=self.agent_area_dict) class SwitchAreaAgent(DistributedAgent): @@ -300,8 +298,7 @@ def __init__(self, self.switch_area = DistributedArea(container=container, connection=self.connection, distributed=True) - self.switch_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.switch_area.build_from_topo_message(topology_dict=self.agent_area_dict) class SecondaryAreaAgent(DistributedAgent): @@ -320,7 +317,7 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - if len(self.agent_area_dict['addressable_equipment']) == 0: + if len(self.agent_area_dict['AddressableEquipment']) == 0: _log.warning( f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}." ) @@ -328,8 +325,7 @@ def __init__(self, self.secondary_area = DistributedArea(container=container, connection=self.connection, distributed=True) - self.secondary_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.secondary_area.build_from_topo_message(topology_dict=self.agent_area_dict) class CoordinatingAgent: diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py index 9af78cb..7f85a3a 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -13,7 +13,7 @@ from gridappsd.field_interface.context_managers.utils import REQUEST_FIELD, get_MessageBusDefinition from gridappsd.field_interface.context_managers.context_manager_agents import FeederAreaContextManager, SwitchAreaContextManager, SecondaryAreaContextManager -cim_profile = CIM_PROFILE.RC4_2021.value +cim_profile = CIM_PROFILE.CIMHUB_2023.value agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) cim = agents_mod.cim @@ -42,7 +42,7 @@ def _main(): "This agent provides topological context information like neighboring agents and devices to other distributed agents" } - '''gapps = GridAPPSD() + gapps = GridAPPSD() response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) field_model_mrid = response['fieldModelMrid'] @@ -53,36 +53,44 @@ def _main(): print(response) is_field_initialized = response['data']['initialized'] time.sleep(1) - ''' + field_model_mrid = "49AD8E07-3BF9-A4E2-CB8F-C3722F837B62" system_message_bus_def = get_MessageBusDefinition(field_model_mrid) feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) + #TODO: Remove after topology service test + # with open("ieee13_topo_msg.json",encoding="utf-8") as f: + # feeder_dict = json.load(f)["DistributionArea"]["Substations"][0]["NormalEnergizedFeeder"][0]['FeederArea'] + + #TODO: create access control for agents for different layers feeder_agent = FeederAreaContextManager(system_message_bus_def, feeder_message_bus_def, agent_config, - simulation_id=simulation_id) - - for switch_area in feeder_agent.agent_area_dict['switch_areas']: - switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['message_bus_id'])) - print("Creating switch area agent " + str(switch_area['message_bus_id'])) + simulation_id=simulation_id, + feeder_dict=feeder_dict) + #print(feeder_agent.agent_area_dict) + for switch_area in feeder_agent.agent_area_dict['SwitchAreas']: + switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['@id'])) + print("Creating switch area agent " + str(switch_area['@id'])) switch_area_agent = SwitchAreaContextManager(feeder_message_bus_def, switch_area_message_bus_def, agent_config, - simulation_id=simulation_id) + simulation_id=simulation_id, + switch_area_dict=switch_area) # create secondary area distributed agents - for secondary_area in switch_area['secondary_areas']: + for secondary_area in switch_area['SecondaryAreas']: secondary_area_message_bus_def = get_MessageBusDefinition( - str(secondary_area['message_bus_id'])) - print("Creating secondary area agent " + str(secondary_area['message_bus_id'])) + str(secondary_area['@id'])) + print("Creating secondary area agent " + str(secondary_area['@id'])) secondary_area_agent = SecondaryAreaContextManager(switch_area_message_bus_def, secondary_area_message_bus_def, agent_config, - simulation_id=simulation_id) + simulation_id=simulation_id, + secondary_area_dict=secondary_area) while True: try: diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py index 4d91694..fbb7d08 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py @@ -62,7 +62,8 @@ def _main(): substation_message_bus_def = MessageBusDefinition.load(opts.downstream_substation_message_bus) with open(opts.substation_dict,encoding="utf-8") as f: - substation_dict = json.load(f) + substation_dict = json.load(f)["DistributionArea"]["Substations"][0] + substation_agent = SubstationAreaContextManager(system_message_bus_def, substation_message_bus_def, @@ -70,6 +71,8 @@ def _main(): substation_dict = substation_dict, simulation_id=simulation_id) + print(substation_agent.context) + while True: try: time.sleep(0.1) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py new file mode 100644 index 0000000..f2a438d --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py @@ -0,0 +1,79 @@ +import stomp +import json +from typing import Callable, Dict +from gridappsd import GridAPPSD +from gridappsd import topics + +class FieldProxyForwarder(): + """ + FieldProxyForwarder acts as a bridge between field bus and OT bus + when direct connection is not possible. + """ + + def __init__(self, connection_url: str, username: str, password: str): + + #Connect to proxy + self.broker_url = connection_url + self.username = username + self.password = password + self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))]) + self.proxy_connection.connect(self.username, self.password, wait=True) + + + #Connect to OT + self.ot_connection = GridAPPSD() + + #Subscribe to messages from field + self.proxy_connection.set_listener('', self.on_message_from_field) + self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC, id=1, ack="auto") + + #Subscribe to messages on OT bus + self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot) + + def on_message_from_ot(self, headers, message): + "Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus" + try: + print(f"Received message from OT: {message}") + + if headers["destination"] == topics.field_input_topic(): + self.proxy_connection.send(topics.field_input_topic, message) + + elif "goss.gridappsd.field.simulation.output." in headers["destination"]: + print("Simulation output received at OT. Ignoring.") + + else: + print(f"Unrecognized message received by OT: {message}") + + except Exception as e: + print(f"Error processing message: {e}") + + + + def on_message_from_field(self, headers, message): + "Receives messages coming from Proxy bus (e.g. ARTEMIS) and forwards to OT bus" + try: + print(f"Received message at Proxy: {message}") + + if headers["destination"] == topics.field_output_topic(): + self.ot_connection.send(topics.field_output_topic, message) + + elif "context_manager" in headers["destination"]: + request_data = json.loads(message) + request_type = request_data.get("request_type") + if request_type == "get_context": + response = self.ot_connection.get_response(headers["destination"],message) + self.proxy_connection.send(headers["reply_to"],response) + + else: + print(f"Unrecognized message received by Proxy: {message}") + + except Exception as e: + print(f"Error processing message: {e}") + +if __name__ == "__main__": + + proxy_connection_url = "localhost:61613" + proxy_username = "admin" + proxy_password = "admin" + + proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) \ No newline at end of file From 1a7c729dcff41980bee6d4ab49c41d714b97ae7c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 21 Jan 2025 19:05:58 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .github/workflows/deploy-dev-release.yml | 16 ++++++++-------- .../context_managers/__init__.py | 2 +- .../centralized_context_managers.py | 2 +- .../context_managers/substation.py | 4 ++-- .../field_interface/field_proxy_forwarder.py | 18 +++++++++--------- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/.github/workflows/deploy-dev-release.yml b/.github/workflows/deploy-dev-release.yml index 66ae989..4c459fd 100644 --- a/.github/workflows/deploy-dev-release.yml +++ b/.github/workflows/deploy-dev-release.yml @@ -71,18 +71,18 @@ jobs: id: should_update_version env: PYTHON_FILES_CHANGED: ${{ steps.changed-files.outputs.all_changed_files }} - run: | + run: | should_update=false for file in ${PYTHON_FILES_CHANGED}; do should_update=true break done - + # handle input required update. if ${{ inputs.upgrade-version }} ; then should_update=true fi - + echo "Should update == ${should_update}" echo "value=${should_update}" >> "$GITHUB_OUTPUT" @@ -108,16 +108,16 @@ jobs: if: ${{ steps.should_update_version.outputs.value == 'true' }} run: | ./scripts/run_on_each.sh poetry version prerelease - + git config --global user.name 'Commit Bot' git config --global user.email '3979063+craig8@users.noreply.github.com' git add **/pyproject.toml git commit -m "Auto Update Version Number" - git push + git push - name: Install library if: ${{ steps.should_update_version.outputs.value == 'true' }} - run: | + run: | ./scripts/poetry_install.sh - name: Create build artifacts @@ -126,7 +126,7 @@ jobs: run: | # set the right version in pyproject.toml before build and publish ./scripts/poetry_build.sh - + echo "value=v$(poetry version --short)" >> "$GITHUB_OUTPUT" # TODO: Check for pipy token and only release to github if we have it. @@ -150,7 +150,7 @@ jobs: id: publish-to-pypi if: ${{ steps.should_update_version.outputs.value == 'true' }} run: | - + # This is needed, because the poetry publish will fail at the top level of the project # so ./scripts/run_on_each.sh fails for that. echo "POETRY_PUBLISH_OPTIONS=''" >> $GITHUB_ENV diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py index 1cdd50c..1105445 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py @@ -6,4 +6,4 @@ SecondaryAreaContextManager) -__all__: List[str] = ["SubstationAreaContextManager","FeederAreaContextManager","SwitchAreaContextManager","SecondaryAreaContextManager"] \ No newline at end of file +__all__: List[str] = ["SubstationAreaContextManager","FeederAreaContextManager","SwitchAreaContextManager","SecondaryAreaContextManager"] diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py index 7f85a3a..ef2d997 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -53,7 +53,7 @@ def _main(): print(response) is_field_initialized = response['data']['initialized'] time.sleep(1) - + field_model_mrid = "49AD8E07-3BF9-A4E2-CB8F-C3722F837B62" diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py index fbb7d08..ea3b4c0 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py @@ -40,7 +40,7 @@ def _main(): help="Yaml file to connect with downstream substation area message bus.", type=str, required=True) - + parser.add_argument( "--substation_dict", help="JSON file containing substation topology dictionary. If this file is not provided then disctionary is requested by Field Bus Manager using upstream message bus.", @@ -58,7 +58,7 @@ def _main(): } - system_message_bus_def = MessageBusDefinition.load(opts.upstream_system_message_bus) + system_message_bus_def = MessageBusDefinition.load(opts.upstream_system_message_bus) substation_message_bus_def = MessageBusDefinition.load(opts.downstream_substation_message_bus) with open(opts.substation_dict,encoding="utf-8") as f: diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py index f2a438d..a5397ee 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py @@ -6,10 +6,10 @@ class FieldProxyForwarder(): """ - FieldProxyForwarder acts as a bridge between field bus and OT bus - when direct connection is not possible. + FieldProxyForwarder acts as a bridge between field bus and OT bus + when direct connection is not possible. """ - + def __init__(self, connection_url: str, username: str, password: str): #Connect to proxy @@ -18,15 +18,15 @@ def __init__(self, connection_url: str, username: str, password: str): self.password = password self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))]) self.proxy_connection.connect(self.username, self.password, wait=True) - - + + #Connect to OT self.ot_connection = GridAPPSD() #Subscribe to messages from field self.proxy_connection.set_listener('', self.on_message_from_field) self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC, id=1, ack="auto") - + #Subscribe to messages on OT bus self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot) @@ -37,7 +37,7 @@ def on_message_from_ot(self, headers, message): if headers["destination"] == topics.field_input_topic(): self.proxy_connection.send(topics.field_input_topic, message) - + elif "goss.gridappsd.field.simulation.output." in headers["destination"]: print("Simulation output received at OT. Ignoring.") @@ -56,7 +56,7 @@ def on_message_from_field(self, headers, message): if headers["destination"] == topics.field_output_topic(): self.ot_connection.send(topics.field_output_topic, message) - + elif "context_manager" in headers["destination"]: request_data = json.loads(message) request_type = request_data.get("request_type") @@ -76,4 +76,4 @@ def on_message_from_field(self, headers, message): proxy_username = "admin" proxy_password = "admin" - proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) \ No newline at end of file + proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) From 25c92d62998c76260a26bcc13332602127dff962 Mon Sep 17 00:00:00 2001 From: poorva1209 Date: Tue, 21 Jan 2025 23:06:41 -0800 Subject: [PATCH 5/6] updated after testing --- .../field_interface/field_proxy_forwarder.py | 96 +++++++++++-------- 1 file changed, 54 insertions(+), 42 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py index f2a438d..5c601c5 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py @@ -1,34 +1,63 @@ import stomp import json +import time from typing import Callable, Dict from gridappsd import GridAPPSD from gridappsd import topics +REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field")) + +class FieldListener(): + + def on_message(self, headers, message): + "Receives messages coming from Proxy bus (e.g. ARTEMIS) and forwards to OT bus" + try: + print(f"Received message at Proxy: {message}") + + if headers["destination"] == topics.field_output_topic(): + self.ot_connection.send(topics.field_output_topic(), message) + + elif headers["destination"] == topics.field_input_topic(): + pass + + elif headers["destination"] == REQUEST_FIELD: + request_data = json.loads(message) + request_type = request_data.get("request_type") + if request_type == "get_context": + response = self.ot_connection.get_response(headers["destination"],message) + self.proxy_connection.send(headers["reply_to"],response) + + else: + print(f"Unrecognized message received by Proxy: {message}") + + except Exception as e: + print(f"Error processing message: {e}") + class FieldProxyForwarder(): """ - FieldProxyForwarder acts as a bridge between field bus and OT bus - when direct connection is not possible. + FieldProxyForwarder acts as a bridge between field bus and OT bus + when direct connection is not possible. """ - + def __init__(self, connection_url: str, username: str, password: str): #Connect to proxy self.broker_url = connection_url self.username = username self.password = password - self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))]) + self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))],keepalive=True) + self.proxy_connection.set_listener('', FieldListener()) self.proxy_connection.connect(self.username, self.password, wait=True) - - + print('Connected to Proxy') + #Connect to OT self.ot_connection = GridAPPSD() #Subscribe to messages from field - self.proxy_connection.set_listener('', self.on_message_from_field) - self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC, id=1, ack="auto") + self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto") #Subscribe to messages on OT bus - self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot) + self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot) def on_message_from_ot(self, headers, message): "Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus" @@ -36,44 +65,27 @@ def on_message_from_ot(self, headers, message): print(f"Received message from OT: {message}") if headers["destination"] == topics.field_input_topic(): - self.proxy_connection.send(topics.field_input_topic, message) - - elif "goss.gridappsd.field.simulation.output." in headers["destination"]: - print("Simulation output received at OT. Ignoring.") + self.proxy_connection.send(topics.field_input_topic(), message) else: print(f"Unrecognized message received by OT: {message}") except Exception as e: print(f"Error processing message: {e}") - - - - def on_message_from_field(self, headers, message): - "Receives messages coming from Proxy bus (e.g. ARTEMIS) and forwards to OT bus" - try: - print(f"Received message at Proxy: {message}") - - if headers["destination"] == topics.field_output_topic(): - self.ot_connection.send(topics.field_output_topic, message) - - elif "context_manager" in headers["destination"]: - request_data = json.loads(message) - request_type = request_data.get("request_type") - if request_type == "get_context": - response = self.ot_connection.get_response(headers["destination"],message) - self.proxy_connection.send(headers["reply_to"],response) - - else: - print(f"Unrecognized message received by Proxy: {message}") - - except Exception as e: - print(f"Error processing message: {e}") + if __name__ == "__main__": - - proxy_connection_url = "localhost:61613" - proxy_username = "admin" - proxy_password = "admin" - - proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) \ No newline at end of file + import argparse + parser = argparse.ArgumentParser(prog="TestForwarder") + parser.add_argument("username") + parser.add_argument("passwd") + parser.add_argument("connection_url") + opts = parser.parse_args() + proxy_connection_url = opts.connection_url + proxy_username = opts.username + proxy_password = opts.passwd + + proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) + + while True: + time.sleep(0.1) From a7390ea2f04a13646b4e88472636a55c711aa49a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 31 Jan 2025 22:07:13 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../context_managers/centralized_context_managers.py | 2 +- .../gridappsd/field_interface/field_proxy_forwarder.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py index 3bf5e64..3451448 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -65,7 +65,7 @@ def _main(): feeder_message_bus_def, agent_config, simulation_id=simulation_id) - + #print(feeder_agent.agent_area_dict) for switch_area in feeder_agent.agent_area_dict['SwitchAreas']: switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['@id'])) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py index 6cc0b60..05628cf 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py @@ -20,7 +20,7 @@ def on_message(self, headers, message): if headers["destination"] == topics.field_output_topic(): self.ot_connection.send(topics.field_output_topic(), message) - + elif headers["destination"] == topics.field_input_topic(): pass @@ -57,11 +57,11 @@ def __init__(self, connection_url: str, username: str, password: str): self.proxy_connection.connect(self.username, self.password, wait=True) print('Connected to Proxy') - + #Subscribe to messages from field self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto") - + #Subscribe to messages on OT bus self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot) @@ -78,7 +78,7 @@ def on_message_from_ot(self, headers, message): except Exception as e: print(f"Error processing message: {e}") - + if __name__ == "__main__": import argparse