Description
Discussed in #458
Originally posted by dakotahorstman May 23, 2025
Hello,
I am trying to receive data-change notifications for an array of structures. I've implemented this successfully by manually interpreting the incoming bytes, but I was wondering whether the library offers a native way to do this through a function or built-in implementation. I took a look at the docs, and while I saw the TowerEvent example on the connections page, I found the documentation for this case rather lacking.
Here's my code:
import time
from ctypes import c_ubyte

import pyads
from loguru import logger

ST_testStruct = (
    ("bFirstBool", pyads.PLCTYPE_BOOL, 1),
    ("nSecondInt", pyads.PLCTYPE_INT, 1),
    ("fFirstReal", pyads.PLCTYPE_REAL, 1),
    ("eFirstEnum", pyads.PLCTYPE_INT, 1),
)
array_len = 16
struct_size = pyads.size_of_structure(ST_testStruct)
total_size = struct_size * array_len

with logger.catch():
    # create some constants for connection
    CLIENT_NETID = "100.64.10.2.1.1"
    CLIENT_IP = "100.64.10.2"
    TARGET_IP = "100.64.50.2"
    TARGET_USERNAME = ""
    TARGET_PASSWORD = ""

    # add a new route to the target plc
    pyads.open_port()
    pyads.set_local_address(CLIENT_NETID)
    pyads.add_route_to_plc(
        CLIENT_NETID,
        CLIENT_IP,
        TARGET_IP,
        TARGET_USERNAME,
        TARGET_PASSWORD,
    )
    pyads.close_port()

    # connect to plc and open connection
    # route is added automatically to client on Linux, on Windows use the TwinCAT router
    with pyads.Connection("100.64.50.2.1.1", pyads.PORT_TC3PLC1) as plc:
        print(plc.get_local_address())
        # check the connection state
        plc.read_state()

        @logger.catch
        @plc.notification(c_ubyte * total_size)
        def callback(handle, name, timestamp, value):
            # Parse each structure from the byte array
            for i in range(array_len):
                start = i * struct_size
                end = start + struct_size
                struct_bytes = value[start:end]
                parsed = pyads.dict_from_bytes(struct_bytes, ST_testStruct)
                print(f"[{i}] {parsed}")

        attr2 = pyads.NotificationAttrib(total_size)
        handles = plc.add_device_notification("GVL.aArrayofStruct", attr2, callback)
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("CTRL + C")
        # add_device_notification returns a (notification_handle, user_handle) tuple,
        # so unpack it when removing the notification
        plc.del_device_notification(*handles)
And example output:
[0] OrderedDict({'bFirstBool': True, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[1] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[2] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[3] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[4] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[5] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[6] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 1})
[7] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[8] OrderedDict({'bFirstBool': True, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[9] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[10] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[11] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[12] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[13] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[14] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
[15] OrderedDict({'bFirstBool': False, 'nSecondInt': 0, 'fFirstReal': 0.0, 'eFirstEnum': 0})
(Also, yes, the eFirstEnum is an enum in TC3. Seems to work just fine!)
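For anyone who wants to try the slicing logic without a PLC, here is a minimal standard-library sketch of the same idea. It assumes an unpadded (pack mode 1), little-endian layout with BOOL as 1 byte, INT as 2 bytes, and REAL as 4 bytes, which gives 9 bytes per structure. The names `STRUCT_FORMAT`, `FIELD_NAMES`, and `parse_struct_array` are hypothetical and not part of pyads, and the payload is fabricated purely for illustration.

```python
import struct

# Assumed layout of one ST_testStruct, no padding, little-endian:
# BOOL (1 byte), INT (2 bytes), REAL (4 bytes), INT (2 bytes) -> 9 bytes.
STRUCT_FORMAT = "<?hfh"
FIELD_NAMES = ("bFirstBool", "nSecondInt", "fFirstReal", "eFirstEnum")


def parse_struct_array(raw: bytes, count: int) -> list:
    """Split a raw payload into one dict per array element."""
    size = struct.calcsize(STRUCT_FORMAT)
    if len(raw) != size * count:
        raise ValueError(f"expected {size * count} bytes, got {len(raw)}")
    # iter_unpack walks the buffer in steps of one structure
    return [
        dict(zip(FIELD_NAMES, fields))
        for fields in struct.iter_unpack(STRUCT_FORMAT, raw)
    ]


# Build a fake 16-element payload standing in for the notification data.
elements = [(False, 0, 0.0, 0)] * 16
elements[0] = (True, 0, 0.0, 0)
elements[6] = (False, 0, 0.0, 1)
payload = b"".join(struct.pack(STRUCT_FORMAT, *e) for e in elements)

parsed = parse_struct_array(payload, 16)
for i, item in enumerate(parsed):
    print(f"[{i}] {item}")
```

If the PLC structure uses a different pack mode, the format string would need explicit padding bytes. It may also be worth checking whether `pyads.dict_from_bytes` accepts an `array_size` argument in the installed version, since that could replace the manual slicing in the callback; I have not confirmed this here.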
If there isn't a native solution, I thought I'd provide this anyway to maybe kick-start a PR. I'd love to open one myself, but I do not know enough about the library or the ADS protocol in general to ensure a proper implementation.
Thank you!