Skip to content

Commit b8d764f

Browse files
authored
Merge pull request #156 from saksham-gera/dev
Integrated the ZeroMQ with concore.py, I've tested the previous studies(With only file based communication) are working fine even after integrating the ZeroMQ with concore.py
2 parents 54221b7 + 5f64e20 commit b8d764f

File tree

10 files changed

+1703
-178
lines changed

10 files changed

+1703
-178
lines changed

0mq/funbody_zmq.dir/concore2.py

Lines changed: 209 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from ast import literal_eval
44
import sys
55
import re
6+
import zmq # Added for ZeroMQ
67

78
#if windows, create script to kill this process
89
# because batch files don't provide easy way to know pid of last command
@@ -12,102 +13,243 @@
1213
with open("concorekill.bat","w") as fpid:
1314
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")
1415

15-
try:
16-
iport = literal_eval(open("concore.iport").read())
17-
except:
18-
iport = dict()
19-
try:
20-
oport = literal_eval(open("concore.oport").read())
21-
except:
22-
oport = dict()
16+
# --- ZeroMQ Integration Start ---
17+
class ZeroMQPort:
18+
def __init__(self, port_type, address, zmq_socket_type):
19+
self.context = zmq.Context()
20+
self.socket = self.context.socket(zmq_socket_type)
21+
self.port_type = port_type # "bind" or "connect"
22+
self.address = address
23+
if self.port_type == "bind":
24+
self.socket.bind(address)
25+
print(f"ZMQ Port bound to {address}")
26+
else:
27+
self.socket.connect(address)
28+
print(f"ZMQ Port connected to {address}")
29+
30+
# Global ZeroMQ ports registry
31+
zmq_ports = {}
32+
33+
def init_zmq_port(port_name, port_type, address, socket_type_str):
34+
"""
35+
Initializes and registers a ZeroMQ port.
36+
port_name (str): A unique name for this ZMQ port.
37+
port_type (str): "bind" or "connect".
38+
address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555").
39+
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
40+
"""
41+
if port_name in zmq_ports:
42+
print(f"ZMQ Port {port_name} already initialized.")
43+
return # Avoid reinitialization
44+
45+
try:
46+
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
47+
zmq_socket_type = getattr(zmq, socket_type_str.upper())
48+
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
49+
print(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
50+
except AttributeError:
51+
print(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
52+
except zmq.error.ZMQError as e:
53+
print(f"Error initializing ZMQ port {port_name} on {address}: {e}")
54+
except Exception as e:
55+
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
2356

57+
# --- ZeroMQ Integration End ---
58+
59+
def safe_literal_eval(filename, defaultValue):
60+
try:
61+
with open(filename, "r") as file:
62+
return literal_eval(file.read())
63+
except (FileNotFoundError, SyntaxError, ValueError, Exception) as e:
64+
# Keep print for debugging, but can be made quieter
65+
# print(f"Info: Error reading {filename} or file not found, using default: {e}")
66+
return defaultValue
67+
68+
iport = safe_literal_eval("concore.iport", {})
69+
oport = safe_literal_eval("concore.oport", {})
2470

2571
s = ''
2672
olds = ''
2773
delay = 1
2874
retrycount = 0
2975
inpath = "./in" #must be rel path for local
3076
outpath = "./out"
77+
simtime = 0
3178

3279
#9/21/22
3380
try:
34-
sparams = open(inpath+"1/concore.params").read()
35-
if sparams[0] == '"': #windows keeps "" need to remove
36-
sparams = sparams[1:]
37-
sparams = sparams[0:sparams.find('"')]
38-
if sparams != '{':
39-
print("converting sparams: "+sparams)
40-
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
41-
print("converted sparams: " + sparams)
42-
try:
43-
params = literal_eval(sparams)
44-
except:
45-
print("bad params: "+sparams)
46-
except:
81+
sparams_path = os.path.join(inpath + "1", "concore.params")
82+
if os.path.exists(sparams_path):
83+
with open(sparams_path, "r") as f:
84+
sparams = f.read()
85+
if sparams: # Ensure sparams is not empty
86+
if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove
87+
sparams = sparams[1:-1]
88+
if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion
89+
print("converting sparams: "+sparams)
90+
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
91+
print("converted sparams: " + sparams)
92+
try:
93+
params = literal_eval(sparams)
94+
except Exception as e:
95+
print(f"bad params content: {sparams}, error: {e}")
96+
params = dict()
97+
else:
98+
params = dict()
99+
else:
100+
params = dict()
101+
except Exception as e:
102+
# print(f"Info: concore.params not found or error reading, using empty dict: {e}")
47103
params = dict()
104+
48105
#9/30/22
49-
def tryparam(n,i):
50-
try:
51-
return params[n]
52-
except:
53-
return i
106+
def tryparam(n, i):
107+
return params.get(n, i)
54108

55109

56110
#9/12/21
57111
def default_maxtime(default):
58112
global maxtime
59-
try:
60-
maxtime = literal_eval(open(inpath+"1/concore.maxtime").read())
61-
except:
62-
maxtime = default
113+
maxtime_path = os.path.join(inpath + "1", "concore.maxtime")
114+
maxtime = safe_literal_eval(maxtime_path, default)
115+
63116
default_maxtime(100)
64117

65118
def unchanged():
66-
global olds,s
67-
if olds==s:
119+
global olds, s
120+
if olds == s:
68121
s = ''
69122
return True
70-
else:
71-
olds = s
72-
return False
123+
olds = s
124+
return False
125+
126+
def read(port_identifier, name, initstr_val):
127+
global s, simtime, retrycount
128+
129+
default_return_val = initstr_val
130+
if isinstance(initstr_val, str):
131+
try:
132+
default_return_val = literal_eval(initstr_val)
133+
except (SyntaxError, ValueError):
134+
pass
135+
136+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
137+
zmq_p = zmq_ports[port_identifier]
138+
try:
139+
message = zmq_p.socket.recv_json()
140+
return message
141+
except zmq.error.ZMQError as e:
142+
print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
143+
return default_return_val
144+
except Exception as e:
145+
print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
146+
return default_return_val
147+
148+
try:
149+
file_port_num = int(port_identifier)
150+
except ValueError:
151+
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
152+
return default_return_val
153+
154+
time.sleep(delay)
155+
file_path = os.path.join(inpath+str(file_port_num), name)
156+
ins = ""
73157

74-
def read(port, name, initstr):
75-
global s,simtime,retrycount
76-
time.sleep(delay)
77158
try:
78-
infile = open(inpath+str(port)+"/"+name);
79-
ins = infile.read()
80-
except:
81-
ins = initstr
82-
while len(ins)==0:
159+
with open(file_path, "r") as infile:
160+
ins = infile.read()
161+
except FileNotFoundError:
162+
ins = str(initstr_val)
163+
except Exception as e:
164+
print(f"Error reading {file_path}: {e}. Using default value.")
165+
return default_return_val
166+
167+
attempts = 0
168+
max_retries = 5
169+
while len(ins) == 0 and attempts < max_retries:
83170
time.sleep(delay)
84-
ins = infile.read()
171+
try:
172+
with open(file_path, "r") as infile:
173+
ins = infile.read()
174+
except Exception as e:
175+
print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
176+
attempts += 1
85177
retrycount += 1
86-
s += ins
87-
inval = literal_eval(ins)
88-
simtime = max(simtime,inval[0])
89-
return inval[1:]
90-
91-
def write(port, name, val, delta=0):
92-
global outpath,simtime
93-
if isinstance(val,str):
94-
time.sleep(2*delay)
95-
elif isinstance(val,list)==False:
96-
print("mywrite must have list or str")
97-
quit()
178+
179+
if len(ins) == 0:
180+
print(f"Max retries reached for {file_path}, using default value.")
181+
return default_return_val
182+
183+
s += ins
98184
try:
99-
with open(outpath+str(port)+"/"+name,"w") as outfile:
100-
if isinstance(val,list):
101-
outfile.write(str([simtime+delta]+val))
102-
simtime += delta
103-
else:
185+
inval = literal_eval(ins)
186+
if isinstance(inval, list) and len(inval) > 0:
187+
current_simtime_from_file = inval[0]
188+
if isinstance(current_simtime_from_file, (int, float)):
189+
simtime = max(simtime, current_simtime_from_file)
190+
return inval[1:]
191+
else:
192+
print(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
193+
return inval
194+
except Exception as e:
195+
print(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
196+
return default_return_val
197+
198+
199+
def write(port_identifier, name, val, delta=0):
200+
global simtime
201+
202+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
203+
zmq_p = zmq_ports[port_identifier]
204+
try:
205+
zmq_p.socket.send_json(val)
206+
except zmq.error.ZMQError as e:
207+
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
208+
except Exception as e:
209+
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
210+
return
211+
212+
try:
213+
file_port_num = int(port_identifier)
214+
except ValueError:
215+
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
216+
return
217+
218+
file_path = os.path.join(outpath+str(file_port_num), name)
219+
220+
if isinstance(val, str):
221+
time.sleep(2 * delay)
222+
elif not isinstance(val, list):
223+
print(f"File write to {file_path} must have list or str value, got {type(val)}")
224+
return
225+
226+
try:
227+
with open(file_path, "w") as outfile:
228+
if isinstance(val, list):
229+
data_to_write = [simtime + delta] + val
230+
outfile.write(str(data_to_write))
231+
simtime += delta
232+
else:
104233
outfile.write(val)
105-
except:
106-
print("skipping"+outpath+str(port)+"/"+name);
234+
except Exception as e:
235+
print(f"Error writing to {file_path}: {e}")
107236

108-
def initval(simtime_val):
237+
def initval(simtime_val_str):
109238
global simtime
110-
val = literal_eval(simtime_val)
111-
simtime = val[0]
112-
return val[1:]
239+
try:
240+
val = literal_eval(simtime_val_str)
241+
if isinstance(val, list) and len(val) > 0:
242+
first_element = val[0]
243+
if isinstance(first_element, (int, float)):
244+
simtime = first_element
245+
return val[1:]
246+
else:
247+
print(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
248+
return val[1:] if len(val) > 1 else []
249+
else:
250+
print(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
251+
return []
113252

253+
except Exception as e:
254+
print(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
255+
return []

0mq/funbody_zmq.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
context = zmq.Context()
1010
socket = context.socket(zmq.REP)
11-
socket.bind("tcp://*:2346")
11+
socket.bind("tcp://*:2356")
1212

1313
concore.delay = 0.07
1414
concore2.delay = 0.07

0 commit comments

Comments
 (0)