diff --git a/.github/workflows/deploy-dev-release.yml b/.github/workflows/deploy-dev-release.yml deleted file mode 100644 index 66ae989..0000000 --- a/.github/workflows/deploy-dev-release.yml +++ /dev/null @@ -1,163 +0,0 @@ -######################################################################################################## -# Deploys a develop release and commits pyproject.toml files with new versions. -# -# When will it run: -# This workflow will only run on push to develop when there is a python file change. -# -# How will it update: -# The script should update based upon the current version in the pyproject.toml file. The workflow -# should run poetry version prerelease to create a new version file. Since this -# repository houses multiple wheels we will be using the ./scripts/runoneach.sh to run this command. -######################################################################################################## -name: Deploy Dev Release Artifacts - -on: - push: - branches: - - develop - workflow_dispatch: - inputs: - upgrade-version: - description: "Upgrade version number regardless of whether a py file is altered." - required: false - default: false - type: boolean - -defaults: - run: - shell: bash - -env: - LANG: en_US.utf-8 - LC_ALL: en_US.utf-8 - PYTHON_VERSION: "3.10" - -jobs: - deploy-dev-release: - runs-on: ubuntu-22.04 - permissions: - contents: write # To push a branch - pull-requests: write # To create a PR from that branch - steps: - - run: echo "🎉 The job was automatically triggered by a ${{ github.event_name }} event." - - run: echo "🐧 This job is now running on a ${{ runner.os }} server hosted by GitHub!" - - run: echo "🔎 The name of your branch is ${{ github.ref }} and your repository is ${{ github.repository }}." - - run: echo "The specified version is ${{ inputs.release-version }}." - - run: | - set -x - set -u - set -e - #---------------------------------------------- - # check-out repo and set-up python - #---------------------------------------------- - - name: Checkout code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - # ref: develop - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Get changed files - id: changed-files - uses: tj-actions/changed-files@v45 - # To compare changes between the current commit and the last pushed remote commit set `since_last_remote_commit: true`. e.g - # with: - # since_last_remote_commit: true - with: - files: | - **.py - - - name: Should change version - id: should_update_version - env: - PYTHON_FILES_CHANGED: ${{ steps.changed-files.outputs.all_changed_files }} - run: | - should_update=false - for file in ${PYTHON_FILES_CHANGED}; do - should_update=true - break - done - - # handle input required update. - if ${{ inputs.upgrade-version }} ; then - should_update=true - fi - - echo "Should update == ${should_update}" - echo "value=${should_update}" >> "$GITHUB_OUTPUT" - - - name: Set up Python ${{ env.PYTHON_VERSION }} - if: ${{ steps.should_update_version.outputs.value == 'true' }} - id: setup-python - uses: actions/setup-python@v5 - with: - python-version: ${{ env.PYTHON_VERSION }} - - #---------------------------------------------- - # ----- install & configure poetry ----- - #---------------------------------------------- - - name: Install Poetry - if: ${{ steps.should_update_version.outputs.value == 'true' }} - uses: snok/install-poetry@v1 - with: - virtualenvs-create: true - virtualenvs-in-project: true - installer-parallel: true - - - name: Update version and commit pyproject.toml file - if: ${{ steps.should_update_version.outputs.value == 'true' }} - run: | - ./scripts/run_on_each.sh poetry version prerelease - - git config --global user.name 'Commit Bot' - git config --global user.email '3979063+craig8@users.noreply.github.com' - git add **/pyproject.toml - git commit -m "Auto Update Version Number" - git push - - - name: Install library - if: ${{ steps.should_update_version.outputs.value == 'true' }} - run: | - ./scripts/poetry_install.sh - - - name: Create build artifacts - id: version - if: ${{ steps.should_update_version.outputs.value == 'true' }} - run: | - # set the right version in pyproject.toml before build and publish - ./scripts/poetry_build.sh - - echo "value=v$(poetry version --short)" >> "$GITHUB_OUTPUT" - - # TODO: Check for pipy token and only release to github if we have it. - - - name: Push artifacts to github - env: - TAG: ${{ steps.version.outputs.value }} - if: ${{ steps.should_update_version.outputs.value == 'true' }} - uses: ncipollo/release-action@v1 - with: - artifacts: "dist/*.gz,dist/*.whl" - artifactErrorsFailBuild: true - generateReleaseNotes: true - commit: ${{ github.ref }} - # check bump-rule and set accordingly - prerelease: true - tag: ${{ env.TAG }} - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Publish to pypi - id: publish-to-pypi - if: ${{ steps.should_update_version.outputs.value == 'true' }} - run: | - - # This is needed, because the poetry publish will fail at the top level of the project - # so ./scripts/run_on_each.sh fails for that. - echo "POETRY_PUBLISH_OPTIONS=''" >> $GITHUB_ENV - cd gridappsd-python-lib - poetry config pypi-token.pypi ${{ secrets.PYPI_TOKEN }} - poetry publish - - cd ../gridappsd-field-bus-lib - poetry config pypi-token.pypi ${{ secrets.PYPI_TOKEN }} - poetry publish diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py index 92f6359..b7a0bdd 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/__init__.py @@ -2,6 +2,6 @@ from gridappsd.field_interface.agents.agents import (FeederAgent, DistributedAgent, CoordinatingAgent, SwitchAreaAgent, - SecondaryAreaAgent) + SecondaryAreaAgent, SubstationAgent) __all__: List[str] = ["FeederAgent", "DistributedAgent", "CoordinatingAgent"] diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py index 01a7aaf..f6b9edf 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/agents/agents.py @@ -233,6 +233,27 @@ def on_control(self, control): self.control_device(device_id, command) ''' +class SubstationAgent(DistributedAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + substation_dict=None, + simulation_id=None): + super().__init__(upstream_message_bus_def, downstream_message_bus_def, agent_config, + substation_dict, simulation_id) + self.substation_area = None + self.downstream_message_bus_def = downstream_message_bus_def + + self._connect() + + if self.agent_area_dict is not None: + substation = cim.Substation(mRID=self.downstream_message_bus_def.id) + self.substation_area = DistributedArea(connection=self.connection, + container=substation, + distributed=True) + self.substation_area.build_from_topo_message(topology_dict=self.agent_area_dict) class FeederAgent(DistributedAgent): @@ -250,12 +271,11 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - feeder = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + feeder = cim.FeederArea(mRID=self.downstream_message_bus_def.id) self.feeder_area = DistributedArea(connection=self.connection, container=feeder, distributed=True) - self.feeder_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.feeder_area.build_from_topo_message(topology_dict=self.agent_area_dict) class SwitchAreaAgent(DistributedAgent): @@ -274,12 +294,11 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - container = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + container = cim.SwitchArea(mRID=self.downstream_message_bus_def.id) self.switch_area = DistributedArea(container=container, connection=self.connection, distributed=True) - self.switch_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.switch_area.build_from_topo_message(topology_dict=self.agent_area_dict) class SecondaryAreaAgent(DistributedAgent): @@ -298,16 +317,15 @@ def __init__(self, self._connect() if self.agent_area_dict is not None: - if len(self.agent_area_dict['addressable_equipment']) == 0: + if len(self.agent_area_dict['AddressableEquipment']) == 0: _log.warning( f"No addressable equipment in the secondary area with down stream message bus id: {self.downstream_message_bus.id}." ) - container = cim.EquipmentContainer(mRID=self.downstream_message_bus_def.id) + container = cim.SecondaryArea(mRID=self.downstream_message_bus_def.id) self.secondary_area = DistributedArea(container=container, connection=self.connection, distributed=True) - self.secondary_area.build_from_topo_message(topology_dict=self.agent_area_dict, - centralized_graph=None) + self.secondary_area.build_from_topo_message(topology_dict=self.agent_area_dict) class CoordinatingAgent: diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py new file mode 100644 index 0000000..1105445 --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/__init__.py @@ -0,0 +1,9 @@ +from typing import List + +from gridappsd.field_interface.context_managers.context_manager_agents import (SubstationAreaContextManager, + FeederAreaContextManager, + SwitchAreaContextManager, + SecondaryAreaContextManager) + + +__all__: List[str] = ["SubstationAreaContextManager","FeederAreaContextManager","SwitchAreaContextManager","SecondaryAreaContextManager"] diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py new file mode 100644 index 0000000..3451448 --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/centralized_context_managers.py @@ -0,0 +1,99 @@ +import argparse +import logging +import json +import os +import time +from typing import Dict + + +from cimgraph.data_profile import CIM_PROFILE +from gridappsd import GridAPPSD +import gridappsd.topics as t +import gridappsd.field_interface.agents.agents as agents_mod +from gridappsd.field_interface.context_managers.utils import REQUEST_FIELD, get_MessageBusDefinition +from gridappsd.field_interface.context_managers.context_manager_agents import FeederAreaContextManager, SwitchAreaContextManager, SecondaryAreaContextManager + +cim_profile = CIM_PROFILE.CIMHUB_2023.value +agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) +cim = agents_mod.cim + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('goss').setLevel(logging.ERROR) +logging.getLogger('stomp.py').setLevel(logging.ERROR) + +_log = logging.getLogger(__name__) + +def _main(): + + time.sleep(10) + parser = argparse.ArgumentParser() + parser.add_argument( + "--simulation_id", + help="Simulation id to use for communicating with simulated devices on the message bus. \ + If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", + required=False) + opts = parser.parse_args() + simulation_id = opts.simulation_id + + agent_config = { + "app_id": + "context_manager", + "description": + "This agent provides topological context information like neighboring agents and devices to other distributed agents" + } + + gapps = GridAPPSD() + response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) + field_model_mrid = response['fieldModelMrid'] + + is_field_initialized = False + + while not is_field_initialized: + response = gapps.get_response(REQUEST_FIELD, {"request_type": "is_initilized"}) + print(response) + is_field_initialized = response['data']['initialized'] + time.sleep(1) + + + field_model_mrid = "49AD8E07-3BF9-A4E2-CB8F-C3722F837B62" + + system_message_bus_def = get_MessageBusDefinition(field_model_mrid) + feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) + + #TODO: create access control for agents for different layers + feeder_agent = FeederAreaContextManager(system_message_bus_def, + feeder_message_bus_def, + agent_config, + simulation_id=simulation_id) + + #print(feeder_agent.agent_area_dict) + for switch_area in feeder_agent.agent_area_dict['SwitchAreas']: + switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['@id'])) + print("Creating switch area agent " + str(switch_area['@id'])) + switch_area_agent = SwitchAreaContextManager(feeder_message_bus_def, + switch_area_message_bus_def, + agent_config, + simulation_id=simulation_id, + switch_area_dict=switch_area) + + # create secondary area distributed agents + for secondary_area in switch_area['SecondaryAreas']: + secondary_area_message_bus_def = get_MessageBusDefinition( + str(secondary_area['@id'])) + print("Creating secondary area agent " + str(secondary_area['@id'])) + secondary_area_agent = SecondaryAreaContextManager(switch_area_message_bus_def, + secondary_area_message_bus_def, + agent_config, + simulation_id=simulation_id, + secondary_area_dict=secondary_area) + + while True: + try: + time.sleep(0.1) + except KeyboardInterrupt: + print("Exiting sample") + break + + +if __name__ == "__main__": + _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py similarity index 66% rename from gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py rename to gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py index 6ad6997..6371115 100644 --- a/gridappsd-field-bus-lib/gridappsd/field_interface/context_manager.py +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/context_manager_agents.py @@ -1,21 +1,12 @@ -import argparse import logging -import os import time from typing import Dict -import gridappsd.field_interface.agents.agents as agents_mod import gridappsd.topics as t -from cimgraph.data_profile import CIM_PROFILE from gridappsd import GridAPPSD -from gridappsd.field_interface.agents import (FeederAgent, SecondaryAreaAgent, SwitchAreaAgent) +from gridappsd.field_interface.agents import (SubstationAgent, FeederAgent, SecondaryAreaAgent, SwitchAreaAgent) from gridappsd.field_interface.interfaces import MessageBusDefinition - -cim_profile = CIM_PROFILE.RC4_2021.value - -agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=7) - -cim = agents_mod.cim +from gridappsd.field_interface.context_managers.utils import REQUEST_FIELD logging.basicConfig(level=logging.DEBUG) logging.getLogger('goss').setLevel(logging.ERROR) @@ -23,8 +14,42 @@ _log = logging.getLogger(__name__) -#FieldBusManager's request topics. To be used only by context manager user role only. -REQUEST_FIELD = ".".join((t.PROCESS_PREFIX, "request.field")) +class SubstationAreaContextManager(SubstationAgent): + + def __init__(self, + upstream_message_bus_def: MessageBusDefinition, + downstream_message_bus_def: MessageBusDefinition, + agent_config: Dict, + substation_dict: Dict = None, + simulation_id: str = None): + + self.ot_connection = GridAPPSD() + if substation_dict is None: + request = {'request_type': 'get_context', 'modelId': downstream_message_bus_def.id} + substation_dict = None + while substation_dict is None: + self.ot_connection.get_logger().debug(f"Requesting topology for {self.__class__}") + response = self.ot_connection.get_response(REQUEST_FIELD, request, timeout=10) + if 'DistributionArea' in response: + substation_dict = response['DistributionArea']['Substation']['@id'] + self.ot_connection.get_logger().debug("Topology received at Substation Area Context Manager") + else: + time.sleep(5) + super().__init__(upstream_message_bus_def, downstream_message_bus_def, agent_config, + substation_dict, simulation_id) + + #Override agent_id to a static value + self.agent_id = downstream_message_bus_def.id + '.context_manager' + + self.context = {'data':substation_dict} + + self.registered_agents = {} + self.registered_agents[self.agent_id] = self.get_registration_details() + + self.neighbouring_agents = {} + self.upstream_agents = {} + self.downstream_agents = {} + self.ot_connection.get_logger().info("Substation Area Context Manager Created") class FeederAreaContextManager(FeederAgent): @@ -205,90 +230,3 @@ def on_request(self, message_bus, headers: Dict, message): elif message['request_type'] == 'control_command': simulation_id = message['input']['simulation_id'] self.ot_connection.send(t.simulation_input_topic(simulation_id), message) - - -def get_MessageBusDefinition(area_id: str) -> MessageBusDefinition: - - connection_args = { - "GRIDAPPSD_ADDRESS": os.environ.get('GRIDAPPSD_ADDRESS', "tcp://gridappsd:61613"), - "GRIDAPPSD_USER": os.environ.get('GRIDAPPSD_USER'), - "GRIDAPPSD_PASSWORD": os.environ.get('GRIDAPPSD_PASSWORD'), - "GRIDAPPSD_APPLICATION_ID": os.environ.get('GRIDAPPSD_APPLICATION_ID') - } - - bus = MessageBusDefinition(id=area_id, - is_ot_bus=True, - connection_type="GRIDAPPSD_TYPE_GRIDAPPSD", - conneciton_args=connection_args) - - return bus - - -def _main(): - - time.sleep(10) - parser = argparse.ArgumentParser() - parser.add_argument( - "--simulation_id", - help="Simulation id to use for communicating with simulated devices on the message bus. \ - If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", - required=False) - opts = parser.parse_args() - simulation_id = opts.simulation_id - - agent_config = { - "app_id": - "context_manager", - "description": - "This agent provides topological context information like neighboring agents and devices to other distributed agents" - } - - gapps = GridAPPSD() - response = gapps.get_response(t.PLATFORM_STATUS, {"isField": True}) - field_model_mrid = response['fieldModelMrid'] - - is_field_initialized = False - - while not is_field_initialized: - response = gapps.get_response(REQUEST_FIELD, {"request_type": "is_initilized"}) - print(response) - is_field_initialized = response['data']['initialized'] - time.sleep(1) - - system_message_bus_def = get_MessageBusDefinition(field_model_mrid) - feeder_message_bus_def = get_MessageBusDefinition(field_model_mrid) - - #TODO: create access control for agents for different layers - feeder_agent = FeederAreaContextManager(system_message_bus_def, - feeder_message_bus_def, - agent_config, - simulation_id=simulation_id) - - for switch_area in feeder_agent.agent_area_dict['switch_areas']: - switch_area_message_bus_def = get_MessageBusDefinition(str(switch_area['message_bus_id'])) - print("Creating switch area agent " + str(switch_area['message_bus_id'])) - switch_area_agent = SwitchAreaContextManager(feeder_message_bus_def, - switch_area_message_bus_def, - agent_config, - simulation_id=simulation_id) - - # create secondary area distributed agents - for secondary_area in switch_area['secondary_areas']: - secondary_area_message_bus_def = get_MessageBusDefinition( - str(secondary_area['message_bus_id'])) - print("Creating secondary area agent " + str(secondary_area['message_bus_id'])) - secondary_area_agent = SecondaryAreaContextManager(switch_area_message_bus_def, - secondary_area_message_bus_def, - agent_config, - simulation_id=simulation_id) - - while True: - try: - time.sleep(0.1) - except KeyboardInterrupt: - print("Exiting sample") - break - - -if __name__ == "__main__": - _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py new file mode 100644 index 0000000..ea3b4c0 --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/substation.py @@ -0,0 +1,85 @@ +import argparse +import json +import logging +import time + +from cimgraph.data_profile import CIM_PROFILE + +import gridappsd.field_interface.agents.agents as agents_mod +from gridappsd.field_interface.interfaces import MessageBusDefinition +from gridappsd.field_interface.context_managers.context_manager_agents import SubstationAreaContextManager + + +cim_profile = "cimhub_2023" +agents_mod.set_cim_profile(cim_profile=cim_profile, iec61970_301=8) +cim = agents_mod.cim + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('goss').setLevel(logging.ERROR) +logging.getLogger('stomp.py').setLevel(logging.ERROR) + +_log = logging.getLogger(__name__) + +def _main(): + + parser = argparse.ArgumentParser() + parser.add_argument( + "--simulation_id", + help="Simulation id to use for communicating with simulated devices on the message bus. \ + If simulation_id is not provided then Context Manager assumes to run on deployed field with real devices.", + required=False) + parser.add_argument( + "-u", + "--upstream_system_message_bus", + help="Yaml file to connect with upstream system(OT) message bus.", + required=True) + + parser.add_argument( + "-d", + "--downstream_substation_message_bus", + help="Yaml file to connect with downstream substation area message bus.", + type=str, + required=True) + + parser.add_argument( + "--substation_dict", + help="JSON file containing substation topology dictionary. If this file is not provided then disctionary is requested by Field Bus Manager using upstream message bus.", + type=str, + required=False) + + opts = parser.parse_args() + simulation_id = opts.simulation_id + + agent_config = { + "app_id": + "context_manager", + "description": + "This agent provides topological context information like neighboring agents and devices to other distributed agents" + } + + + system_message_bus_def = MessageBusDefinition.load(opts.upstream_system_message_bus) + substation_message_bus_def = MessageBusDefinition.load(opts.downstream_substation_message_bus) + + with open(opts.substation_dict,encoding="utf-8") as f: + substation_dict = json.load(f)["DistributionArea"]["Substations"][0] + + + substation_agent = SubstationAreaContextManager(system_message_bus_def, + substation_message_bus_def, + agent_config, + substation_dict = substation_dict, + simulation_id=simulation_id) + + print(substation_agent.context) + + while True: + try: + time.sleep(0.1) + except KeyboardInterrupt: + print("Exiting sample") + break + + +if __name__ == "__main__": + _main() diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py new file mode 100644 index 0000000..d3791d6 --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/context_managers/utils.py @@ -0,0 +1,23 @@ +import os + +import gridappsd.topics as t +from gridappsd.field_interface.interfaces import MessageBusDefinition + +#FieldBusManager's request topics. To be used only by context manager user role only. +REQUEST_FIELD = ".".join((t.PROCESS_PREFIX, "request.field")) + +def get_message_bus_definition(area_id: str) -> MessageBusDefinition: + + connection_args = { + "GRIDAPPSD_ADDRESS": os.environ.get('GRIDAPPSD_ADDRESS', "tcp://gridappsd:61613"), + "GRIDAPPSD_USER": os.environ.get('GRIDAPPSD_USER'), + "GRIDAPPSD_PASSWORD": os.environ.get('GRIDAPPSD_PASSWORD'), + "GRIDAPPSD_APPLICATION_ID": os.environ.get('GRIDAPPSD_APPLICATION_ID') + } + + bus = MessageBusDefinition(id=area_id, + is_ot_bus=True, + connection_type="GRIDAPPSD_TYPE_GRIDAPPSD", + conneciton_args=connection_args) + + return bus diff --git a/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py new file mode 100644 index 0000000..05628cf --- /dev/null +++ b/gridappsd-field-bus-lib/gridappsd/field_interface/field_proxy_forwarder.py @@ -0,0 +1,97 @@ +import stomp +import json +import time +from typing import Callable, Dict +from gridappsd import GridAPPSD +from gridappsd import topics + +REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field")) + +class FieldListener(): + + def __init__(self, ot_connection: GridAPPSD, proxy_connection: stomp.Connection): + self.ot_connection = ot_connection + self.proxy_connection = proxy_connection + + def on_message(self, headers, message): + "Receives messages coming from Proxy bus (e.g. ARTEMIS) and forwards to OT bus" + try: + print(f"Received message at Proxy: {message}") + + if headers["destination"] == topics.field_output_topic(): + self.ot_connection.send(topics.field_output_topic(), message) + + elif headers["destination"] == topics.field_input_topic(): + pass + + elif headers["destination"] == REQUEST_FIELD: + request_data = json.loads(message) + request_type = request_data.get("request_type") + if request_type == "get_context": + response = self.ot_connection.get_response(headers["destination"],message) + self.proxy_connection.send(headers["reply_to"],response) + + else: + print(f"Unrecognized message received by Proxy: {message}") + + except Exception as e: + print(f"Error processing message: {e}") + +class FieldProxyForwarder(): + """ + FieldProxyForwarder acts as a bridge between field bus and OT bus + when direct connection is not possible. + """ + + def __init__(self, connection_url: str, username: str, password: str): + + #Connect to OT + self.ot_connection = GridAPPSD() + + #Connect to proxy + self.broker_url = connection_url + self.username = username + self.password = password + self.proxy_connection = stomp.Connection([(self.broker_url.split(":")[0], int(self.broker_url.split(":")[1]))],keepalive=True) + self.proxy_connection.set_listener('', FieldListener(self.ot_connection, self.proxy_connection)) + self.proxy_connection.connect(self.username, self.password, wait=True) + print('Connected to Proxy') + + + + #Subscribe to messages from field + self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto") + + #Subscribe to messages on OT bus + self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot) + + def on_message_from_ot(self, headers, message): + "Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus" + try: + print(f"Received message from OT: {message}") + + if headers["destination"] == topics.field_input_topic(): + self.proxy_connection.send(topics.field_input_topic(), message) + + else: + print(f"Unrecognized message received by OT: {message}") + + except Exception as e: + print(f"Error processing message: {e}") + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(prog="TestForwarder") + parser.add_argument("username") + parser.add_argument("passwd") + parser.add_argument("connection_url") + opts = parser.parse_args() + proxy_connection_url = opts.connection_url + proxy_username = opts.username + proxy_password = opts.passwd + + proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) + + while True: + time.sleep(0.1)