Files
seweb/webserver.py
T
zolliker 1201801170 break busy loop blocking the server
- Use gevent.sleep with a tiny value after a message has been
  received instead of no call to sleep at all. This was the
  reason for the webserver to block when used with prep8
  and the leiden dil with a lot of data
- move gevent.monkey.patch_all() to secop-webserver
- properly close clients
2026-05-04 13:19:33 +02:00

471 lines
16 KiB
Python

# gevent.monkey.patch_all() must be called in the main program!
import sys
import time
import signal
import socket
import traceback
import logging
import json
import gevent
import gevent.pywsgi
import gevent.queue
import flask
import circularlog
instruments = {ins: 8642 for ins in
['amor', 'boa', 'camea', 'dmc', 'eiger', 'focus', 'hrpt', 'sans', 'tasp', 'zebra']
}
def guess_mimetype(filename):
if filename.endswith('.js'):
mimetype = 'text/javascript'
elif filename.endswith('.css'):
mimetype = 'text/css'
elif filename.endswith('.ico'):
mimetype = 'image/x-icon'
elif filename.endswith(".png"):
mimetype = "image/png"
else:
mimetype = 'text/html'
return mimetype
class MyEncoder(json.JSONEncoder):
def default(self, obj):
try:
return super().default(obj)
except TypeError:
return int(obj) # try to convert SECoP Enum
# SSE 'protocol' is described here: https://bit.ly/UPFyxY
def to_json_sse(msg):
txt = json.dumps(msg, separators=(',', ': '), cls=MyEncoder)
logging.debug('data: %s', txt)
return 'data: %s\n\n' % txt
class Server:
"""singleton: created once in this module"""
interactor_classes = None
client_cls = None
history_cls = None
history = None
single_instrument = None
db = None
def __init__(self):
self.instruments = {}
self.clients = {}
def remove(self, client):
try:
try:
self.clients[client.id].close()
except AttributeError:
pass
del self.clients[client.id]
except KeyError:
logging.warning('client already removed %s', client.id)
def lookup_streams(self, instrument, stream=None, device=None):
if self.single_instrument:
instrument = self.single_instrument
if stream:
if isinstance(stream, str):
streams = stream.split(',') if stream else []
else:
streams = stream
else:
streams = []
device_names = devices = device.split(',') if device else []
tags = {}
if instrument:
# tags['instrument'] = instrument
stream_dict = self.db.get_streams(instrument, stream=list(streams), device=devices)
streams.extend((s for s in stream_dict if s not in streams))
if not devices:
device_names = list(filter(None, (t.get('device') for t in stream_dict.values())))
if streams:
tags['stream'] = streams[0] if len(streams) == 1 else streams
if devices:
tags['device'] = devices[0] if len(devices) == 1 else devices
return streams, tags, ','.join(device_names)
def register_client(self, instrument=None, stream=None, device=None, history_only=None):
streams, tags, device_name = self.lookup_streams(instrument, stream, device)
if (history_only or '0') != '0':
# create dummy client
client = self.client_cls(self, [], '', '')
else:
client = self.client_cls(self, streams, instrument or '', device_name)
history = self.history_cls(self, instrument, device_name, tags)
# history.db.debug = True
# all relevant methods of the history instance are saved in client.handlers
# so there is no reference needed to history anymore
client.handlers.update(history.handlers)
self.clients[client.id] = client
return client
def run(self, port, db, history_cls, client_cls, single_instrument=None, **interactor_classes):
self.single_instrument = single_instrument
self.db = db
self.history_cls = history_cls
self.client_cls = client_cls
self.interactor_classes = interactor_classes
app.debug = True
logging.basicConfig(filename=f'logfile{port}.log', filemode='w', level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s', force=True)
# srv = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
srv = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))
def handle_term(sig, frame):
srv.stop()
srv.close()
signal.signal(signal.SIGTERM, handle_term)
# def handle_pdb(sig, frame):
# import pdb
# print('PDB')
# pdb.Pdb().set_trace(frame)
# signal.signal(signal.SIGUSR1, handle_pdb)
srv.serve_forever()
server = Server()
app = flask.Flask(__name__)
update_rider = circularlog.Rider("upd")
pollinterval = 0.2
@app.route('/update')
def get_update(_=None):
# Client Adress: socket.getfqdn(flask.request.remote_addr)
kwargs = {k: flask.request.values.get(k) for k in ('instrument', 'stream', 'device', 'history_only')}
client = server.register_client(**kwargs)
client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1])
@flask.stream_with_context
def generator():
logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1]))
# msg = dict(type='id', id=client.id, title=instrument.title);
# yield to_json_sse(msg)
msg = dict(type='id', id=client.id,
instrument=kwargs.get('instrument') or server.single_instrument or 'n_a',
device=client.device_name)
yield to_json_sse(msg)
try:
lastmsg = time.time()
while True:
if client.info() == "":
print(time.time()-lastmsg)
messages = client.poll()
for msg in messages:
update_rider.put('-', repr(msg))
yield to_json_sse(msg)
delay = pollinterval
if messages:
lastmsg = time.time()
# as we had messages, we might try again immediately
delay = 0.001
else:
if time.time() > lastmsg + 30:
if not client.info():
raise GeneratorExit("no activity")
logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info()))
yield to_json_sse(dict(type='heartbeat'))
lastmsg = time.time()
# we need to specify a value > 0 to give other greenlets a chance to continue
gevent.sleep(delay)
except GeneratorExit as e:
logging.info("except clause %r", repr(e))
logging.info('CLOSED %s', client.id)
print('CLOSE client')
server.remove(client)
except Exception as e:
logging.info('error')
logging.error('%s', traceback.format_exc())
server.remove(client)
# msg = dict(type='error',error=traceback.format_exc())
# yield to_json_sse(msg)
resp = flask.Response(generator(), mimetype='text/event-stream')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/circular')
def dump_circular():
circularlog.log()
return "log"
@app.route('/clients')
def show_clients():
result = ""
for id in server.clients:
c = server.clients[id]
result += c.remote_info + " " + "; ".join(c.info()) + "<br>"
return result
@app.route('/export')
def export():
args = flask.request.args
kwargs = dict((k, args.get(k)) for k in args)
path = flask.request.path
logging.info('GET %s %s', path, repr(kwargs))
try:
id = kwargs.pop('id')
client = server.clients[id]
bytes = client.handlers['export'](**kwargs)
return flask.send_file(
bytes,
as_attachment=True,
download_name='export.tsv',
mimetype='text/tab-separated-values'
)
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
logging.error('MSG: %r', msg)
resp = flask.Response(json.dumps(msg), mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/getblock')
@app.route('/updateblock')
@app.route('/sendcommand')
@app.route('/console')
@app.route('/graph')
@app.route('/updategraph')
@app.route('/gettime')
@app.route('/getvars', methods=["GET", "POST"])
def reply():
args = flask.request.values
kwargs = dict((k, args.get(k)) for k in args)
path = flask.request.path
logging.info('GET %s %r', path, kwargs)
try:
id = kwargs.pop('id')
client = server.clients[id]
msg = client.handlers[path[1:]](**kwargs)
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
jsonmsg = json.dumps(msg)
if len(jsonmsg) < 120:
logging.info('REPLY %s %s', path, jsonmsg)
else:
logging.info('REPLY %s %s...', path, jsonmsg[:80])
logging.debug('REPLY %s %r', path, jsonmsg)
resp = flask.Response(jsonmsg, mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/test/<file>')
def subdir_test_file(file):
gevent.sleep(2)
resp = flask.send_file("client/test/"+file, mimetype=guess_mimetype(file))
return resp
@app.route('/components/curves_settings_popup/color_selector/<file>')
@app.route('/components/curves_settings_popup/<file>')
@app.route('/components/action_entry/<file>')
@app.route('/components/export_popup/<file>')
@app.route('/components/dates_popup/<file>')
@app.route('/components/menu_popup/<file>')
@app.route('/components/help_popup/<file>')
@app.route('/components/help_entry/<file>')
@app.route('/components/control/<file>')
@app.route('/components/divider/<file>')
@app.route('/components/states_indicator/dates/<file>')
@app.route('/res/<file>')
@app.route('/jsFiles/<file>')
@app.route('/cssFiles/<file>')
@app.route('/externalFiles/<file>')
def subdir_file(file):
subdir = "/".join(flask.request.path.split("/")[1:-1])
resp = flask.send_file("client/" + subdir+"/"+file, mimetype=guess_mimetype(file))
# resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
return resp
@app.route('/externalFiles/maps/<file>.map')
def replace_by_empty(file):
return ""
@app.route('/')
def default():
if not any(flask.request.values.get(k) for k in ('instrument', 'stream', 'device')):
if not server.single_instrument:
return select_experiment()
return general_file('SEAWebClient.html')
#@app.route('/select_instrument')
#def select_instrument():
# out = ['''<html><body><table>
#<style>
#th {
# text-align: left;
#}
#</style>
#<tr><th>instrument</th><th colspan=99>devices</th></tr>''']
# result = {}
# for stream, tags in server.db.get_streams().items():
# ins = tags.get('instrument', '0')
# result.setdefault(ins, []).append((stream, tags.get('device')))
# bare_streams = result.pop('0', [])
# for ins, streams in result.items():
# out.append(f'<tr><td><a href="/?ins={ins}">{ins}</a></td>')
# out.extend(f'<td>{d or s}</td>' for s, d in streams)
# out.append('</tr>')
# for stream, device in bare_streams:
# out.append(f'<tr><td><a href="/?srv={stream}">{stream}</a></td><td>{device}</td><tr>')
# out.append('</table>')
# out.append('<h3>servers on the instruments:</h3>')
# out.extend([f"<a href='http://{i.lower()}.psi.ch:8642/'>{i}</a>&nbsp;\n" for i in instlist])
# out.extend(['</body></html>', ''])
# return '\n'.join(out)
@app.route('/select_experiment')
def select_experiment():
out = ['''<html><head>
<meta name="viewport"
content="width=device-width, initial-scale=1.0, minimum-scale=1.0, maximum-scale=1.0, user-scalable=no">
<style>
th {
text-align: left;
}
a {
text-decoration: none;
}
</style></head>
<body><table>
''']
ONEMONTH = 30 * 24 * 3600
out.append('<br><i>direct link to instruments:</i><br>')
out.extend([f'<a href="http://{ins}.psi.ch:{port}/">{ins.upper()}</a>&nbsp;\n'
for ins, port in instruments.items()])
if server.db.has_local:
out.append('<h3><a href="http://linse-c.psi.ch:8888/">linse-c (central)</a></h3>')
class prev: # just a namesapce
title = None
legend = None
def change_title(text):
if text == prev.title:
return False
if prev.legend:
out.append(f'<tr>{prev.legend}</tr>')
prev.legend = None
prev.title = text
out.append(f'<tr><td colspan=3><br><i>{text}:</i></td></tr>')
return True
# TODO: sort this by (instrument / device) and list dates
# period format: Ymd..Ymd, Ymd (single date), Ymd..now, HM..now
try:
now = time.time()
timerange = flask.request.values.get('time')
if timerange == 'all':
starttime, endtime = None, None
elif timerange:
timerange = timerange.split(',')
starttime, endtime = [None if timerange[i] == '0' else int(timerange[i]) for i in (0, -1)]
else:
starttime, endtime = now - ONEMONTH, now
chunk_list = []
for key, chunk_dict in server.db.get_experiments(starttime, endtime).items():
for (streams, devices), chunks in chunk_dict.items():
chunk_list.extend((r[1], r[0], key, devices) for r in chunks)
chunk_list.sort(reverse=True)
for end, beg, key, devices in chunk_list:
today, begdate, enddate = (time.strftime("%Y-%m-%d", time.localtime(t)) for t in (now, beg, end))
port = None
if key[0] == 'instrument':
ins = key[1]
port = instruments.get(ins)
left = ins.upper()
else:
left = key[1] # shown in left column
args = ['='.join(key)]
remote = None if port is None else f'http://{ins}.psi.ch:{port}'
history_only = bool(remote)
if end > now:
if begdate == today:
daterange = f'since {time.strftime("%H:%M", time.localtime(beg))}'
else:
daterange = f'since {begdate}'
change_title('currently running')
else:
args.append(f'time={beg},{end}')
history_only = True
remote = None
daterange = begdate if begdate == enddate else f'{begdate}...{enddate}'
if end > now - ONEMONTH:
change_title('recently running (history graphics only)')
else:
change_title('older than 30 days')
if history_only:
args.append('hr=1')
def link(label):
return f'<a href="/?{"&".join(args)}">{label}</a>'
label = " ".join(devices)
if remote:
prev.legend = '<td></td><td></td><td colspan=2>linse-c*: <i>history graphics only</i></td>'
out.append(f'<tr><td><a href="{remote}?instrument={ins}">{ins.upper()}</a></td>'
f'<td>{label}</td><td>{link("linse-c*")}</td>')
else:
out.append(f'<tr><td>{link(left)}</td><td colspan=2>{label}</td>')
out.append(f'<td>{daterange}</td></tr>')
if timerange:
out.append(f'<h3><a href="/select_experiment?time=all">earlier dates</a></h3><br>')
out.append('</table>')
out.extend(['</body></html>', ''])
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
out = [f"ERROR {e!r}"]
return '\n'.join(out)
@app.route('/<file>')
def general_file(file):
subdir = "client/"
try:
resp = flask.send_file(subdir+file, mimetype=guess_mimetype(file))
except FileNotFoundError:
logging.warning('file %s not found', file)
return 'file not found'
# resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
return resp
def hostport_split(hostport):
h = hostport.split(':')
return (h[0], int(h[1]))