seaweb.py: JSONencode, SECoP requests, terminate

- make class SecopMsg json encodable
- fix handling of SECoP requests
- terminate properly on SIGTERM
This commit is contained in:
l_samenv
2024-09-27 11:02:15 +02:00
parent e71cb74391
commit 15ecaca93e

View File

@ -51,13 +51,12 @@ def guess_mimetype(filename):
# SSE 'protocol' is described here: http://mzl.la/UPFyxY
def to_json_sse(msg):
txt = json.dumps(msg, separators=(',',': '))
# print 'MSG(size='+str(len(txt))+')', txt[:256]
return 'data: %s\n\n' % txt
app = flask.Flask(__name__)
update_rider = circularlog.Rider("upd")
pollinterval = 0.1
pollinterval = 0.2
@app.route('/update')
def get_update(path=None):
@ -83,7 +82,6 @@ def get_update(path=None):
yield to_json_sse(msg)
if messages:
lastmsg = time.time()
gevent.sleep(pollinterval)
else:
if time.time() > lastmsg + 30:
if not client.info():
@ -170,7 +168,7 @@ def reply():
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
resp = flask.Response(json.dumps(msg), mimetype='application/json')
resp = flask.Response(json.dumps(msg, cls=MyEncoder), mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@ -236,7 +234,6 @@ def sea_request_reply(socket, command, tmo=5):
t += 0.1
if t > 5:
logging.warning('unusual wait time %.4g (before command %s)', t, command)
#print(command)
socket.busy = True
socket.send_line("fulltransact "+command)
data = []
@ -379,7 +376,6 @@ class SeaInstrument(Instrument):
gobj = self.groups[path]
now = time.time()
if now < gobj.lastpoll + 0.5:
# print 'too fast', path
continue # do not poll before 500 ms have passed
gobj.lastreq = now
gobj.lastpoll = now
@ -457,9 +453,6 @@ class SeaInstrument(Instrument):
gobj.components = components
gobj.empty_values = values
gobj.version += 1
#print 'changed',path, gobj.version
#print 'FROM', gobj.components
#print 'TO ', components
if grouptitle != None:
gobj.grouptitle = grouptitle
@ -805,12 +798,10 @@ class DummyClient(SeaGraph):
msg = json.loads(line)
if msg['type'] in self.asynch:
t = 0
# print 'PUSH',msg, replytype
self.queue.append(msg)
else:
break
if t >= tmo:
# print 'TIMEOUT'
raise Exception("timeout")
gevent.sleep(0.1)
t += 0.1
@ -892,6 +883,15 @@ class SecopMsg:
value = value[:50] + '...'
return "SecopMsg('%s %s %s')" % (self.type, self.par, value)
def toJSON(self):
return json.dumps([self.type, self.par, self.value])
class MyEncoder(json.JSONEncoder):
def default(self, obj):
toJSON = getattr(obj, 'toJSON')
return toJSON() if toJSON else super().default(obj)
def SecopEncode(cmd, par=None, value=None):
line = cmd
@ -954,21 +954,19 @@ class SecNodeClient:
msg = self.syncreply.pop(0)
break
line = self.linesocket.get_line()
if line is not None:
if not line.startswith(('update', 'error_update')):
self.consolequeue.append(dict(type='reply',line=line,origin='other'))
if self.out: self.out.write("<"+line+"\n")
msg = SecopMsg(line)
if msg.asynch: # and replytype != msg['type'] + "=" + msg.par:
t = 0
self.queue.append(msg)
else:
break
if t >= tmo:
raise Exception("timeout")
gevent.sleep(delta)
t += delta
delta += 0.001
if line is None:
if t >= tmo:
raise Exception("timeout")
gevent.sleep(delta)
t += delta
continue
if not line.startswith(('update', 'error_update')):
self.consolequeue.append(dict(type='reply',line=line,origin='other'))
if self.out: self.out.write("<"+line+"\n")
msg = SecopMsg(line)
if not msg.asynch:
break
self.queue.append(msg)
if not replytype.startswith(msg.type):
logging.error('REPLY MISMATCH %s <> %s', replytype, repr(msg))
self.replytype = ""
@ -979,19 +977,23 @@ class SecNodeClient:
messages = self.consolequeue
self.consolequeue = []
return messages
if self.queue:
messages = convert_event(self.queue)
self.queue = []
return messages
line = self.linesocket.get_line()
if line:
messages = self.queue
self.queue = []
while 1:
line = self.linesocket.get_line()
if not line:
break
if not line.startswith(('update', 'error_update')):
self.consolequeue.append(dict(type='reply',line=line,origin='other'))
if self.out: self.out.write("<"+line+"\n")
msg = SecopMsg(line)
if msg.asynch: # and self.replytype != msg['type'] + "=" + msg.par:
return convert_event(SecopMsg(line))
self.syncreply.append(msg)
messages.append(msg)
else:
self.syncreply.append(msg)
break
if messages:
return convert_event(messages)
return []
@ -1024,7 +1026,7 @@ class SecopClient:
else:
node = self.node_map[path]
module = node.description['modules'][path]
logging.info('MP %r %r', path)
logging.info('MP %r', path)
parameters = dict(module["accessibles"])
components = []
for name in SecopClient.skip_par:
@ -1060,9 +1062,20 @@ class SecopClient:
logging.info('SENDCOMMAND %r', command)
if not command.strip():
return dict(type='accept-command')
# cmd = "change " + command
cmd = command
return self.nodes[0].cmd_reply(cmd, 'event ' + command.split(' ')[0])
scmd = command.split(' ')
if scmd[1:2]:
module = scmd[1].split(':')[0]
node = self.node_map[module]
else:
node = self.nodes[0]
print('no module given, send command to first node', command)
if scmd[0] == 'change':
replytype = f'changed {scmd[1]}'
else:
replytype = None
# cmd = "change " + command
print(command, replytype)
return node.cmd_reply(command, replytype)
def poll(self):
messages = []
@ -1135,8 +1148,14 @@ def handle_pdb(sig, frame):
pdb.Pdb().set_trace(frame)
def handle_term(sig, _):
server.stop()
server.close()
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, handle_pdb)
signal.signal(signal.SIGTERM, handle_term)
print('PID', os.getpid())
if len(sys.argv) > 3: