Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 209 additions & 67 deletions 0mq/funbody_zmq.dir/concore2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ast import literal_eval
import sys
import re
import zmq # Added for ZeroMQ

#if windows, create script to kill this process
# because batch files don't provide easy way to know pid of last command
Expand All @@ -12,102 +13,243 @@
with open("concorekill.bat","w") as fpid:
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")

try:
iport = literal_eval(open("concore.iport").read())
except:
iport = dict()
try:
oport = literal_eval(open("concore.oport").read())
except:
oport = dict()
# --- ZeroMQ Integration Start ---
class ZeroMQPort:
def __init__(self, port_type, address, zmq_socket_type):
self.context = zmq.Context()
self.socket = self.context.socket(zmq_socket_type)
self.port_type = port_type # "bind" or "connect"
self.address = address
if self.port_type == "bind":
self.socket.bind(address)
print(f"ZMQ Port bound to {address}")
else:
self.socket.connect(address)
print(f"ZMQ Port connected to {address}")

# Global ZeroMQ ports registry
zmq_ports = {}

def init_zmq_port(port_name, port_type, address, socket_type_str):
"""
Initializes and registers a ZeroMQ port.
port_name (str): A unique name for this ZMQ port.
port_type (str): "bind" or "connect".
address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555").
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
"""
if port_name in zmq_ports:
print(f"ZMQ Port {port_name} already initialized.")
return # Avoid reinitialization

try:
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
zmq_socket_type = getattr(zmq, socket_type_str.upper())
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
print(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
except AttributeError:
print(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
except zmq.error.ZMQError as e:
print(f"Error initializing ZMQ port {port_name} on {address}: {e}")
except Exception as e:
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

# --- ZeroMQ Integration End ---

def safe_literal_eval(filename, defaultValue):
try:
with open(filename, "r") as file:
return literal_eval(file.read())
except (FileNotFoundError, SyntaxError, ValueError, Exception) as e:
# Keep print for debugging, but can be made quieter
# print(f"Info: Error reading {filename} or file not found, using default: {e}")
return defaultValue

iport = safe_literal_eval("concore.iport", {})
oport = safe_literal_eval("concore.oport", {})

s = ''
olds = ''
delay = 1
retrycount = 0
inpath = "./in" #must be rel path for local
outpath = "./out"
simtime = 0

#9/21/22
try:
sparams = open(inpath+"1/concore.params").read()
if sparams[0] == '"': #windows keeps "" need to remove
sparams = sparams[1:]
sparams = sparams[0:sparams.find('"')]
if sparams != '{':
print("converting sparams: "+sparams)
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
print("converted sparams: " + sparams)
try:
params = literal_eval(sparams)
except:
print("bad params: "+sparams)
except:
sparams_path = os.path.join(inpath + "1", "concore.params")
if os.path.exists(sparams_path):
with open(sparams_path, "r") as f:
sparams = f.read()
if sparams: # Ensure sparams is not empty
if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove
sparams = sparams[1:-1]
if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion
print("converting sparams: "+sparams)
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
print("converted sparams: " + sparams)
try:
params = literal_eval(sparams)
except Exception as e:
print(f"bad params content: {sparams}, error: {e}")
params = dict()
else:
params = dict()
else:
params = dict()
except Exception as e:
# print(f"Info: concore.params not found or error reading, using empty dict: {e}")
params = dict()

#9/30/22
def tryparam(n,i):
try:
return params[n]
except:
return i
def tryparam(n, i):
return params.get(n, i)


#9/12/21
def default_maxtime(default):
global maxtime
try:
maxtime = literal_eval(open(inpath+"1/concore.maxtime").read())
except:
maxtime = default
maxtime_path = os.path.join(inpath + "1", "concore.maxtime")
maxtime = safe_literal_eval(maxtime_path, default)

default_maxtime(100)

def unchanged():
global olds,s
if olds==s:
global olds, s
if olds == s:
s = ''
return True
else:
olds = s
return False
olds = s
return False

def read(port_identifier, name, initstr_val):
global s, simtime, retrycount

default_return_val = initstr_val
if isinstance(initstr_val, str):
try:
default_return_val = literal_eval(initstr_val)
except (SyntaxError, ValueError):
pass

if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
message = zmq_p.socket.recv_json()
return message
except zmq.error.ZMQError as e:
print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
except Exception as e:
print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val

try:
file_port_num = int(port_identifier)
except ValueError:
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return default_return_val

time.sleep(delay)
file_path = os.path.join(inpath+str(file_port_num), name)
ins = ""

def read(port, name, initstr):
global s,simtime,retrycount
time.sleep(delay)
try:
infile = open(inpath+str(port)+"/"+name);
ins = infile.read()
except:
ins = initstr
while len(ins)==0:
with open(file_path, "r") as infile:
ins = infile.read()
except FileNotFoundError:
ins = str(initstr_val)
except Exception as e:
print(f"Error reading {file_path}: {e}. Using default value.")
return default_return_val

attempts = 0
max_retries = 5
while len(ins) == 0 and attempts < max_retries:
time.sleep(delay)
ins = infile.read()
try:
with open(file_path, "r") as infile:
ins = infile.read()
except Exception as e:
print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
attempts += 1
retrycount += 1
s += ins
inval = literal_eval(ins)
simtime = max(simtime,inval[0])
return inval[1:]

def write(port, name, val, delta=0):
global outpath,simtime
if isinstance(val,str):
time.sleep(2*delay)
elif isinstance(val,list)==False:
print("mywrite must have list or str")
quit()

if len(ins) == 0:
print(f"Max retries reached for {file_path}, using default value.")
return default_return_val

s += ins
try:
with open(outpath+str(port)+"/"+name,"w") as outfile:
if isinstance(val,list):
outfile.write(str([simtime+delta]+val))
simtime += delta
else:
inval = literal_eval(ins)
if isinstance(inval, list) and len(inval) > 0:
current_simtime_from_file = inval[0]
if isinstance(current_simtime_from_file, (int, float)):
simtime = max(simtime, current_simtime_from_file)
return inval[1:]
else:
print(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
return inval
except Exception as e:
print(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
return default_return_val


def write(port_identifier, name, val, delta=0):
global simtime

if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
zmq_p.socket.send_json(val)
except zmq.error.ZMQError as e:
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
return

try:
file_port_num = int(port_identifier)
except ValueError:
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return

file_path = os.path.join(outpath+str(file_port_num), name)

if isinstance(val, str):
time.sleep(2 * delay)
elif not isinstance(val, list):
print(f"File write to {file_path} must have list or str value, got {type(val)}")
return

try:
with open(file_path, "w") as outfile:
if isinstance(val, list):
data_to_write = [simtime + delta] + val
outfile.write(str(data_to_write))
simtime += delta
else:
outfile.write(val)
except:
print("skipping"+outpath+str(port)+"/"+name);
except Exception as e:
print(f"Error writing to {file_path}: {e}")

def initval(simtime_val):
def initval(simtime_val_str):
global simtime
val = literal_eval(simtime_val)
simtime = val[0]
return val[1:]
try:
val = literal_eval(simtime_val_str)
if isinstance(val, list) and len(val) > 0:
first_element = val[0]
if isinstance(first_element, (int, float)):
simtime = first_element
return val[1:]
else:
print(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
return val[1:] if len(val) > 1 else []
else:
print(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
return []

except Exception as e:
print(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
return []
2 changes: 1 addition & 1 deletion 0mq/funbody_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:2346")
socket.bind("tcp://*:2356")

concore.delay = 0.07
concore2.delay = 0.07
Expand Down
Loading