From c87cf2f62f0bba6bf518b073a2e81b60ad578949 Mon Sep 17 00:00:00 2001 From: Saksham Gera Date: Wed, 11 Jun 2025 16:21:17 +0530 Subject: [PATCH 1/2] Added A Functionality to see the ZMQ edge results also even if it is not communicating through file based communication --- .../concore2.py | 0 0mq/test0mq4.graphml | 152 ++++++++++++++++-- concore.py | 2 +- 3 files changed, 143 insertions(+), 11 deletions(-) rename 0mq/{funbody_zmq.dir copy => funbody_zmq.dir}/concore2.py (100%) diff --git a/0mq/funbody_zmq.dir copy/concore2.py b/0mq/funbody_zmq.dir/concore2.py similarity index 100% rename from 0mq/funbody_zmq.dir copy/concore2.py rename to 0mq/funbody_zmq.dir/concore2.py diff --git a/0mq/test0mq4.graphml b/0mq/test0mq4.graphml index ef6b503..08ada0f 100644 --- a/0mq/test0mq4.graphml +++ b/0mq/test0mq4.graphml @@ -2,11 +2,11 @@ - + - + PZ:pmpymax.py @@ -17,7 +17,7 @@ - + CZ:cpymax.py @@ -28,7 +28,7 @@ - + F1:funcall_zmq.py @@ -39,7 +39,7 @@ - + F2:funbody_zmq.py @@ -54,7 +54,7 @@ U - + @@ -66,7 +66,7 @@ Y - + @@ -78,7 +78,7 @@ U2 - + @@ -90,7 +90,7 @@ Y2 - + @@ -102,7 +102,7 @@ 0x1234_U3 - + @@ -599,5 +599,137 @@ 6455bbb0a3114ec3ba44920b64bd0266 + + 1749634829919 + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MjYwLjIyMzc5NjQ1MDU3NjIsInkiOjMxMS4zMjAxMjcwNjgyNTY0fSx7IngiOjI2MS4yMjM3OTY0NTA1NzYyLCJ5IjozODguMzIwMTI3MDY4MjU2NH1d + + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MjYxLjIyMzc5NjQ1MDU3NjIsInkiOjM4OC4zMjAxMjcwNjgyNTY0fSx7IngiOjI2MC4yMjM3OTY0NTA1NzYyLCJ5IjozMTEuMzIwMTI3MDY4MjU2NH1d + + 6db1c3f5876e499e3af23bc64c8c5bb0 + + + 1749634829941 + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjAzLCJ5IjozMDYuNzczNjIyODI2MjMzMX0seyJ4Ijo2MDQsInkiOjM4My43NzM2MjI4MjYyMzMxfV0= + + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA0LCJ5IjozODMuNzczNjIyODI2MjMzMX0seyJ4Ijo2MDMsInkiOjMwNi43NzM2MjI4MjYyMzMxfV0= + + cbf7736ec743327ba43b837143224398 + + + 1749634835985 + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MjYxLjIyMzc5NjQ1MDU3NjIsInkiOjM4OC4zMjAxMjcwNjgyNTY0fSx7IngiOjMzNS4yMjM3OTY0NTA1NzYyLCJ5IjozODAuMzIwMTI3MDY4MjU2NH1d + + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MzM1LjIyMzc5NjQ1MDU3NjIsInkiOjM4MC4zMjAxMjcwNjgyNTY0fSx7IngiOjI2MS4yMjM3OTY0NTA1NzYyLCJ5IjozODguMzIwMTI3MDY4MjU2NH1d + + bcd1d06be8ec425b80883b7e8ed78ebb + + + 1749634843769 + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA0LCJ5IjozODMuNzczNjIyODI2MjMzMX0seyJ4Ijo2MDUsInkiOjM3NS43NzM2MjI4MjYyMzMxfV0= + + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA1LCJ5IjozNzUuNzczNjIyODI2MjMzMX0seyJ4Ijo2MDQsInkiOjM4My43NzM2MjI4MjYyMzMxfV0= + + d22a89c1436e739adc827b2c9c5ed9fb + + + 1749638394047 + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA1LCJ5IjozNzUuNzczNjIyODI2MjMzMX0seyJ4Ijo2MDYuMTA2NjIzNzgzOTc3NiwieSI6Mzg0LjYyNjYxMzA5ODA1NDQ2fV0= + + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA2LjEwNjYyMzc4Mzk3NzYsInkiOjM4NC42MjY2MTMwOTgwNTQ0Nn0seyJ4Ijo2MDUsInkiOjM3NS43NzM2MjI4MjYyMzMxfV0= + + 98a2cdaa9c637e3b1f80e191689ef228 + + + 1749638406118 + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MzM1LjIyMzc5NjQ1MDU3NjIsInkiOjM4MC4zMjAxMjcwNjgyNTY0fSx7IngiOjM1NS4xNDMwMjQ1NjIxNzQyLCJ5IjozODEuNDI2NzUwODUyMjM0fV0= + + + SET_POS + WyJkOTVmNzg4Ny00MmJmLTRlMTItYTAyMi0wMjE4Njk3MDFjNmEiLHsieCI6MzU1LjE0MzAyNDU2MjE3NDIsInkiOjM4MS40MjY3NTA4NTIyMzR9LHsieCI6MzM1LjIyMzc5NjQ1MDU3NjIsInkiOjM4MC4zMjAxMjcwNjgyNTY0fV0= + + 461addecc2f83c39d144dd59fe140d7a + + + 1749638411387 + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjA2LjEwNjYyMzc4Mzk3NzYsInkiOjM4NC42MjY2MTMwOTgwNTQ0Nn0seyJ4Ijo2MDEuNjgwMTI4NjQ4MDY3LCJ5IjozODIuNDEzMzY1NTMwMDk5MX1d + + + SET_POS + WyJmNTZmOGIyNC00MWE2LTQ1YjQtODgyYi02NTk2MDQwYjZhZjAiLHsieCI6NjAxLjY4MDEyODY0ODA2NywieSI6MzgyLjQxMzM2NTUzMDA5OTF9LHsieCI6NjA2LjEwNjYyMzc4Mzk3NzYsInkiOjM4NC42MjY2MTMwOTgwNTQ0Nn1d + + a65da086d27f4b89fde06386803b6c83 + + + 1749638417166 + + SET_POS + WyJiZDkyZjlkOC03ZDYzLTQyYjItOTY4Yi0zOTA5ZWY0YzcyMzciLHsieCI6NTg3LjE1NzgxMTg0NjY1NTYsInkiOjEwMC40NDIzMjkyNjUzNzgzNn0seyJ4Ijo1OTEuNTg0MzA2OTgyNTY2MiwieSI6MTAxLjU0ODk1MzA0OTM1NjAzfV0= + + + SET_POS + WyJiZDkyZjlkOC03ZDYzLTQyYjItOTY4Yi0zOTA5ZWY0YzcyMzciLHsieCI6NTkxLjU4NDMwNjk4MjU2NjIsInkiOjEwMS41NDg5NTMwNDkzNTYwM30seyJ4Ijo1ODcuMTU3ODExODQ2NjU1NiwieSI6MTAwLjQ0MjMyOTI2NTM3ODM2fV0= + + f084c23a413d578cc6f24c99f4abf593 + + + 1749638421768 + + SET_POS + WyJiZDkyZjlkOC03ZDYzLTQyYjItOTY4Yi0zOTA5ZWY0YzcyMzciLHsieCI6NTkxLjU4NDMwNjk4MjU2NjIsInkiOjEwMS41NDg5NTMwNDkzNTYwM30seyJ4Ijo1OTAuNDc3NjgzMTk4NTg4NSwieSI6OTQuOTA5MjEwMzQ1NDkwMDJ9XQ== + + + SET_POS + WyJiZDkyZjlkOC03ZDYzLTQyYjItOTY4Yi0zOTA5ZWY0YzcyMzciLHsieCI6NTkwLjQ3NzY4MzE5ODU4ODUsInkiOjk0LjkwOTIxMDM0NTQ5MDAyfSx7IngiOjU5MS41ODQzMDY5ODI1NjYyLCJ5IjoxMDEuNTQ4OTUzMDQ5MzU2MDN9XQ== + + 7351dd339614af3284a27265645c683e + + + 1749638428404 + + SET_POS + WyJkYjMxYmUzZi0zZTU4LTRjMzYtOTc5ZC00MWZiMjhhZWZlNjgiLHsieCI6MzM2LjAxODYzNzA4NTU4NjY1LCJ5IjoxMDAuOTM3NjkyNDQ1MzAzNDF9LHsieCI6MzM0LjkxMjAxMzMwMTYwODk3LCJ5Ijo5MC45NzgwNzgzODk1MDQ0fV0= + + + SET_POS + WyJkYjMxYmUzZi0zZTU4LTRjMzYtOTc5ZC00MWZiMjhhZWZlNjgiLHsieCI6MzM0LjkxMjAxMzMwMTYwODk3LCJ5Ijo5MC45NzgwNzgzODk1MDQ0fSx7IngiOjMzNi4wMTg2MzcwODU1ODY2NSwieSI6MTAwLjkzNzY5MjQ0NTMwMzQxfV0= + + e6e0b6e82708953b8f2a3a53d75d7480 + + + 1749638434730 + + SET_POS + WyJkYjMxYmUzZi0zZTU4LTRjMzYtOTc5ZC00MWZiMjhhZWZlNjgiLHsieCI6MzM0LjkxMjAxMzMwMTYwODk3LCJ5Ijo5MC45NzgwNzgzODk1MDQ0fSx7IngiOjMzMi42OTg3NjU3MzM2NTM3LCJ5Ijo5NC4yOTc5NDk3NDE0Mzc0fV0= + + + SET_POS + WyJkYjMxYmUzZi0zZTU4LTRjMzYtOTc5ZC00MWZiMjhhZWZlNjgiLHsieCI6MzMyLjY5ODc2NTczMzY1MzcsInkiOjk0LjI5Nzk0OTc0MTQzNzR9LHsieCI6MzM0LjkxMjAxMzMwMTYwODk3LCJ5Ijo5MC45NzgwNzgzODk1MDQ0fV0= + + 5308877f34d57a16e83da8243a43a98f + \ No newline at end of file diff --git a/concore.py b/concore.py index a018ddf..e90c750 100644 --- a/concore.py +++ b/concore.py @@ -214,7 +214,7 @@ def write(port_identifier, name, val, delta=0): print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}") - return + try: if isinstance(port_identifier, str) and port_identifier in zmq_ports: file_path = os.path.join("../"+port_identifier, name) From efd7896fafabcf9a7a9ae495848c64ec6cae2c9a Mon Sep 17 00:00:00 2001 From: Saksham Gera Date: Fri, 13 Jun 2025 18:13:21 +0530 Subject: [PATCH 2/2] Implemented ZeroMQ socket timeouts & retry logic fordropped connections. --- concore.py | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/concore.py b/concore.py index e90c750..ce3ab0a 100644 --- a/concore.py +++ b/concore.py @@ -20,12 +20,38 @@ def __init__(self, port_type, address, zmq_socket_type): self.socket = self.context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" self.address = address + + self.socket.setsockopt(zmq.RCVTIMEO, 2000) + self.socket.setsockopt(zmq.SNDTIMEO, 2000) + self.socket.setsockopt(zmq.LINGER, 0) + if self.port_type == "bind": self.socket.bind(address) print(f"ZMQ Port bound to {address}") else: self.socket.connect(address) print(f"ZMQ Port connected to {address}") + + def send_json_with_retry(self, message): + for attempt in range(5): + try: + self.socket.send_json(message) + return + except zmq.Again: + print(f"Send timeout (attempt {attempt + 1}/5)") + time.sleep(0.5) + print("Failed to send after retries.") + return + + def recv_json_with_retry(self): + for attempt in range(5): + try: + return self.socket.recv_json() + except zmq.Again: + print(f"Receive timeout (attempt {attempt + 1}/5)") + time.sleep(0.5) + print("Failed to receive after retries.") + return None # Global ZeroMQ ports registry zmq_ports = {} @@ -143,7 +169,7 @@ def read(port_identifier, name, initstr_val): if isinstance(port_identifier, str) and port_identifier in zmq_ports: zmq_p = zmq_ports[port_identifier] try: - message = zmq_p.socket.recv_json() + message = zmq_p.recv_json_with_retry() return message except zmq.error.ZMQError as e: print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") @@ -209,7 +235,7 @@ def write(port_identifier, name, val, delta=0): if isinstance(port_identifier, str) and port_identifier in zmq_ports: zmq_p = zmq_ports[port_identifier] try: - zmq_p.socket.send_json(val) + zmq_p.send_json_with_retry(val) except zmq.error.ZMQError as e: print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: