diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index d5041b0..41206ad 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -6,9 +6,10 @@ from datetime import datetime from typing import Dict -from cimgraph.loaders import ConnectionParameters, gridappsd -from cimgraph.loaders.gridappsd import GridappsdConnection -from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea +from cimgraph.databases import ConnectionParameters +from cimgraph.databases.gridappsd import GridappsdConnection +from cimgraph.models import FeederModel +from cimgraph.models.distributed_area import DistributedArea from gridappsd import DifferenceBuilder import gridappsd.topics as t @@ -16,17 +17,22 @@ from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus from gridappsd.field_interface.interfaces import (FieldMessageBus, MessageBusDefinition) + +CIM_PROFILE = None +IEC61970_301 = None cim = None -sparql = None _log = logging.getLogger(__name__) -def set_cim_profile(cim_profile): +def set_cim_profile(cim_profile: str, iec61970_301: int): + global CIM_PROFILE + global IEC61970_301 global cim + CIM_PROFILE = cim_profile + IEC61970_301 = iec61970_301 cim = importlib.import_module('cimgraph.data_profile.' + cim_profile) - gridappsd.set_cim_profile(cim_profile) @dataclass @@ -57,9 +63,12 @@ def __init__(self, self.simulation_id = simulation_id self.context = None - #TODO: Change params and connection to local connection - self.params = ConnectionParameters() + # TODO: Change params and connection to local connection + self.params = ConnectionParameters(cim_profile=CIM_PROFILE, + iec61970_301=IEC61970_301) + self.connection = GridappsdConnection(self.params) + self.connection.cim_profile = cim_profile self.app_id = agent_config['app_id'] self.description = agent_config['description'] @@ -68,7 +77,7 @@ def __init__(self, if ('context_manager' not in self.app_id): self.agent_id = "da_" + self.app_id else: - self.agent_id = downstream_message_bus_def.id+'.context_manager' + self.agent_id = downstream_message_bus_def.id + '.context_manager' self.agent_area_dict = agent_area_dict @@ -88,7 +97,7 @@ def __init__(self, # self.context = ContextManager.get(self.feeder_id, self.area_id) - #if agent_dict is not None: + # if agent_dict is not None: # self.addressable_equipments = agent_dict['addressable_equipment'] # self.unaddressable_equipments = agent_dict['unaddressable_equipment'] @@ -104,7 +113,7 @@ def _connect(self): if ('context_manager' not in self.app_id): self.agent_id = "da_" + self.app_id + "_" + self.downstream_message_bus.id - + if self.agent_area_dict is None: context = LocalContext.get_context_by_message_bus( self.downstream_message_bus) @@ -117,7 +126,7 @@ def _connect(self): if ('context_manager' not in self.app_id): LocalContext.register_agent(self.downstream_message_bus, self.upstream_message_bus, self) - + def disconnect(self): if self.upstream_message_bus is not None: @@ -159,7 +168,6 @@ def subscribe_to_messages(self): self.app_id), self.on_upstream_message) - if ('context_manager' not in self.app_id): _log.debug( f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ @@ -220,22 +228,26 @@ def get_registration_details(self): self.upstream_message_bus.id, self.downstream_message_bus.id) return dataclasses.asdict(details) - + def publish_downstream(self, message): - self.downstream_message_bus.send(t.field_message_bus_topic(self.downstream_message_bus.id), message) - - def publish_upstream(self, message): - self.upstream_message_bus.send(t.field_message_bus_topic(self.upstream_message_bus.id), message) + self.downstream_message_bus.send( + t.field_message_bus_topic(self.downstream_message_bus.id), message) + def publish_upstream(self, message): + self.upstream_message_bus.send( + t.field_message_bus_topic(self.upstream_message_bus.id), message) - def send_control_command(self, differenceBuilder : DifferenceBuilder): + def send_control_command(self, differenceBuilder: DifferenceBuilder): if self.simulation_id is not None: - LocalContext.send_control_command(self.downstream_message_bus, differenceBuilder) + LocalContext.send_control_command(self.downstream_message_bus, + differenceBuilder) + ''' TODO This block needs to be tested with device interface else: self.downstream_message_bus.send(devie_interface_topic, differenceBuilder) - ''' + ''' + ''' TODO this has not been implemented yet, so we are commented them out for now. # not all agent would use this @@ -254,19 +266,21 @@ def __init__(self, agent_config: Dict, feeder_dict=None, simulation_id=None): - super().__init__(upstream_message_bus_def, - downstream_message_bus_def, agent_config, - feeder_dict, simulation_id) + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, feeder_dict, simulation_id) self.feeder_area = None self.downstream_message_bus_def = downstream_message_bus_def self._connect() if self.agent_area_dict is not None: - feeder = cim.Feeder(mRID=self.downstream_message_bus_def.id) - self.feeder_area = DistributedModel(connection=self.connection, - feeder=feeder, - topology=self.agent_area_dict) + feeder = cim.EquipmentContainer( + mRID=self.downstream_message_bus_def.id) + self.feeder_area = DistributedArea(connection=self.connection, + container=feeder, + distributed=True) + self.feeder_area.build_from_topo_message( + topology_dict=self.agent_area_dict, centralized_graph=None) class SwitchAreaAgent(DistributedAgent): @@ -285,9 +299,13 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - self.switch_area = SwitchArea(self.downstream_message_bus_def.id, - self.connection) - self.switch_area.initialize_switch_area(self.agent_area_dict) + container = cim.EquipmentContainer( + mRID=self.downstream_message_bus_def.id) + self.switch_area = DistributedArea(container=container, + connection=self.connection, + distributed=True) + self.switch_area.build_from_topo_message( + topology_dict=self.agent_area_dict, centralized_graph=None) class SecondaryAreaAgent(DistributedAgent): @@ -307,12 +325,16 @@ def __init__(self, if self.agent_area_dict is not None: if len(self.agent_area_dict['addressable_equipment']) == 0: - _log.warn(f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}.") - - self.secondary_area = SecondaryArea(self.downstream_message_bus_def.id, - self.connection) - self.secondary_area.initialize_secondary_area(self.agent_area_dict) - + _log.warning( + f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}." + ) + container = cim.EquipmentContainer( + mRID=self.downstream_message_bus_def.id) + self.secondary_area = DistributedArea(container=container, + connection=self.connection, + distributed=True) + self.secondary_area.build_from_topo_message( + topology_dict=self.agent_area_dict, centralized_graph=None) class CoordinatingAgent: @@ -336,7 +358,7 @@ def __init__(self, self.system_message_bus = GridAPPSDMessageBus(system_message_bus_def) self.system_message_bus.connect() - #This will change when we have multiple feeders per system + # This will change when we have multiple feeders per system self.downstream_message_bus = self.system_message_bus # self.context = ContextManager.getContextByFeeder(self.feeder_id) diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py new file mode 100644 index 0000000..b8d89c9 --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py @@ -0,0 +1,305 @@ +import argparse +import logging +import os +import time +from typing import Dict + +import gridappsd.field_interface.agents.agents as agents_mod +import gridappsd.topics as t +from cimgraph.data_profile import CIM_PROFILE +from gridappsd import GridAPPSD +from gridappsd.field_interface.agents import (FeederAgent, SecondaryAreaAgent, + SwitchAreaAgent) +from gridappsd.field_interface.interfaces import MessageBusDefinition + + +cim_profile = CIM_PROFILE.RC4_2021.value + +agents_mod.set_cim_profile(cim_profile=cim_profile,iec61970_301=7) + +cim = agents_mod.cim + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('goss').setLevel(logging.ERROR) +logging.getLogger('stomp.py').setLevel(logging.ERROR) + +_log = logging.getLogger(__name__) + +#FieldBusManager's request topics. To be used only by context manager user role only. +REQUEST_FIELD = ".".join((t.PROCESS_PREFIX, "request.field")) + + +class FeederAreaContextManager(FeederAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + feeder_dict: Dict = None, + simulation_id: str = None): + + self.ot_connection = GridAPPSD() + if feeder_dict is None: + request = {'request_type':'get_context', + 'modelId': downstream_message_bus_def.id} + feeder_dict = None + while feeder_dict is None: + self.ot_connection.get_logger().info(f"Requesting topology for {self.__class__}") + response = self.ot_connection.get_response(REQUEST_FIELD, request, timeout=10) + if 'data' in response: + feeder_dict = response['data'] + self.ot_connection.send_status("******RCVD FEEDER *************", '/topic/goss.gridappsd.platform.log', 'DEBUG') + else: + time.sleep(5) + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, feeder_dict, simulation_id) + + #Override agent_id to a static value + self.agent_id = downstream_message_bus_def.id + '.context_manager' + + self.context = None + + self.registered_agents = {} + self.registered_agents[self.agent_id] = self.get_registration_details() + + self.neighbouring_agents = {} + self.upstream_agents = {} + self.downstream_agents = {} + + def on_request(self, message_bus, headers: Dict, message): + + _log.debug(f"Received request: {message}") + + if message['request_type'] == 'get_context': + reply_to = headers['reply-to'] + if self.context is None: + self.context = self.ot_connection.get_response(REQUEST_FIELD, message) + message_bus.send(reply_to,self.context) + + elif message['request_type'] == 'register_agent': + self.ot_connection.send(t.REGISTER_AGENT_QUEUE, message) + self.registered_agents[message['agent'] + ['agent_id']] = message['agent'] + + elif message['request_type'] == 'get_agents': + reply_to = headers['reply-to'] + message_bus.send(reply_to, self.registered_agents) + + elif message['request_type'] == 'is_initialized': + reply_to = headers['reply-to'] + message = {'initialized':True} + message_bus.send(reply_to, message) + + elif message['request_type'] == 'control_command': + simulation_id = message['input']['simulation_id'] + self.ot_connection.send(t.simulation_input_topic(simulation_id), + message) + + +class SwitchAreaContextManager(SwitchAreaAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + switch_area_dict: Dict = None, + simulation_id: str = None): + + self.ot_connection = GridAPPSD() + if switch_area_dict is None: + request = {'request_type':'get_context', + 'areaId': downstream_message_bus_def.id} + switch_area_dict = self.ot_connection.get_response( + REQUEST_FIELD, request, timeout=10)['data'] + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, switch_area_dict, simulation_id) + + #Override agent_id to a static value + self.agent_id = downstream_message_bus_def.id + '.context_manager' + + self.context = None + + self.registered_agents = {} + self.registered_agents[self.agent_id] = self.get_registration_details() + + def on_request(self, message_bus, headers: Dict, message): + + _log.debug(f"Received request: {message}") + + if message['request_type'] == 'get_context': + #TODO: check for initialization + reply_to = headers['reply-to'] + if self.context is None: + self.context = self.ot_connection.get_response(REQUEST_FIELD, message) + message_bus.send(reply_to,self.context) + + elif message['request_type'] == 'register_agent': + #TODO: check for initialization + self.ot_connection.send(t.REGISTER_AGENT_QUEUE, message) + self.registered_agents[message['agent'] + ['agent_id']] = message['agent'] + + elif message['request_type'] == 'get_agents': + #TODO: check for initialization + reply_to = headers['reply-to'] + message_bus.send(reply_to, self.registered_agents) + + elif message['request_type'] == 'is_initialized': + reply_to = headers['reply-to'] + message = {'initialized':True} + message_bus.send(reply_to, message) + + elif message['request_type'] == 'control_command': + simulation_id = message['input']['simulation_id'] + self.ot_connection.send(t.simulation_input_topic(simulation_id), + message) + + +class SecondaryAreaContextManager(SecondaryAreaAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + secondary_area_dict: Dict = None, + simulation_id: str = None): + + self.ot_connection = GridAPPSD() + if secondary_area_dict is None: + request = {'request_type':'get_context', + 'areaId': downstream_message_bus_def.id} + secondary_area_dict = self.ot_connection.get_response( + REQUEST_FIELD, request, timeout=10)['data'] + + super().__init__(upstream_message_bus_def, downstream_message_bus_def, + agent_config, secondary_area_dict, simulation_id) + + #Override agent_id to a static value + self.agent_id = downstream_message_bus_def.id + '.context_manager' + + self.context = None + + self.registered_agents = {} + self.registered_agents[self.agent_id] = self.get_registration_details() + + def on_request(self, message_bus, headers: Dict, message): + + _log.debug(f"Received request: {message}") + _log.debug(f"Received request: {headers}") + + if message['request_type'] == 'get_context': + reply_to = headers['reply-to'] + if self.context is None: + self.context = self.ot_connection.get_response(REQUEST_FIELD, message) + message_bus.send(reply_to,self.context) + + + elif message['request_type'] == 'register_agent': + self.ot_connection.send(t.REGISTER_AGENT_QUEUE, message) + self.registered_agents[message['agent'] + ['agent_id']] = message['agent'] + + elif message['request_type'] == 'get_agents': + reply_to = headers['reply-to'] + message_bus.send(reply_to, self.registered_agents) + + elif message['request_type'] == 'is_initialized': + reply_to = headers['reply-to'] + message = {'initialized':True} + message_bus.send(reply_to, message) + + elif message['request_type'] == 'control_command': + simulation_id = message['input']['simulation_id'] + self.ot_connection.send(t.simulation_input_topic(simulation_id), + message) + +def get_MessageBusDefinition(area_id: str) -> MessageBusDefinition: + + connection_args = {"GRIDAPPSD_ADDRESS": os.environ.get('GRIDAPPSD_ADDRESS',"tcp://gridappsd:61613"), + "GRIDAPPSD_USER": os.environ.get('GRIDAPPSD_USER'), + "GRIDAPPSD_PASSWORD": os.environ.get('GRIDAPPSD_PASSWORD'), + "GRIDAPPSD_APPLICATION_ID": os.environ.get('GRIDAPPSD_APPLICATION_ID')} + + bus = MessageBusDefinition(id=area_id, + is_ot_bus= True, + connection_type = "GRIDAPPSD_TYPE_GRIDAPPSD", + conneciton_args = connection_args) + + return bus + +def _main(): + + time.sleep(10) + parser = argparse.ArgumentParser() + parser.add_argument( + "--simulation_id", + help= + "Simulation id to use for communicating with simulated devices on the message bus. \ + If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", + required=False) + opts = parser.parse_args() + simulation_id = opts.simulation_id + + agent_config = { + "app_id": + "context_manager", + "description": + "This agent provides topological context information like neighboring agents and devices to other distributed agents" + } + + + gapps = GridAPPSD() + response = gapps.get_response(t.PLATFORM_STATUS, {"isField":True}) + field_model_mrid = response['fieldModelMrid'] + + is_field_initialized = False + + while not is_field_initialized: + response = gapps.get_response(REQUEST_FIELD, {"request_type":"is_initilized"}) + print(response) + is_field_initialized = response['data']['initialized'] + time.sleep(1) + + system_message_bus_def = get_MessageBusDefinition(field_model_mrid) + feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) + + #TODO: create access control for agents for different layers + feeder_agent = FeederAreaContextManager(system_message_bus_def, + feeder_message_bus_def, + agent_config, + simulation_id=simulation_id) + + + + for switch_area in feeder_agent.agent_area_dict['switch_areas']: + switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['message_bus_id'])) + print("Creating switch area agent " + + str(switch_area['message_bus_id'])) + switch_area_agent = SwitchAreaContextManager( + feeder_message_bus_def, + switch_area_message_bus_def, + agent_config, + simulation_id=simulation_id) + + # create secondary area distributed agents + for secondary_area in switch_area['secondary_areas']: + secondary_area_message_bus_def = get_MessageBusDefinition(str(secondary_area['message_bus_id'])) + print("Creating secondary area agent " + + str(secondary_area['message_bus_id'])) + secondary_area_agent = SecondaryAreaContextManager( + switch_area_message_bus_def, + secondary_area_message_bus_def, + agent_config, + simulation_id=simulation_id) + + while True: + try: + time.sleep(0.1) + except KeyboardInterrupt: + print("Exiting sample") + break + + +if __name__ == "__main__": + _main() diff --git a/gridappsd-field-bus-lib/pyproject.toml b/gridappsd-field-bus-lib/pyproject.toml index 6da09ad..15eee6a 100644 --- a/gridappsd-field-bus-lib/pyproject.toml +++ b/gridappsd-field-bus-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-field-bus" -version = "2023.5.2a0" +version = "2023.12.1" description = "GridAPPS-D Field Bus Implementation" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>", @@ -21,11 +21,15 @@ packages = [ { include = 'gridappsd'} ] +[tool.poetry.scripts] +# Add things in the form +# myscript = 'my_package:main' +context_manager = 'gridappsd.context_manager:main' [tool.poetry.dependencies] -python = ">=3.7.9,<4.0" -gridappsd-python = {path="../gridappsd-python-lib", develop=true} -cim-graph = "^2023.5.1a3" +python = ">=3.8.1,<4.0" +gridappsd-python = ">2023" +cim-graph = ">=0.1.1a0" [tool.poetry.group.dev.dependencies] pytest = "^6.2.2" @@ -37,3 +41,17 @@ yapf = "^0.32.0" [build-system] requires = ["poetry-core>=1.2.0"] build-backend = "poetry.core.masonry.api" + +[tool.yapfignore] +ignore_patterns = [ + ".venv/**", + ".pytest_cache/**", + "dist/**", + "docs/**" +] + +[tool.yapf] +based_on_style = "pep8" +spaces_before_comment = 4 +column_limit = 99 +split_before_logical_operator = true diff --git a/gridappsd-python-lib/pyproject.toml b/gridappsd-python-lib/pyproject.toml index 40cafad..249e986 100644 --- a/gridappsd-python-lib/pyproject.toml +++ b/gridappsd-python-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-python" -version = "2023.5.2a0" +version = "2023.12.1" description = "A GridAPPS-D Python Adapter" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>", @@ -48,3 +48,17 @@ gitpython = "^3.1.31" [build-system] requires = ["poetry-core>=1.2.0"] build-backend = "poetry.core.masonry.api" + +[tool.yapfignore] +ignore_patterns = [ + ".venv/**", + ".pytest_cache/**", + "dist/**", + "docs/**" +] + +[tool.yapf] +based_on_style = "pep8" +spaces_before_comment = 4 +column_limit = 99 +split_before_logical_operator = true diff --git a/pyproject.toml b/pyproject.toml index 4c6c427..29c98e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-python-workspace" -version = "2023.9.1" +version = "2023.12.1" description = "A GridAPPS-D Python Adapter" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>",