diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index 873f2eb..d5041b0 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -10,6 +10,7 @@ from cimgraph.loaders.gridappsd import GridappsdConnection from cimgraph.models import DistributedModel, SecondaryArea, SwitchArea +from gridappsd import DifferenceBuilder import gridappsd.topics as t from gridappsd.field_interface.context import LocalContext from gridappsd.field_interface.gridappsd_field_bus import GridAPPSDMessageBus @@ -64,7 +65,11 @@ def __init__(self, self.description = agent_config['description'] dt = datetime.now() ts = datetime.timestamp(dt) - self.agent_id = "da_" + self.app_id + "_" + str(int(ts)) + if ('context_manager' not in self.app_id): + self.agent_id = "da_" + self.app_id + else: + self.agent_id = downstream_message_bus_def.id+'.context_manager' + self.agent_area_dict = agent_area_dict if upstream_message_bus_def is not None: @@ -87,7 +92,7 @@ def __init__(self, # self.addressable_equipments = agent_dict['addressable_equipment'] # self.unaddressable_equipments = agent_dict['unaddressable_equipment'] - def connect(self): + def _connect(self): if self.upstream_message_bus is not None: self.upstream_message_bus.connect() @@ -96,7 +101,10 @@ def connect(self): if self.downstream_message_bus is None and self.upstream_message_bus is None: raise ValueError( "Either upstream or downstream bus must be specified!") - + + if ('context_manager' not in self.app_id): + self.agent_id = "da_" + self.app_id + "_" + self.downstream_message_bus.id + if self.agent_area_dict is None: context = LocalContext.get_context_by_message_bus( self.downstream_message_bus) @@ -109,6 +117,13 @@ def connect(self): if ('context_manager' not in self.app_id): LocalContext.register_agent(self.downstream_message_bus, self.upstream_message_bus, self) + + def disconnect(self): + + if self.upstream_message_bus is not None: + self.upstream_message_bus.disconnect() + if self.downstream_message_bus is not None: + self.downstream_message_bus.disconnect() def subscribe_to_measurement(self): if self.simulation_id is None: @@ -125,10 +140,10 @@ def subscribe_to_measurement(self): def subscribe_to_messages(self): self.downstream_message_bus.subscribe( - t.field_message_bus_topic(self.downstream_message_bus), + t.field_message_bus_topic(self.downstream_message_bus.id), self.on_downstream_message) - self.downstream_message_bus.subscribe( - t.field_message_bus_topic(self.upstream_message_bus), + self.upstream_message_bus.subscribe( + t.field_message_bus_topic(self.upstream_message_bus.id), self.on_upstream_message) _log.debug( @@ -139,23 +154,25 @@ def subscribe_to_messages(self): t.field_message_bus_app_topic(self.downstream_message_bus.id, self.app_id), self.on_downstream_message) - self.downstream_message_bus.subscribe( + self.upstream_message_bus.subscribe( t.field_message_bus_app_topic(self.upstream_message_bus.id, self.app_id), self.on_upstream_message) - _log.debug( - f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ - \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}" - ) - self.downstream_message_bus.subscribe( - t.field_message_bus_agent_topic(self.downstream_message_bus.id, - self.agent_id), - self.on_downstream_message) - self.downstream_message_bus.subscribe( - t.field_message_bus_agent_topic(self.upstream_message_bus.id, - self.agent_id), - self.on_upstream_message) + + if ('context_manager' not in self.app_id): + _log.debug( + f"Subscribing to message on agents topics: \n {t.field_message_bus_agent_topic(self.downstream_message_bus.id, self.agent_id)} \ + \n {t.field_message_bus_agent_topic(self.upstream_message_bus.id, self.agent_id)}" + ) + self.downstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.downstream_message_bus.id, + self.agent_id), + self.on_downstream_message) + self.upstream_message_bus.subscribe( + t.field_message_bus_agent_topic(self.upstream_message_bus.id, + self.agent_id), + self.on_upstream_message) def subscribe_to_requests(self): @@ -167,7 +184,7 @@ def subscribe_to_requests(self): t.field_agent_request_queue(self.downstream_message_bus.id, self.agent_id), self.on_request_from_downstream) - self.downstream_message_bus.subscribe( + self.upstream_message_bus.subscribe( t.field_agent_request_queue(self.upstream_message_bus.id, self.agent_id), self.on_request_from_uptream) @@ -205,22 +222,28 @@ def get_registration_details(self): return dataclasses.asdict(details) def publish_downstream(self, message): - self.downstream_message_bus.send(t.field_message_bus_topic(self.downstream_message_bus), message) + self.downstream_message_bus.send(t.field_message_bus_topic(self.downstream_message_bus.id), message) def publish_upstream(self, message): - self.downstream_message_bus.send(t.field_message_bus_topic(self.downstream_message_bus), message) + self.upstream_message_bus.send(t.field_message_bus_topic(self.upstream_message_bus.id), message) + def send_control_command(self, differenceBuilder : DifferenceBuilder): + if self.simulation_id is not None: + LocalContext.send_control_command(self.downstream_message_bus, differenceBuilder) + ''' + TODO This block needs to be tested with device interface + else: + self.downstream_message_bus.send(devie_interface_topic, differenceBuilder) + ''' + ''' TODO this has not been implemented yet, so we are commented them out for now. # not all agent would use this def on_control(self, control): device_id = control.get('device') command = control.get('command') self.control_device(device_id, command) - - def control_device(self, device_id, command): - device_topic = self.devices.get(device_id) - self.secondary_message_bus.publish(device_topic, command)''' +''' class FeederAgent(DistributedAgent): @@ -231,22 +254,15 @@ def __init__(self, agent_config: Dict, feeder_dict=None, simulation_id=None): - super(FeederAgent, - self).__init__(upstream_message_bus_def, + super().__init__(upstream_message_bus_def, downstream_message_bus_def, agent_config, feeder_dict, simulation_id) self.feeder_area = None self.downstream_message_bus_def = downstream_message_bus_def - if self.agent_area_dict is not None: - feeder = cim.Feeder(mRID=self.downstream_message_bus_def.id) - self.feeder_area = DistributedModel(connection=self.connection, - feeder=feeder, - topology=self.agent_area_dict) - - def connect(self): - super().connect() - if self.feeder_area is None: + self._connect() + + if self.agent_area_dict is not None: feeder = cim.Feeder(mRID=self.downstream_message_bus_def.id) self.feeder_area = DistributedModel(connection=self.connection, feeder=feeder, @@ -265,15 +281,10 @@ def __init__(self, agent_config, switch_area_dict, simulation_id) self.switch_area = None self.downstream_message_bus_def = downstream_message_bus_def - if self.agent_area_dict is not None: - self.switch_area = SwitchArea(self.downstream_message_bus_def.id, - self.connection) - self.switch_area.initialize_switch_area(self.agent_area_dict) - - def connect(self): - super().connect() - if self.switch_area is None: + self._connect() + + if self.agent_area_dict is not None: self.switch_area = SwitchArea(self.downstream_message_bus_def.id, self.connection) self.switch_area.initialize_switch_area(self.agent_area_dict) @@ -291,18 +302,17 @@ def __init__(self, agent_config, secondary_area_dict, simulation_id) self.secondary_area = None self.downstream_message_bus_def = downstream_message_bus_def - if self.agent_area_dict is not None: - self.secondary_area = SecondaryArea(self.downstream_message_bus_def.id, - self.connection) - self.secondary_area.initialize_secondary_area(self.agent_area_dict) - - def connect(self): - super().connect() - if self.secondary_area is None: + self._connect() + + if self.agent_area_dict is not None: + if len(self.agent_area_dict['addressable_equipment']) == 0: + _log.warn(f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}.") + self.secondary_area = SecondaryArea(self.downstream_message_bus_def.id, self.connection) self.secondary_area.initialize_secondary_area(self.agent_area_dict) + class CoordinatingAgent: @@ -337,12 +347,11 @@ def __init__(self, # self.subscribe_to_feeder_bus() - def spawn_distributed_agent(self, distributed_agent: DistributedAgent): + +''' def spawn_distributed_agent(self, distributed_agent: DistributedAgent): distributed_agent.connect() self.distributed_agents.append(distributed_agent) - -''' def on_control(self, control): device_id = control.get('device') command = control.get('command') diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context.py index 35dec86..d29737f 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context.py @@ -1,3 +1,4 @@ +from gridappsd import DifferenceBuilder from gridappsd.field_interface.interfaces import FieldMessageBus import dataclasses import gridappsd.topics as t @@ -13,6 +14,7 @@ def get_context_by_feeder(cls, downstream_message_bus: FieldMessageBus, feeder_m request = {'request_type' : 'get_context', 'modelId': feeder_mrid, 'areaId': area_id} + print(t.context_request_queue(downstream_message_bus.id)) response = downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request, timeout=10) return response @@ -23,9 +25,9 @@ def get_context_by_message_bus(cls, downstream_message_bus: FieldMessageBus): """ request = {'request_type' : 'get_context', - 'downstream_message_bus_id': downstream_message_bus.id + 'areaId': downstream_message_bus.id } - return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) + return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request, timeout=10) @classmethod def register_agent(cls, downstream_message_bus: FieldMessageBus, upstream_message_bus: FieldMessageBus, agent): @@ -46,6 +48,17 @@ def get_agents(cls, downstream_message_bus: FieldMessageBus): """ request = {'request_type' : 'get_agents'} return downstream_message_bus.get_response(t.context_request_queue(downstream_message_bus.id), request) + + @classmethod + def send_control_command(cls, downstream_message_bus: FieldMessageBus, difference_builder: DifferenceBuilder): + """ + Sends the control command to device + + """ + request = {'request_type' : 'control_command', + 'difference_builder': difference_builder.get_message()} + downstream_message_bus.send(t.context_request_queue(downstream_message_bus.id), + request) # Provide context based on router (ip trace) or PKI # Maybe able to emulate/simulate diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/gridappsd_field_bus.py b/gridappsd-field-bus-lib/gridappsd/field_interface/gridappsd_field_bus.py index 8cb1fa5..ea9cfda 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/gridappsd_field_bus.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/gridappsd_field_bus.py @@ -53,6 +53,6 @@ def get_response(self, topic, message, timeout=5): def disconnect(self): """ - Disconnect the device from the concrete message bus. + Disconnect from the concrete message bus. """ - pass + self.gridappsd_obj.disconnect() diff --git a/gridappsd-python-lib/gridappsd/topics.py b/gridappsd-python-lib/gridappsd/topics.py index 87bf567..2934abe 100644 --- a/gridappsd-python-lib/gridappsd/topics.py +++ b/gridappsd-python-lib/gridappsd/topics.py @@ -184,8 +184,8 @@ def field_message_bus_topic(message_bus_id:str, app_id: str=None, agent_id: str= :return: """ assert message_bus_id, "message_bus_id cannot be empty" - - return f"{BASE_FIELD_TOPIC}.{message_bus_id}.{app_id}.{agent_id}" + + return f"{BASE_FIELD_TOPIC}.{message_bus_id}" def field_message_bus_app_topic(message_bus_id, app_id=None): diff --git a/pyproject.toml b/pyproject.toml index f8a2099..adc02bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gridappsd-python-workspace" -version = "2023.5.2a0" +version = "2023.9.0" description = "A GridAPPS-D Python Adapter" authors = [ "C. Allwardt <3979063+craig8@users.noreply.github.com>",