diff --git a/seaweb.py b/seaweb.py index aa6023d..c608a90 100755 --- a/seaweb.py +++ b/seaweb.py @@ -51,13 +51,12 @@ def guess_mimetype(filename): # SSE 'protocol' is described here: http://mzl.la/UPFyxY def to_json_sse(msg): txt = json.dumps(msg, separators=(',',': ')) - # print 'MSG(size='+str(len(txt))+')', txt[:256] return 'data: %s\n\n' % txt app = flask.Flask(__name__) update_rider = circularlog.Rider("upd") -pollinterval = 0.1 +pollinterval = 0.2 @app.route('/update') def get_update(path=None): @@ -83,7 +82,6 @@ def get_update(path=None): yield to_json_sse(msg) if messages: lastmsg = time.time() - gevent.sleep(pollinterval) else: if time.time() > lastmsg + 30: if not client.info(): @@ -170,7 +168,7 @@ def reply(): logging.error('%s', traceback.format_exc()) circularlog.log() msg = dict(type='error', request=path[1:], error=repr(e)) - resp = flask.Response(json.dumps(msg), mimetype='application/json') + resp = flask.Response(json.dumps(msg, cls=MyEncoder), mimetype='application/json') resp.headers['Access-Control-Allow-Origin'] = '*' return resp @@ -236,7 +234,6 @@ def sea_request_reply(socket, command, tmo=5): t += 0.1 if t > 5: logging.warning('unusual wait time %.4g (before command %s)', t, command) - #print(command) socket.busy = True socket.send_line("fulltransact "+command) data = [] @@ -379,7 +376,6 @@ class SeaInstrument(Instrument): gobj = self.groups[path] now = time.time() if now < gobj.lastpoll + 0.5: - # print 'too fast', path continue # do not poll before 500 ms have passed gobj.lastreq = now gobj.lastpoll = now @@ -457,9 +453,6 @@ class SeaInstrument(Instrument): gobj.components = components gobj.empty_values = values gobj.version += 1 - #print 'changed',path, gobj.version - #print 'FROM', gobj.components - #print 'TO ', components if grouptitle != None: gobj.grouptitle = grouptitle @@ -805,12 +798,10 @@ class DummyClient(SeaGraph): msg = json.loads(line) if msg['type'] in self.asynch: t = 0 - # print 'PUSH',msg, replytype self.queue.append(msg) else: break if t >= tmo: - # print 'TIMEOUT' raise Exception("timeout") gevent.sleep(0.1) t += 0.1 @@ -892,6 +883,15 @@ class SecopMsg: value = value[:50] + '...' return "SecopMsg('%s %s %s')" % (self.type, self.par, value) + def toJSON(self): + return json.dumps([self.type, self.par, self.value]) + + +class MyEncoder(json.JSONEncoder): + def default(self, obj): + toJSON = getattr(obj, 'toJSON') + return toJSON() if toJSON else super().default(obj) + def SecopEncode(cmd, par=None, value=None): line = cmd @@ -954,21 +954,19 @@ class SecNodeClient: msg = self.syncreply.pop(0) break line = self.linesocket.get_line() - if line is not None: - if not line.startswith(('update', 'error_update')): - self.consolequeue.append(dict(type='reply',line=line,origin='other')) - if self.out: self.out.write("<"+line+"\n") - msg = SecopMsg(line) - if msg.asynch: # and replytype != msg['type'] + "=" + msg.par: - t = 0 - self.queue.append(msg) - else: - break - if t >= tmo: - raise Exception("timeout") - gevent.sleep(delta) - t += delta - delta += 0.001 + if line is None: + if t >= tmo: + raise Exception("timeout") + gevent.sleep(delta) + t += delta + continue + if not line.startswith(('update', 'error_update')): + self.consolequeue.append(dict(type='reply',line=line,origin='other')) + if self.out: self.out.write("<"+line+"\n") + msg = SecopMsg(line) + if not msg.asynch: + break + self.queue.append(msg) if not replytype.startswith(msg.type): logging.error('REPLY MISMATCH %s <> %s', replytype, repr(msg)) self.replytype = "" @@ -979,19 +977,23 @@ class SecNodeClient: messages = self.consolequeue self.consolequeue = [] return messages - if self.queue: - messages = convert_event(self.queue) - self.queue = [] - return messages - line = self.linesocket.get_line() - if line: + messages = self.queue + self.queue = [] + while 1: + line = self.linesocket.get_line() + if not line: + break if not line.startswith(('update', 'error_update')): self.consolequeue.append(dict(type='reply',line=line,origin='other')) if self.out: self.out.write("<"+line+"\n") msg = SecopMsg(line) if msg.asynch: # and self.replytype != msg['type'] + "=" + msg.par: - return convert_event(SecopMsg(line)) - self.syncreply.append(msg) + messages.append(msg) + else: + self.syncreply.append(msg) + break + if messages: + return convert_event(messages) return [] @@ -1024,7 +1026,7 @@ class SecopClient: else: node = self.node_map[path] module = node.description['modules'][path] - logging.info('MP %r %r', path) + logging.info('MP %r', path) parameters = dict(module["accessibles"]) components = [] for name in SecopClient.skip_par: @@ -1060,9 +1062,20 @@ class SecopClient: logging.info('SENDCOMMAND %r', command) if not command.strip(): return dict(type='accept-command') - # cmd = "change " + command - cmd = command - return self.nodes[0].cmd_reply(cmd, 'event ' + command.split(' ')[0]) + scmd = command.split(' ') + if scmd[1:2]: + module = scmd[1].split(':')[0] + node = self.node_map[module] + else: + node = self.nodes[0] + print('no module given, send command to first node', command) + if scmd[0] == 'change': + replytype = f'changed {scmd[1]}' + else: + replytype = None + # cmd = "change " + command + print(command, replytype) + return node.cmd_reply(command, replytype) def poll(self): messages = [] @@ -1135,8 +1148,14 @@ def handle_pdb(sig, frame): pdb.Pdb().set_trace(frame) +def handle_term(sig, _): + server.stop() + server.close() + + if __name__ == '__main__': signal.signal(signal.SIGUSR1, handle_pdb) + signal.signal(signal.SIGTERM, handle_term) print('PID', os.getpid()) if len(sys.argv) > 3: